2016-10-01 1 views
0

Wie Zeilen in Spalten ohne Pivot mit RDD oder Datenrahmen zur Umsetzung ...Funken: Reihen zu Colimns (Art Transponieren oder Pivot)

SessionId,date,orig, dest, legind, nbr 

1 9/20/16,abc0,xyz0,o,1 
1 9/20/16,abc1,xyz1,o,2 
1 9/20/16,abc2,xyz2,i,3 
1 9/20/16,abc3,xyz3,i,4 

So mag ich wie neues Schema generieren:

SessionId,date,orig1, orig2, orig3, orig4, dest1, dest2, dest3,dest4 

1,9/20/16,abc0,abc1,null, null, xyz0,xyz1, null, null 

Logik ist, wenn:

  • NBR- 1 ist und o = Legind dann Wert orig1 (aus Zeile 1 fetch) ...

  • nbr 3 und Legind = i dann DEST1 Wert (Fetch von Zeile 3)

So, wie die Zeilen zu Spalten transponieren ...

Jede Idee geschätzt wird groß sein.

mit unter Option versucht, aber es ist einfach alles .. in einreihigem abflachen

val keys = List("SessionId"); 
val selectFirstValueOfNoneGroupedColumns = 
    df.columns 
    .filterNot(keys.toSet) 
    .map(_ -> "first").toMap 
val grouped = 
    df.groupBy(keys.head, keys.tail: _*) 
    .agg(selectFirstValueOfNoneGroupedColumns).show() 
+0

Weclome bis SO. bitte verbringen Sie etwas Zeit auf [Hilfe] (http://Stackoverflow.com/help/how-to-ask), um clariry zu bekommen, wie reif Sie Fragen stellen und ordentlich formatieren können –

+0

Ja, es ist einfach und es hat bereits eine Antwort! Folge dem Link, den ich angegeben habe. –

+0

@TheArchetypalPaul: Der Link, den Sie angegeben haben, ist eine andere Frage und Vorgehensweise. – Ankur

Antwort

1

Es ist relativ einfach, wenn Sie pivot Funktion verwenden. Zunächst lässt einen Datensatz, wie die in Ihrer Frage erstellen:

import org.apache.spark.sql.functions.{concat, first, lit, when} 

val df = Seq(
    ("1", "9/20/16", "abc0", "xyz0", "o", "1"), 
    ("1", "9/20/16", "abc1", "xyz1", "o", "2"), 
    ("1", "9/20/16", "abc2", "xyz2", "i", "3"), 
    ("1", "9/20/16", "abc3", "xyz3", "i", "4") 
).toDF("SessionId", "date", "orig", "dest", "legind", "nbr") 

dann definieren und fügen Sie Spalten Helfer:

// This will be the column name 
val key = when($"legind" === "o", concat(lit("orig"), $"nbr")) 
      .when($"legind" === "i", concat(lit("dest"), $"nbr")) 

// This will be the value 
val value = when($"legind" === "o", $"orig")  // If o take origin 
       .when($"legind" === "i", $"dest") // If i take dest 

val withKV = df.withColumn("key", key).withColumn("value", value) 

Dieses in einem DataFrame wie dies zur Folge haben wird:

+---------+-------+----+----+------+---+-----+-----+ 
|SessionId| date|orig|dest|legind|nbr| key|value| 
+---------+-------+----+----+------+---+-----+-----+ 
|  1|9/20/16|abc0|xyz0|  o| 1|orig1| abc0| 
|  1|9/20/16|abc1|xyz1|  o| 2|orig2| abc1| 
|  1|9/20/16|abc2|xyz2|  i| 3|dest3| xyz2| 
|  1|9/20/16|abc3|xyz3|  i| 4|dest4| xyz3| 
+---------+-------+----+----+------+---+-----+-----+ 

Als nächstes definieren wir eine Liste möglicher Ebenen:

val levels = Seq("orig", "dest").flatMap(x => (1 to 4).map(y => s"$x$y")) 

und schließlich pivot

val result = withKV 
    .groupBy($"sessionId", $"date") 
    .pivot("key", levels) 
    .agg(first($"value", true)).show 

Und die result ist:

+---------+-------+-----+-----+-----+-----+-----+-----+-----+-----+ 
|sessionId| date|orig1|orig2|orig3|orig4|dest1|dest2|dest3|dest4| 
+---------+-------+-----+-----+-----+-----+-----+-----+-----+-----+ 
|  1|9/20/16| abc0| abc1| null| null| null| null| xyz2| xyz3| 
+---------+-------+-----+-----+-----+-----+-----+-----+-----+-----+