2016-04-14 19 views
3

Ich habe ein Datenrahmen von Format wie untenFunken: Match Spalten aus zwei Datenrahmen

+---+---+------+---+ 
| sp|sp2|colour|sp3| 
+---+---+------+---+ 
| 0| 1|  1| 0| 
| 1| 0|  0| 1| 
| 0| 0|  1| 0| 
+---+---+------+---+ 

eine andere Datenrahmen-Koeffizienten für jede Spalte in der ersten Datenrahmen enthält. zum Beispiel

+------+------+---------+------+ 
| CE_sp|CE_sp2|CE_colour|CE_sp3| 
+------+------+---------+------+ 
| 0.94| 0.31|  0.11| 0.72| 
+------+------+---------+------+ 

Jetzt möchte ich eine Spalte zur ersten Datenrahmen hinzuzufügen, die durch Zugabe von Noten aus dem zweiten Datenrahmen berechnet wird.

für ex.

+---+---+------+---+-----+ 
| sp|sp2|colour|sp3|Score| 
+---+---+------+---+-----+ 
| 0| 1|  1| 0| 0.42| 
| 1| 0|  0| 1| 1.66| 
| 0| 0|  1| 0| 0.11| 
+---+---+------+---+-----+ 

d.h

r -> row of first dataframe 
score = r(0)*CE_sp + r(1)*CE_sp2 + r(2)*CE_colour + r(3)*CE_sp3 

Es kann n Anzahl der Spalten und die Reihenfolge der Spalten sein können unterschiedlich sein.

Vielen Dank im voraus !!!

+1

so Ihr zweiter Datenrahmen enthält tatsächlich 1 Reihe mit 4 Werten? – eliasah

+0

@eliasah-Werte können zunehmen, aber keine der Zeilen bleibt immer 1 im zweiten Datenrahmen. – nareshbabral

+1

Sie brauchen eigentlich keinen zweiten DataFrame – eliasah

Antwort

4

Schnell und einfach:

import org.apache.spark.sql.functions.col 

val df = Seq(
    (0, 1, 1, 0), (1, 0, 0, 1), (0, 0, 1, 0) 
).toDF("sp","sp2", "colour", "sp3") 

val coefs = Map("sp" -> 0.94, "sp2" -> 0.32, "colour" -> 0.11, "sp3" -> 0.72) 
val score = df.columns.map(
    c => col(c) * coefs.getOrElse(c, 0.0)).reduce(_ + _) 

df.withColumn("score", score) 

Und das Gleiche in PySpark:

from pyspark.sql.functions import col 

df = sc.parallelize([ 
    (0, 1, 1, 0), (1, 0, 0, 1), (0, 0, 1, 0) 
]).toDF(["sp","sp2", "colour", "sp3"]) 

coefs = {"sp": 0.94, "sp2": 0.32, "colour": 0.11, "sp3": 0.72} 
df.withColumn("score", sum(col(c) * coefs.get(c, 0) for c in df.columns)) 
+0

Danke es dient dem Zweck – nareshbabral

1

Ich glaube, dass es viele Möglichkeiten gibt zu erreichen, was Sie versuchen zu tun. In allen Fällen brauchen Sie diesen zweiten DataFrame nicht, wie ich in den Kommentaren gesagt habe.

Hier ist eine Möglichkeit:

import org.apache.spark.ml.feature.{ElementwiseProduct, VectorAssembler} 
import org.apache.spark.mllib.linalg.{Vectors,Vector => MLVector} 

val df = Seq((0, 1, 1, 0), (1, 0, 0, 1), (0, 0, 1, 0)).toDF("sp", "sp2", "colour", "sp3") 

// Your coefficient represents a dense Vector 
val coeffSp = 0.94 
val coeffSp2 = 0.31 
val coeffColour = 0.11 
val coeffSp3 = 0.72 

val weightVectors = Vectors.dense(Array(coeffSp, coeffSp2, coeffColour, coeffSp3)) 

// You can assemble the features with VectorAssembler 
val assembler = new VectorAssembler() 
    .setInputCols(df.columns) // since you need to compute on all your columns 
    .setOutputCol("features") 

// Once these features assembled we can perform an element wise product with the weight vector 
val output = assembler.transform(df) 
val transformer = new ElementwiseProduct() 
    .setScalingVec(weightVectors) 
    .setInputCol("features") 
    .setOutputCol("weightedFeatures") 

// Create an UDF to sum the weighted vectors values 
import org.apache.spark.sql.functions.udf 
def score = udf((score: MLVector) => { score.toDense.toArray.sum }) 

// Apply the UDF on the weightedFeatures 
val scores = transformer.transform(output).withColumn("score",score('weightedFeatures)) 
scores.show 
// +---+---+------+---+-----------------+-------------------+-----+ 
// | sp|sp2|colour|sp3|   features| weightedFeatures|score| 
// +---+---+------+---+-----------------+-------------------+-----+ 
// | 0| 1|  1| 0|[0.0,1.0,1.0,0.0]|[0.0,0.31,0.11,0.0]| 0.42| 
// | 1| 0|  0| 1|[1.0,0.0,0.0,1.0]|[0.94,0.0,0.0,0.72]| 1.66| 
// | 0| 0|  1| 0| (4,[2],[1.0])|  (4,[2],[0.11])| 0.11| 
// +---+---+------+---+-----------------+-------------------+-----+ 

Ich hoffe, das hilft. Zögern Sie nicht, wenn Sie weitere Fragen haben.

+0

Hallo. In der Zeile ".setInputCols (df.columns) // da Sie auf alle Ihre Spalten berechnen müssen", ist es möglich, nur einige Spalten anstelle von allen auszuwählen? – user1384205

+0

Ich bin mir nicht sicher, ob ich Ihre Frage bekomme. – eliasah

+0

Ich habe ein df mit String und int Spalten. Ich möchte eine Spalte für gewichtete Features aller int-Spalten hinzufügen. Im obigen Fall werden alle Spalten (sp, sp2, color, sp3) verwendet, um Gewichtungen zu berechnen. In meinem Fall möchte ich nur ein paar Spalten auswählen, um die Gewichtsspalte zu berechnen. Ist das möglich? Ich habe versucht, etwas wie diese Val-Felder: Array [String] = Array ("Sat1", "Sat2", "Sat3", "Sat4", "Sat5", "Sat6", "Sat7", "Sat8", "Sat9" , "sAtt10") val assembler = neuer VectorAssembler(). setInputCols (Felder) .setOutputCol ("features") – user1384205

1

ist hier eine einfache Lösung:

scala> df_wght.show 
+-----+------+---------+------+ 
|ce_sp|ce_sp2|ce_colour|ce_sp3| 
+-----+------+---------+------+ 
| 1|  2|  3|  4| 
+-----+------+---------+------+ 

scala> df.show 
+---+---+------+---+ 
| sp|sp2|colour|sp3| 
+---+---+------+---+ 
| 0| 1|  1| 0| 
| 1| 0|  0| 1| 
| 0| 0|  1| 0| 
+---+---+------+---+ 

Dann können wir tun, nur ein einfaches Kreuz beitreten und crossproduct.

val scored = df.join(df_wght).selectExpr("(sp*ce_sp + sp2*ce_sp2 + colour*ce_colour + sp3*ce_sp3) as final_score") 

Der Ausgang:

scala> scored.show 
+-----------+                 
|final_score| 
+-----------+ 
|   5| 
|   5| 
|   3| 
+-----------+ 
Verwandte Themen