2016-05-04 1 views
1

Ich habe einen Datensatz mit 10 Feld und 5000 Zeilen. Ich möchte diesen Datensatz mit einigen statistischen Methoden in Spark with Scala vervollständigen. Ich habe die leeren Zellen in einem Feld mit dem Mittelwert dieses Feldes gefüllt, wenn es aus kontinuierlichen Werten besteht, und ich gebe den häufigsten Wert in das Feld ein, wenn es aus diskreten Werten besteht. Hier ist mein Code:Wie kann ich die Leistung verbessern, wenn ich eine Tabelle mit statistischen Methoden in Apache-Spark ausfülle?

for(col <- cols){ 

    val datacount = table.select(col).rdd.map(r => r(0)).filter(_ == null).count()  

    if(datacount > 0) 
    {  
    if (continuous_lst contains col)    // put mean of data to null values 
    {    
     var avg = table.select(mean(col)).first()(0).asInstanceOf[Double]  
     df = df.na.fill(avg, Seq(col))    
    } 

    else if(discrete_lst contains col)   // put most frequent categorical value to null values 
    { 
     val group_df = df.groupBy(col).count() 
     val sorted = group_df.orderBy(desc("count")).take(1) 

     val most_frequent = sorted.map(t => t(0)) 
     val most_frequent_ = most_frequent(0).toString.toDouble.toInt 

     val type__ = ctype.filter(t => t._1 == col) 
     val type_ = type__.map(t => t._2) 

     df = df.na.fill(most_frequent_, Seq(col)) 
     } 

    } 
    } 

Das Problem ist, dass dieser Code sehr langsam mit diesen Daten arbeitet. Ich verwende spark-submit mit executor memory 8G Parameter. Und ich benutze parpartition (4) Parameter vor dem Senden der Daten an diese Funktion.

Ich sollte größere Datenmengen arbeiten. Wie kann ich diesen Code beschleunigen?

Danke für Ihre Hilfe. Hier

Antwort

2

ist ein Vorschlag:

import org.apache.spark.sql.funcitons._ 

def most_frequent(df: DataFrame, col: Column) = { 
    df.select(col).map { case Row(colVal) => (colVal, 1) } 
    .reduceByKey(_ + _) 
    .reduce({case ((val1, cnt1), (val2, cnt2)) => if (cnt1 > cnt2) (val1, cnt1) else (val2, cnt2)})._1 
} 

val new_continuous_cols = continuous_lst.map { 
    col => coalesce(col, mean(col)).as(col.toString) 
}.toArray 

val new_discrete_cols = discrete_lst.map { 
    col => coalesce(col, lit(most_frequent(table, col)).as(col.toString)) 
}.toArray 

val all_new_cols = new_continuous_cols ++ new_discrete_cols 
val newDF = table.select(all_new_cols: _*) 

Überlegungen:

  • ich davon aus, dass continuous_lst und discrete_lst sind Listen von Column. Wenn sie Listen von String sind, ist die Idee die gleiche, aber einige Anpassungen sind notwendig;
  • Beachten Sie, dass ich map und reduce verwendet habe, um den häufigsten Wert einer Spalte zu berechnen. Das kann in einigen Fällen besser als Gruppieren und Aggregieren sein. (Vielleicht gibt es hier Raum für Verbesserungen, indem die häufigsten Werte für alle diskreten Spalten gleichzeitig berechnet werden);
  • Zusätzlich habe ich coalesce anstelle von fill NULL-Werte zu ersetzen. Dies kann auch zu einer gewissen Verbesserung führen. (Weitere Informationen über die coalesce Funktion in der scaladoc API);
  • Ich kann im Moment nicht testen, also könnte etwas fehlen, das ich nicht gesehen habe.
Verwandte Themen