2016-11-17 2 views
0

Ich möchte eine Spark-SQL Datenrahmen wie folgt zu transformieren: wie dieseSpark-SQL Dataframe Transformation beteiligt Partitionierung und hinken

animal value 
------------ 
cat  8 
cat  5 
cat  6 
dog  2 
dog  4 
dog  3 
rat  7 
rat  4 
rat  9 

in einen Datenrahmen:

animal value previous-value 
----------------------------- 
cat  8    0 
cat  5    8 
cat  6    5 
dog  2    0 
dog  4    2 
dog  3    4 
rat  7    0 
rat  4    7 
rat  9    4 

Ich Art von animal partitionieren möchten und dann für jede animal, previous-value Verzögerung eine Zeile hinter value (mit einem Standardwert von 0), und legen Sie die Partitionen wieder zusammen.

+0

Ich habe keine Zeit es jetzt zu versuchen, aber 1) verlassen Sie sich nicht auf Datenrahmen bestellen, fügen Sie eine explizite 'index' Spalte und 2) versuchen Sie "partitionieren" mit "Tier" und verwenden Sie dann "mapPartitions", um Ihre Zeilenverschiebung durchzuführen. Es wird wahrscheinlich nicht schön sein. –

Antwort

2

Dies erreicht eine window function Verwendung werden kann.

import org.apache.spark.sql.expressions.Window 
import sqlContext.implicits._ 

val df = sc.parallelize(Seq(("cat", 8, "01:00"),("cat", 5, "02:00"),("cat", 6, "03:00"),("dog", 2, "02:00"),("dog", 4, "04:00"),("dog", 3, "06:00"),("rat", 7, "01:00"),("rat", 4, "03:00"),("rat", 9, "05:00"))).toDF("animal", "value", "time") 

df.show 
+------+-----+-----+ 
|animal|value| time| 
+------+-----+-----+ 
| cat| 8|01:00| 
| cat| 5|02:00| 
| cat| 6|03:00| 
| dog| 2|02:00| 
| dog| 4|04:00| 
| dog| 3|06:00| 
| rat| 7|01:00| 
| rat| 4|03:00| 
| rat| 9|05:00| 
+------+-----+-----+ 

Ich habe ein "Zeit" -Feld hinzugefügt, um orderBy zu veranschaulichen.

val w1 = Window.partitionBy($"animal").orderBy($"time") 

val previous_value = lag($"value", 1).over(w1) 
val df1 = df.withColumn("previous", previous_value) 

df1.show 
+------+-----+-----+--------+             
|animal|value| time|previous| 
+------+-----+-----+--------+ 
| dog| 2|02:00| null| 
| dog| 4|04:00|  2| 
| dog| 3|06:00|  4| 
| cat| 8|01:00| null| 
| cat| 5|02:00|  8| 
| cat| 6|03:00|  5| 
| rat| 7|01:00| null| 
| rat| 4|03:00|  7| 
| rat| 9|05:00|  4| 
+------+-----+-----+--------+ 

Wenn Sie NULL-Werte mit 0 ersetzt werden soll:

val df2 = df1.na.fill(0) 
df2.show 
+------+-----+-----+--------+ 
|animal|value| time|previous| 
+------+-----+-----+--------+ 
| dog| 2|02:00|  0| 
| dog| 4|04:00|  2| 
| dog| 3|06:00|  4| 
| cat| 8|01:00|  0| 
| cat| 5|02:00|  8| 
| cat| 6|03:00|  5| 
| rat| 7|01:00|  0| 
| rat| 4|03:00|  7| 
| rat| 9|05:00|  4| 
+------+-----+-----+--------+ 
+0

Ich würde vorsichtig sein, wenn ich '' String ''sortiere, bin ich mir ziemlich sicher, dass der Funke sie lexikographisch sortiert (also' "12" 'wäre weniger als' "2" 'was nicht ist) gewünscht). –

+0

'scala>" 12 "<" 2 " res0: Boolean = true' –

+1

Guter Punkt, @ evan058. Im wirklichen Leben hätte ich einen Zeitstempel benutzt. – sjstanley

1

würde dieses Stückchen Code arbeiten:

val df = spark.read.format("CSV").option("header","true").load("/home/shivansh/Desktop/foo.csv") 
val df2 = df.groupBy("animal").agg(collect_list("value") as "listValue") 
val desiredDF = df2.rdd.flatMap{row=> 
     val animal=row.getAs[String]("animal") 
     val valueList=row.getAs[Seq[String]]("listValue").toList 
     val newlist=valueList zip "0"::valueList 
     newlist.map(a=>(animal,a._1,a._2)) 
    }.toDF("animal","value","previousValue") 

Auf der Spark-Shell:

scala> val df=spark.read.format("CSV").option("header","true").load("/home/shivansh/Desktop/foo.csv") 
df: org.apache.spark.sql.DataFrame = [animal: string, value: string] 

scala> df.show() 
+------+-----+ 
|animal|value| 
+------+-----+ 
| cat| 8| 
| cat| 5| 
| cat| 6| 
| dog| 2| 
| dog| 4| 
| dog| 3| 
| rat| 7| 
| rat| 4 | 
| rat| 9| 
+------+-----+ 


scala> val df2=df.groupBy("animal").agg(collect_list("value") as "listValue") 
df2: org.apache.spark.sql.DataFrame = [animal: string, listValue: array<string>] 

scala> df2.show() 
+------+----------+ 
|animal| listValue| 
+------+----------+ 
| rat|[7, 4 , 9]| 
| dog| [2, 4, 3]| 
| cat| [8, 5, 6]| 
+------+----------+ 


scala> val desiredDF=df2.rdd.flatMap{row=> 
    | val animal=row.getAs[String]("animal") 
    | val valueList=row.getAs[Seq[String]]("listValue").toList 
    | val newlist=valueList zip "0"::valueList 
    | newlist.map(a=>(animal,a._1,a._2)) 
    | }.toDF("animal","value","previousValue") 
desiredDF: org.apache.spark.sql.DataFrame = [animal: string, value: string ... 1 more field] 

scala> desiredDF.show() 
+------+-----+-------------+              
|animal|value|previousValue| 
+------+-----+-------------+ 
| rat| 7|   0| 
| rat| 4 |   7| 
| rat| 9|   4 | 
| dog| 2|   0| 
| dog| 4|   2| 
| dog| 3|   4| 
| cat| 8|   0| 
| cat| 5|   8| 
| cat| 6|   5| 
+------+-----+-------------+ 
Verwandte Themen