2017-02-08 4 views
0

Um mit Spaltennamen meiner DataFrame arbeiten zu können, ohne die . zu entkommen, brauche ich eine Funktion zu "validieren" alle Spaltennamen - aber keine der Methoden, die ich versuchte, macht den Job in a rechtzeitig (Ich bin nach 5 Minuten abbrechen).Scala Spark: Performance-Problem Umbenennen große Anzahl von Spalten

Der Dataset, auf dem ich meine Algorithmen versuche, ist das golub-Dataset (get it here). Es ist eine 2,2 MB CSV-Datei mit 7200 Spalten. alle Spalten umbenennen sollte eine Sache von Sekunden

-Code sein, um die CSV in

var dfGolub = spark.read 
    .option("header", "true") 
    .option("inferSchema", "true") 
    .csv("golub_merged.csv") 
    .drop("_c0") // drop the first column 
    .repartition(numOfCores) 

Versuche zu lesen Spalten umbenennen:

def validifyColumnnames1(df : DataFrame) : DataFrame = { 
    import org.apache.spark.sql.functions.col 
    val cols = df.columns 
    val colsRenamed = cols.map(name => col(name).as(name.replaceAll("\\.",""))) 
    df.select(colsRenamed : _*) 
} 


def validifyColumnnames2[T](df : Dataset[T]) : DataFrame = { 
    val newColumnNames = ArrayBuffer[String]() 
    for(oldCol <- df.columns) { 
     newColumnNames += oldCol.replaceAll("\\.","") 
    } 
    df.toDF(newColumnNames : _*) 
} 

def validifyColumnnames3(df : DataFrame) : DataFrame = { 
    var newDf = df 
    for(col <- df.columns){ 
     newDf = newDf.withColumnRenamed(col,col.replaceAll("\\.","")) 
    } 
    newDf 
} 

Irgendwelche Ideen, was dieses Leistungsproblem verursacht?

Setup: Ich bin mit Spark-2.1.0 auf Ubuntu 16.04 in local[24] Modus auf einer Maschine mit 16cores * 2 Threads und 96 GB RAM

+6

lesen Sie Daten ohne Spaltennamen als RDD, dann lesen Sie nur Spaltennamen als Schema. Kombiniere Schema und RDD, um deinen DF zu erhalten. – toofrellik

Antwort

2

Vorausgesetzt, dass Sie die Typen kennen, können Sie einfach das Schema erstellen statt infering es (schließt das Schema Kosten Leistung und könnte sogar falsch für CSV sein).

Lässt der Einfachheit halber annehmen, dass Sie die Datei Beispiel.csv haben wie folgt:

A.B, A.C, A.D 
a,3,1 

Sie etwas tun können:

val scehma = StructType(Seq(StructField("A_B",StringType),StructField("A_C", IntegerType), StructField("AD", IntegerType))) 
val df = spark.read.option("header","true").schema(scehma).csv("example.csv") 
df.show() 

+---+---+---+ 
|A_B|A_C| AD| 
+---+---+---+ 
| a| 3| 1| 
+---+---+---+ 

Wenn Sie die Informationen im Voraus nicht wissen, können Sie Verwenden Sie das Schema inferent wie zuvor. Dann können Sie das Schema mithilfe des Datenrahmens generieren:

und der Datagramm erneut lesen mit diesem Schema wie zuvor

Verwandte Themen