2016-04-19 7 views
2

Ich versuche, Programmierung mit Dataframes zu lernen. Mit dem folgenden Code versuche ich zwei CSV in einer Spalte zu verbinden und dann als eine kombinierte CSV zu speichern. Wenn ich diesen Code in SCALA IDE ausführe, sehe ich fast 200 kleine Teildateien. Könnten Sie mir bitte helfen zu verstehen, was falsch läuft hier-Datenframe Speichern nach Join erstellt zahlreiche Teildateien

import org.apache.spark.SparkContext 

object JoinData { 
    def main(args: Array[String]) { 
    val sc = new SparkContext(args(0), "Csv Joining example")  
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    val df1 = sqlContext.load("com.databricks.spark.csv", Map("path" -> args(1), "header" -> "true")) 
    val df2 = sqlContext.load("com.databricks.spark.csv", Map("path" -> args(2), "header" -> "true")) 
    import org.apache.spark.sql.functions._ 
    val df_join = df1.join(df2, df1("Dept") === df2("dept"), "inner") 
    df_join.repartition(1) //This is also not helping 
    //Below line is generating 200 part files in Output_join folder 
    df_join.save("Output_join","com.databricks.spark.csv", org.apache.spark.sql.SaveMode.Overwrite) 

    } 
} 

Programm Argumente - lokalen src/main/resources/emp.csv src/main/resources/dept.csv

CSV-Daten verwendet werden

empId,empName,Dept,salution 
111,ABC,sales,mr 
112,ABC,it,mr 
113,ABC,tech,mr 
114,ABC,sales,mr 
115,ABC,sales,mr 
116,ABC,it,mr 
117,ABC,tech,mr 

dept,name 
sales,Sales of Cap 
it,Internal Training 
tech,Tech staff 
support,support services 

Konsolenausgabe

[Stage 4:>              (2 + 1)/200] 
[Stage 4:=>              (4 + 1)/200] 
[Stage 4:=>              (6 + 1)/200] 
[Stage 4:==>              (8 + 1)/200] 
[Stage 4:===>             (11 + 1)/200] 
[Stage 4:===>             (14 + 1)/200] 
[Stage 4:====>             (17 + 1)/200] 
[Stage 4:=====>             (19 + 1)/200] 
[Stage 4:=====>             (21 + 1)/200] 
[Stage 4:======>             (24 + 1)/200] 
[Stage 4:=======>            (26 + 1)/200] 
[Stage 4:=======>            (28 + 1)/200] 
[Stage 4:========>            (30 + 1)/200] 
[Stage 4:========>            (32 + 1)/200] 
[Stage 4:=========>            (34 + 1)/200] 
[Stage 4:==========>            (37 + 1)/200] 
[Stage 4:===========>           (40 + 1)/200] 
[Stage 4:============>           (43 + 1)/200] 
[Stage 4:============>           (46 + 1)/200] 
[Stage 4:=============>           (49 + 1)/200] 
[Stage 4:==============>           (52 + 1)/200] 
[Stage 4:===============>          (55 + 1)/200] 
[Stage 4:================>          (58 + 1)/200] 
[Stage 4:=================>          (61 + 1)/200] 
[Stage 4:=================>          (64 + 1)/200] 
[Stage 4:==================>          (67 + 1)/200] 
[Stage 4:===================>         (69 + 1)/200] 
[Stage 4:====================>         (72 + 1)/200] 
[Stage 4:=====================>         (75 + 1)/200] 
[Stage 4:=====================>         (78 + 1)/200] 
[Stage 4:======================>         (81 + 1)/200] 
[Stage 4:=======================>        (84 + 1)/200] 
[Stage 4:========================>        (87 + 1)/200] 
[Stage 4:=========================>        (90 + 1)/200] 
[Stage 4:=========================>        (92 + 1)/200] 
[Stage 4:==========================>        (95 + 1)/200] 
[Stage 4:===========================>       (98 + 1)/200] 
[Stage 4:===========================>       (101 + 1)/200] 
[Stage 4:============================>       (104 + 1)/200] 
[Stage 4:=============================>       (107 + 1)/200] 
[Stage 4:==============================>      (110 + 1)/200] 
[Stage 4:===============================>      (113 + 1)/200] 
[Stage 4:===============================>      (116 + 1)/200] 
[Stage 4:================================>      (119 + 1)/200] 
[Stage 4:=================================>      (122 + 1)/200] 
[Stage 4:=================================>      (123 + 1)/200] 
[Stage 4:==================================>     (126 + 1)/200] 
[Stage 4:===================================>     (129 + 1)/200] 
[Stage 4:====================================>     (132 + 1)/200] 
[Stage 4:=====================================>     (135 + 1)/200] 
[Stage 4:=====================================>     (138 + 1)/200] 
[Stage 4:======================================>    (140 + 1)/200] 
[Stage 4:======================================>    (141 + 1)/200] 
[Stage 4:=======================================>    (144 + 1)/200] 
[Stage 4:========================================>    (148 + 1)/200] 
[Stage 4:=========================================>    (151 + 1)/200] 
[Stage 4:==========================================>   (154 + 1)/200] 
[Stage 4:==========================================>   (156 + 2)/200] 
[Stage 4:===========================================>   (159 + 1)/200] 
[Stage 4:============================================>   (161 + 1)/200] 
[Stage 4:============================================>   (162 + 1)/200] 
[Stage 4:=============================================>   (164 + 1)/200] 
[Stage 4:=============================================>   (165 + 1)/200] 
[Stage 4:==============================================>  (168 + 1)/200] 
[Stage 4:===============================================>  (171 + 1)/200] 
[Stage 4:===============================================>  (174 + 1)/200] 
[Stage 4:================================================>  (177 + 1)/200] 
[Stage 4:=================================================>  (180 + 1)/200] 
[Stage 4:==================================================> (183 + 1)/200] 
[Stage 4:===================================================> (186 + 1)/200] 
[Stage 4:===================================================> (189 + 1)/200] 
[Stage 4:=====================================================> (193 + 1)/200] 
[Stage 4:=====================================================> (196 + 1)/200] 
[Stage 4:======================================================>(199 + 1)/200] 

Antwort

0

Der Grund für mehrere Dateien ist, weil die Berechnung verteilt ist. Um es einfach auszudrücken, die Anzahl der Ausgabedateien entspricht der Anzahl der Partitionen in Ihrem Dataframe/RDD. Um die Anzahl der Partitionen zu ändern, können Sie eine Neupartitionierung oder Koaleszenz auf einem Dataframe/RDD aufrufen. 'repartition' wird die ursprünglichen Partitionen mischen und dann neu partitionieren, während 'coalesce' wird nur Original-Partitionen mit der neuen Anzahl von Partitionen kombinieren.

val resultDF = df_join.coalesce(10) 

Konfigurieren Sie den Wert innerhalb coalesce nach Ihren Vorgaben. Ich benutzte coalesce anstelle von Aufteilung ist, weil shuffling (getan von Aufteilung) könnte sehr teuer sein. Wenn Sie nur die Anzahl der Partitionen reduzieren möchten, ist Koaleszieren der beste Weg.

+0

Vielen Dank für Ihre Antwort. df_join.coalesce (10) ist im Dataframe nicht erlaubt. Und die Aufteilung hilft nicht. Spark generiert automatisch 200 Partitionen –

+0

Welche Version von Spark verwenden Sie? – dheee

Verwandte Themen