2016-12-28 2 views
1

Ich habe eine Produktinformationsdatei mit mehr als Millionen Datensätze. Die CSV-Datei sieht folgendermaßen aus: So ersetzen Sie den Spalteninhalt mithilfe von Funken

 
    Product CategoryName SalesUnit Other Columns... 
     p1   a12    41
p2 x5 72
p3 x5 69
p4 c21 80
p5 b16 59
p6 x5 75 .. .. ..
Und ich habe eine Zuordnungsdatei (CategoryCode < -> CategoryName) wie folgt. Die Zuordnungsdatei hat rund 200 Datensätze:
 
    CategoryCode CategoryName
1.0 a12
2.0 b13 3.0 b16 4.0 c12
5.0 c21
6.0 x5
.. ..
Schließlich möchte ich Wert von Kategorie mit CategoryCode ersetzen:
 
    Product Category SalesUnit Other Colulmns.. 
    p1   1.0   41
p2 6.0 72
p3 6.0 69
p4 5.0 80
p5 3.0 59
p6 6.0 75 .. .. ..
Mein Ansatz ist es, die UDF von Funkendatenrahmen zu verwenden:
 
    udf { (CategoryName: String) => 
     if (CategoryName.trim() == "a12") 1.0 
     else if (CategoryName.trim() == "b13") 2.0 
     else if (CategoryName.trim() == "b16") 3.0 
     else if (CategoryName.trim() == "c12") 4.0 
     else if (CategoryName.trim() == "c21") 5.0 
     else if (CategoryName.trim() == "x5") 6.0 
     else if (CategoryName.trim() == "z12") 7.0 
     else if (...) ... 
     ... ... 
     else 999.0 
    } 
Jede andere elegante Ansatz den Austausch zu erreichen, ohne durch Codierung so viele wenn ... else-Klausel? Vielen Dank.

Antwort

3

Verbinden Sie die Zuordnungsdatei mit dem csv auf getrimmten Kategorie dann nur die Felder, die Sie

+0

Danke, Ihr Ansatz ist besser als meins. :) –

2

benötigen auswählen können Sie sowohl den auf dem Kategorienamen Datenrahmen verbinden und dann das Category selbst fallen, wie Sie es nicht danach benötigt werden.

Sie können etwas tun:

scala> //Can have more columns , have taken just these columns just to demonstrate 

scala> val df1=sc.parallelize(Seq(("p1","a12",41),("p2","x5",72),("p3","x5",69))).toDF("Product","CategoryName","SalesUnit") 
df1: org.apache.spark.sql.DataFrame = [Product: string, CategoryName: string ... 1 more field] 

scala> //Category code dataFrame 

scala> val df2=sc.parallelize(Seq((1.0,"a12"),(4.0,"c12"),(5.0,"c21"),(6.0,"x5"))).toDF("CategoryCode","CategoryName") 
df2: org.apache.spark.sql.DataFrame = [CategoryCode: double, CategoryName: string] 

scala> val resultDF=df1.join(df2,"CategoryName").withColumnRenamed("CategoryCode","Category").drop("CategoryName") 
resultDF: org.apache.spark.sql.DataFrame = [Product: string, SalesUnit: int ... 1 more field] 

scala> resultDF.show() 
+-------+---------+--------+              
|Product|SalesUnit|Category| 
+-------+---------+--------+ 
|  p1|  41|  1.0| 
|  p2|  72|  6.0| 
|  p3|  69|  6.0| 
+-------+---------+--------+ 

P. S: Dies ist nur eine kleine Demonstration ist.

+0

Vielen Dank für Ihre Antwort. Ihre Demonstration ist sehr hilfreich für mich. Ich akzeptiere die Antwort von Arnon Rotem-Gal-Oz, da er schnell reagiert hat. –

+0

@ JeromeLi: Und ich habe es aufgewertet! –

Verwandte Themen