2016-08-16 5 views
-1

Ich habe eine Eingabe rdd (JavaRDD<List<String>>) und ich möchte es in JavaRDD<String> als Ausgabe konvertieren.
Jedes Element der Eingabe-RDD-Liste sollte ein einzelnes Element in der Ausgabe rdd werden.RDD-Liste in RDD von einzelnen Element in Funken konvertieren

wie es in Java zu erreichen?

JavaRDD<List<String>> input; //suppose rdd length is 2 
input.saveAsTextFile(...) 

Ausgang:

[a, b]
[c, d]

, was ich will:

a
b
c
d

+0

Nein, Flatmap würde nicht funktionieren, weil es wieder Liste der Elemente haben wird und wenn ich es speichern wird nicht jedes Element in einer separaten Zeile gespeichert. Meine Eingabe RDD wird gleichmäßig von Flatmap ausgegeben. –

+0

Du solltest nicht so unhöflich sein, ich habe versucht zu helfen. Übrigens, ich habe nicht abgestimmt, ob du das gedacht hast. Aber das ist es, was Sie wollen, aber in der schönen "Scala". [Ein veröffentlichtes Notizbuch] (https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/2485090270202665/382256499319578/8589256059752547/latest.html) –

+0

Ich bin neu zu funken und entschuldige mich, wenn mein Kommentar unhöflich klingt . –

Antwort

0

ich ein workarou tat nd unter Verwendung von Code-Schnipsel:

Concat jedes Element der Liste mit Trennzeichen '\n' dann speichern RDD mit Standard-Funke-API.

inputRdd.map(new Function<List<String>, String>() { 
      @Override 
      public String call(List<String> scores) throws Exception { 
       int size = scores.size(); 
       StringBuffer sb = new StringBuffer(); 
       for (int i=0; i <size;i++){ 
        sb.append(scores.get(i)); 
        if(i!=size-1){ 
         sb.append("\n"); 
        } 
       } 
       return sb.toString(); 
      } 
     }).saveAsTextFile("/tmp/data")); 
0

Konvertieren Sie es in einen DataFrame und verwenden Sie die Funktion "Explode UDF".

+0

bitte Codebeispiel hinzufügen. Danke, –

-1

unten Ihr Problem

var conf = new SparkConf().setAppName("test") 
      .setMaster("local[1]") 
      .setExecutorEnv("executor-cores", "2") 
var sc = new SparkContext(conf) 
val a = sc.parallelize(Array(List("a", "b"), List("c", "d"))) 
a.flatMap(x => x).foreach(println) 

Ausgang lösen:

ein
b
c
d

+0

meine Dateigröße ist sehr groß, und ich möchte RDD in Datei speichern. –

+0

von flatpMap-Funktion kann ich jeden Datensatz in Datei schreiben? –

0

Wenn der rdd Typ RDD[List[String]] ist, können Sie dies nur tun:

val newrdd = rdd.flatmap(line => line)

Jedes der Elemente wird eine neue Zeile in der neuen rdd sein.

Verwandte Themen