2016-12-29 1 views
2

Ich bin neu in Scala und ich möchte Dataframe in rdd konvertieren. Lassen Sie die Label, Funktionen umwandeln in RDD[labelPoint] für die Eingabe von MLlib. Aber ich kann den Weg nicht finden, mit WrappedArray umzugehen.Konvertieren von Spark-Datenframe (mit WrappedArray) zu RDD [labelPoint] in der Scala

scala> test.printSchema 
root 
|-- user_id: long (nullable = true) 
|-- brand_store_sn: string (nullable = true) 
|-- label: integer (nullable = true) 
|-- money_score: double (nullable = true) 
|-- normal_score: double (nullable = true) 
|-- action_score: double (nullable = true) 
|-- features: array (nullable = true) 
| |-- element: string (containsNull = true) 
|-- flag: string (nullable = true) 
|-- dt: string (nullable = true) 


scala> test.head 
res21: org.apache.spark.sql.Row = [2533,10005072,1,2.0,1.0,1.0,WrappedArray(["d90_pv_1sec:1.4471580313422192", "d3_pv_1sec:0.9030899869919435", "d7_pv_1sec:0.9030899869919435", "d30_pv_1sec:1.414973347970818", "d90_pv_week_decay:1.4235871662780681", "d1_pv_1sec:0.9030899869919435", "d120_pv_1sec:1.4471580313422192"]),user_positive,20161130] 

Antwort

1

Erste - seit LabeledPoint einen Vektor von Double s erwartet, ich nehme an, Sie wollen auch durch Doppelpunkt (:) in jedem features Array jedes Element geteilt, und behandeln die rechte Seite davon, wie die Doppel, zB:

"d90_pv_1sec:1.4471580313422192" --> 1.4471580313422192 

Wenn ja - hier ist die Transformation:

import org.apache.spark.mllib.linalg.{Vector, Vectors} 
import org.apache.spark.mllib.regression.LabeledPoint 

// sample data - DataFrame with label, features and other columns 
val df = Seq(
    (1, Array("d90_pv_1sec:1.4471580313422192", "d3_pv_1sec:0.9030899869919435"), 4.0), 
    (2, Array("d7_pv_1sec:0.9030899869919435", "d30_pv_1sec:1.414973347970818"), 5.0) 
).toDF("label", "features", "ignored") 

// extract relevant fields from Row and convert WrappedArray[String] into Vector: 
val result = df.rdd.map(r => { 
    val label = r.getAs[Int]("label") 
    val featuresArray = r.getAs[mutable.WrappedArray[String]]("features") 
    val features: Vector = Vectors.dense(
    featuresArray.map(_.split(":")(1).toDouble).toArray 
) 
    LabeledPoint(label, features) 
}) 

result.foreach(println) 
// (1.0,[1.4471580313422192,0.9030899869919435]) 
// (2.0,[0.9030899869919435,1.414973347970818]) 

EDIT: pro Klarstellung nun jedes Element in dem Eingangsfeld unter der Annahme, enthält die erwartete Index in einem resultierenden sparse vector:

"d90_pv_1sec:1.4471580313422192" --> index = 90; value = 1.4471580313422192 

Der modifizierte Code wäre:

val vectorSize: Int = 100 // just a guess - should be the maximum index + 1 

val result = df.rdd.map(r => { 
    val label = r.getAs[Int]("label") 
    val arr = r.getAs[mutable.WrappedArray[String]]("features").toArray 
    // parse each item into (index, value) tuple to use in sparse vector 
    val elements = arr.map(_.split(":")).map { 
    case Array(s, d) => (s.replaceAll("d|_pv_1sec","").toInt, d.toDouble) 
    } 
    LabeledPoint(label, Vectors.sparse(vectorSize, elements)) 
}) 

result.foreach(println) 
// (1.0,(100,[3,90],[0.9030899869919435,1.4471580313422192])) 
// (2.0,(100,[7,30],[0.9030899869919435,1.414973347970818])) 

NOTE : Die Verwendung von s.replaceAll("d|_pv_1sec","") ist möglicherweise etwas langsam, da für jedes Element ein regulärer Ausdruck separat kompiliert wird. Wenn das der Fall ist, kann es durch das schnellere (noch hässlichere) s.replace("d", "").replace("_pv_1sec", "") ersetzt werden, das keine regulären Ausdrücke verwendet.

+0

danke für die Antwort! Es klappt! –

+0

er ... Ich möchte ein Etikett und ein spärliches Feature-Vektor-Format wie: LabeledPoint (Label, Vectors.sparse (3, Array ("d90_pv_1sec", "d3_pv_1sec"), Array (1.4471580313422192, 0.9030899869919435))) –

+0

Sie können t - 'LabeledPoint.features' hat den Typ' org.apache.spark.mllib.linalg.Vector', der notwendigerweise ein Vektor von 'Doubles' und nicht von Strings/Arrays ist - siehe https://github.com/apache/spark /blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala#L41 –

Verwandte Themen