2017-01-02 5 views
0

Spark-SQL ist mir ziemlich klar. Allerdings beginne ich gerade mit der RDD-API von spark. Wie spark apply function to columns in parallel sollte mich langsam schlurft bekommen erlaubt dies weist darauf hin, los fürFunken convert Funken SQL zu RDD API

def handleBias(df: DataFrame, colName: String, target: String = this.target) = { 
    val w1 = Window.partitionBy(colName) 
    val w2 = Window.partitionBy(colName, target) 

    df.withColumn("cnt_group", count("*").over(w2)) 
     .withColumn("pre2_" + colName, mean(target).over(w1)) 
     .withColumn("pre_" + colName, coalesce(min(col("cnt_group")/col("cnt_foo_eq_1")).over(w1), lit(0D))) 
     .drop("cnt_group") 
    } 
} 

In Pseudo-Code: df foreach column (handleBias(column) So ein minimaler Datenrahmen bis

geladen wird
val input = Seq(
    (0, "A", "B", "C", "D"), 
    (1, "A", "B", "C", "D"), 
    (0, "d", "a", "jkl", "d"), 
    (0, "d", "g", "C", "D"), 
    (1, "A", "d", "t", "k"), 
    (1, "d", "c", "C", "D"), 
    (1, "c", "B", "C", "D") 
) 
    val inputDf = input.toDF("TARGET", "col1", "col2", "col3TooMany", "col4") 

aber richtig

zur Karte schlägt fehl
val rdd1_inputDf = inputDf.rdd.flatMap { x => {(0 until x.size).map(idx => (idx, x(idx)))}} 
     rdd1_inputDf.toDF.show 

Es scheitert mit

java.lang.ClassNotFoundException: scala.Any 
java.lang.ClassNotFoundException: scala.Any 

Ein Beispiel kann in dieser Frage dargelegt https://github.com/geoHeil/sparkContrastCoding jeweils https://github.com/geoHeil/sparkContrastCoding/blob/master/src/main/scala/ColumnParallel.scala für das Problem zu finden.

Antwort

2

Wenn Sie .rdd auf einem DataFrame aufrufen, erhalten Sie eine RDD[Row], die nicht stark typisiert ist. Wenn Sie über die Elemente abbilden können, wollen Sie Muster Spiel über brauchen:

scala> val input = Seq(
    |  (0, "A", "B", "C", "D"), 
    |  (1, "A", "B", "C", "D"), 
    |  (0, "d", "a", "jkl", "d"), 
    |  (0, "d", "g", "C", "D"), 
    |  (1, "A", "d", "t", "k"), 
    |  (1, "d", "c", "C", "D"), 
    |  (1, "c", "B", "C", "D") 
    | ) 
input: Seq[(Int, String, String, String, String)] = List((0,A,B,C,D), (1,A,B,C,D), (0,d,a,jkl,d), (0,d,g,C,D), (1,A,d,t,k), (1,d,c,C,D), (1,c,B,C,D)) 

scala> val inputDf = input.toDF("TARGET", "col1", "col2", "col3TooMany", "col4") 
inputDf: org.apache.spark.sql.DataFrame = [TARGET: int, col1: string ... 3 more fields] 

scala> import org.apache.spark.sql.Row 
import org.apache.spark.sql.Row 

scala> val rowRDD = inputDf.rdd 
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at rdd at <console>:27 

scala> val typedRDD = rowRDD.map{case Row(a: Int, b: String, c: String, d: String, e: String) => (a,b,c,d,e)} 
typedRDD: org.apache.spark.rdd.RDD[(Int, String, String, String, String)] = MapPartitionsRDD[20] at map at <console>:29 

scala> typedRDD.keyBy(_._1).groupByKey.foreach{println} 
[Stage 7:>               (0 + 0)/4] 
(0,CompactBuffer((A,B,C,D), (d,a,jkl,d), (d,g,C,D))) 
(1,CompactBuffer((A,B,C,D), (A,d,t,k), (d,c,C,D), (c,B,C,D))) 

Ansonsten können Sie ein Dataset getippt verwenden:

scala> val ds = input.toDS 
ds: org.apache.spark.sql.Dataset[(Int, String, String, String, String)] = [_1: int, _2: string ... 3 more fields] 

scala> ds.rdd 
res2: org.apache.spark.rdd.RDD[(Int, String, String, String, String)] = MapPartitionsRDD[8] at rdd at <console>:30 

scala> ds.rdd.keyBy(_._1).groupByKey.foreach{println} 
[Stage 0:>               (0 + 0)/4] 
(0,CompactBuffer((0,A,B,C,D), (0,d,a,jkl,d), (0,d,g,C,D))) 
(1,CompactBuffer((1,A,B,C,D), (1,A,d,t,k), (1,d,c,C,D), (1,c,B,C,D))) 
+1

Während ich dies in einem ml verwenden möchten .Pipeline und der Ausgabeschritt ist Datenrahmen die das „Schema verloren“ zB Ich muss Mustererkennung verwenden? ist das richtig? Aber es gibt eine ganze Reihe von Spalten ist es eine Möglichkeit zu „schließen“, um ihr etwas (teilweise shcema? –

+0

Ja die 'DF => RDD' Konvertierung das Schema nicht überhaupt leider verwenden (und ich glaube nicht, gibt es Ein guter Weg, um seine Verwendung zu erzwingen. Aber sehen Sie sich mein neues 'Dataset' Beispiel an: Es besteht keine Notwendigkeit, einen Intermediate' Dataframe' zu ​​verwenden und es sieht so aus als ob 'DataSet' die Typen gut leitet (in Spark 2.0) alles, was Sie mit einem DF tun könnte auch mit einem DS) –

+0

@GeorgHeiler (nicht sicher, ob Sie von ^^^^) –