Erste - seit LabeledPoint
einen Vektor von Double
s erwartet, ich nehme an, Sie wollen auch durch Doppelpunkt (:
) in jedem features
Array jedes Element geteilt, und behandeln die rechte Seite davon, wie die Doppel, zB:
"d90_pv_1sec:1.4471580313422192" --> 1.4471580313422192
Wenn ja - hier ist die Transformation:
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
// sample data - DataFrame with label, features and other columns
val df = Seq(
(1, Array("d90_pv_1sec:1.4471580313422192", "d3_pv_1sec:0.9030899869919435"), 4.0),
(2, Array("d7_pv_1sec:0.9030899869919435", "d30_pv_1sec:1.414973347970818"), 5.0)
).toDF("label", "features", "ignored")
// extract relevant fields from Row and convert WrappedArray[String] into Vector:
val result = df.rdd.map(r => {
val label = r.getAs[Int]("label")
val featuresArray = r.getAs[mutable.WrappedArray[String]]("features")
val features: Vector = Vectors.dense(
featuresArray.map(_.split(":")(1).toDouble).toArray
)
LabeledPoint(label, features)
})
result.foreach(println)
// (1.0,[1.4471580313422192,0.9030899869919435])
// (2.0,[0.9030899869919435,1.414973347970818])
EDIT: pro Klarstellung nun jedes Element in dem Eingangsfeld unter der Annahme, enthält die erwartete Index in einem resultierenden sparse vector:
"d90_pv_1sec:1.4471580313422192" --> index = 90; value = 1.4471580313422192
Der modifizierte Code wäre:
val vectorSize: Int = 100 // just a guess - should be the maximum index + 1
val result = df.rdd.map(r => {
val label = r.getAs[Int]("label")
val arr = r.getAs[mutable.WrappedArray[String]]("features").toArray
// parse each item into (index, value) tuple to use in sparse vector
val elements = arr.map(_.split(":")).map {
case Array(s, d) => (s.replaceAll("d|_pv_1sec","").toInt, d.toDouble)
}
LabeledPoint(label, Vectors.sparse(vectorSize, elements))
})
result.foreach(println)
// (1.0,(100,[3,90],[0.9030899869919435,1.4471580313422192]))
// (2.0,(100,[7,30],[0.9030899869919435,1.414973347970818]))
NOTE : Die Verwendung von s.replaceAll("d|_pv_1sec","")
ist möglicherweise etwas langsam, da für jedes Element ein regulärer Ausdruck separat kompiliert wird. Wenn das der Fall ist, kann es durch das schnellere (noch hässlichere) s.replace("d", "").replace("_pv_1sec", "")
ersetzt werden, das keine regulären Ausdrücke verwendet.
danke für die Antwort! Es klappt! –
er ... Ich möchte ein Etikett und ein spärliches Feature-Vektor-Format wie: LabeledPoint (Label, Vectors.sparse (3, Array ("d90_pv_1sec", "d3_pv_1sec"), Array (1.4471580313422192, 0.9030899869919435))) –
Sie können t - 'LabeledPoint.features' hat den Typ' org.apache.spark.mllib.linalg.Vector', der notwendigerweise ein Vektor von 'Doubles' und nicht von Strings/Arrays ist - siehe https://github.com/apache/spark /blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala#L41 –