5

Ich habe einen Datenrahmen, der wie Folge aussieht:Spark, Dataframe: apply Transformator/Schätzer auf Gruppen

+-----------+-----+------------+ 
|  userID|group| features| 
+-----------+-----+------------+ 
|12462563356| 1| [5.0,43.0]| 
|12462563701| 2| [1.0,8.0]| 
|12462563701| 1| [2.0,12.0]| 
|12462564356| 1| [1.0,1.0]| 
|12462565487| 3| [2.0,3.0]| 
|12462565698| 2| [1.0,1.0]| 
|12462565698| 1| [1.0,1.0]| 
|12462566081| 2| [1.0,2.0]| 
|12462566081| 1| [1.0,15.0]| 
|12462566225| 2| [1.0,1.0]| 
|12462566225| 1| [9.0,85.0]| 
|12462566526| 2| [1.0,1.0]| 
|12462566526| 1| [3.0,79.0]| 
|12462567006| 2| [11.0,15.0]| 
|12462567006| 1| [10.0,15.0]| 
|12462567006| 3| [10.0,15.0]| 
|12462586595| 2| [2.0,42.0]| 
|12462586595| 3| [2.0,16.0]| 
|12462589343| 3| [1.0,1.0]| 
+-----------+-----+------------+ 

Sind die Spalten-Typen sind: Benutzer-ID: Lang, Gruppe: Int und Features: Vektor.

Dies ist bereits ein gruppierter Datenrahmen, d. H. Eine Benutzer-ID wird maximal einmal in einer bestimmten Gruppe angezeigt.

Mein Ziel ist es, die Spalte pro Gruppe zu skalieren.

Gibt es eine Möglichkeit ein feature transformer anzuwenden (in meinem Fall möchte ich eine StandardScaler anwenden) pro Gruppe statt es auf die volle Datenrahmen angewendet wird.

P.S. die Verwendung von ML ist nicht obligatorisch, also kein Problem, wenn die Lösung auf MLlib basiert.

+0

Wie planen Sie den StandardScaler? Auf jeder Gruppe? – eliasah

+0

Ich möchte jede Dimension des Features Vektor pro Gruppe skalieren. – Rami

+1

AFAIK es nicht, aber Sie können immer alle Operationen direkt anwenden. Scaler arbeitet sowieso mit RDDs, also ist es nur eine Frage der Datenstatistik und der Transformation pro Gruppe. – zero323

Antwort

5

Sie können Statistiken Gruppe berechnen fast den gleichen Code wie Standard mit Scaler:

import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer 
import org.apache.spark.mllib.linalg.{Vector, Vectors} 
import org.apache.spark.sql.Row 

// Compute Multivariate Statistics 
val summary = data.select($"group", $"features") 
    .rdd 
    .map { 
     case Row(group: Int, features: Vector) => (group, features) 
    } 
    .aggregateByKey(new MultivariateOnlineSummarizer)(/* Create an empty new MultivariateOnlineSummarizer */ 
     (agg, v) => agg.add(v), /* seqOp : Add a new sample Vector to this summarizer, and update the statistical summary. */ 
     (agg1, agg2) => agg1.merge(agg2)) /* combOp : As MultivariateOnlineSummarizer accepts a merge action with another MultivariateOnlineSummarizer, and update the statistical summary. */ 
    .mapValues { 
     s => (
     s.variance.toArray.map(math.sqrt(_)), /* compute the square root variance for each key */ 
     s.mean.toArray /* fetch the mean for each key */ 
    ) 
    }.collectAsMap 

Wenn erwartete Anzahl von Gruppen ist relativ gering Sie diese übertragen können:

val summaryBd = sc.broadcast(summary) 

und Ihre Daten umwandeln :

val scaledRows = df.map{ case Row(userID, group: Int, features: Vector) => 
    val (stdev, mean) = summaryBd.value(group) 
    val vs = features.toArray.clone() 
    for (i <- 0 until vs.size) { 
    vs(i) = if(stdev(i) == 0.0) 0.0 else (vs(i) - mean(i)) * (1/stdev(i)) 
    } 
    Row(userID, group, Vectors.dense(vs)) 
} 
val scaledDf = sqlContext.createDataFrame(scaledRows, df.schema) 

Ansonsten können Sie einfach beitreten. Es sollte nicht schwierig sein, dies als ML-Transformator mit Gruppenspalte als Parameter zu umhüllen.

+1

Dies ist eine ausgezeichnete Antwort! – eliasah

Verwandte Themen