2016-03-19 3 views
0

Ich habe eine CSV-Datei, die in diesem Format von Daten besteht:Sortieren CSV-Datei mit Schlüsseln in Apache Spark-

id, name, surname, morecolumns 
5, John, Lok,  more 
2, John2, Lok2,  more 
1, John3, Lok3,  more 
etc.. 

Ich mag meine CSV-Datei sortieren die ID als Schlüssel und speichern Sie die sortierten Ergebnisse in einem anderen Datei.

Was ich bisher gemacht habe, um JavaPairs von (id, rest_of_line) zu erstellen.

SparkConf conf = new SparkConf().setAppName.....; 

    JavaSparkContext sc = new JavaSparkContext(conf); 

    JavaRDD<String> file = sc.textFile("inputfile.csv"); 

    // extract the header 
    JavaRDD<String> lines = file.filter(s -> !s.equals(header)); 

    // create JavaPairs 
    JavaPairRDD<Integer, String> pairRdd = lines.mapToPair(
     new PairFunction<String, Integer, String>() { 
     public Tuple2<Integer, String> call(final String line) { 


      String str = line.split(",", 2)[0]; 
      String str2 = line.split(",", 2)[1]; 
      int id = Integer.parseInt(str); 

      return new Tuple2(id, str2); 
     } 
    }); 

    // sort and save the output 
    pairRdd.sortByKey(true, 1); 
    pairRdd.coalesce(1).saveAsTextFile("sorted.csv"); 

Dies funktioniert in Fällen, in denen ich kleine Dateien habe. Wenn ich jedoch größere Dateien verwende, wird die Ausgabe nicht richtig sortiert. Ich denke, dies passiert, weil die Sortierprozedur auf verschiedenen Knoten stattfindet, so dass die Zusammenführung aller Prozeduren von allen Knoten nicht die erwartete Ausgabe ergibt.

So ist die Frage, wie kann ich meine CSV-Datei mit der ID als Schlüssel sortieren und die sortierten Ergebnisse in einer anderen Datei speichern.

+1

Nicht 'koaleszieren (1)'? Es ist ein guter Kandidat für den nutzlosesten Methodenaufruf in Spark. – zero323

+0

Nun, wenn ich es entferne, teilt es zuerst die Ergebnisse in Teile auf, während ich sie in einer Datei haben will, und zweitens sind sie immer noch nicht sortiert. – Spithas

+1

Der Bereich "sortByKey" partitioniert die Daten (jede Ausgabedatei hat einen bestimmten Bereich) von Werten und sortiert die Werte für jede Partition (jede Datei ist sortiert). Die Verschmelzung zu einer Datei ist in Gattungen nutzlos, weil: sie belastet eine einzelne Maschine, wobei verteilte Dateisystemdaten mehrfach übertragen werden - zu Knoten zu kohle und von Knoten zu verteilt. Wenn die Daten klein genug sind, um es durchführbar zu machen, und Sie wirklich eine einzige Datei benötigen, ist das Sammeln oder Zusammenführen eine bessere Wahl. – zero323

Antwort

0

Die Methode coalesce ist wahrscheinlich die eine Schuld, da es anscheinend nicht vertraglich die Bestellung oder die resultierende RDD garantiert (siehe Which operations preserve RDD order?). Also, wenn Sie solche coalesce vermeiden, werden die resultierenden Ausgabedateien geordnet. Da Sie eine eindeutige csv Datei möchten, könnten Sie die Ergebnisse von jedem Dateisystem erhalten, das Sie verwenden, aber auf ihre tatsächliche Reihenfolge achten, und sie zusammenführen. Wenn Sie beispielsweise HDFS (wie von @PinoSan angegeben) verwenden, können Sie dies mit dem Befehl hdfs dfs -getmerge <hdfs-output-dir> <local-file.csv> tun.

0

Wie von @mauriciojost gezeigt, sollten Sie nicht coalesce tun. Stattdessen ist dies besser pairRdd.sortByKey(true,pairRdd.getNumPartitions()).saveAsTextFile(path), so dass die maximal mögliche Arbeit auf Partitionen, die Daten enthalten, ausgeführt wird.