1

Ich möchte von Spark-Streaming zu paar elastischen Suchindizes speichern. Ich erstelle Paare von <key(index), value>, wenn ich groupByKey das Ergebnis ist Tuple von <key(index), Iterable<value>> ausführen, aber um Elasticsearch mit elasticsearch-spark Plugin zu speichern brauche ich die Werte als JavaRDD<value>.Convert iterable to RDD

Ich weiß, dass es eine Option von sparkContext.parallelize (Liste) gibt, die JavaRDD aus Liste erstellt, aber das kann nur auf dem Treiber ausgeführt werden.

Gibt es eine andere Option zum Erstellen von JavaRDD, die auf dem Executor ausgeführt werden kann? Oder eine andere Möglichkeit, die ich erreichen kann Tuple2<key(index), JavaRDD<value>>, die auf Executor funktioniert? Wenn nicht wie kann ich nur den Schalter von Iterator zu JavaRDD auf Treiber und das Plugin schreiben, um Elasticsearch am Executor zu schreiben?

Danke,

Daniela

+0

Ehm wäre , AFAIK, 'groupByKey' ergibt ein' JavaPairRDD > 'was immer noch ein' rdd' ist . Jede weitere Verarbeitung des "rdd" wird somit auf den Executoren und nicht auf dem Treiber ausgeführt. –

Antwort

1

Ich würde sagen, dass es möglich sein muss, smth wie folgt

JavaPairRDD<Key, Iterable<Value>> pair = ...; 
JavaRDD<Iterable<Value>> values = pair.map(t2 -> t2._2()); 
JavaRDD<Value> onlyValues = values.flatMap(it -> it); 

Alternative Ansatz müssen

JavaPairRDD<Key, Iterable<Value>> pair = ...; 
JavaRDD<Key, Value> keyValues = pair.flatMapValues(v1 -> v1); 
JavaRDD<Value> values = keyValues.map(t2 -> t2._2()); 
+0

dank evgenii, da ich brauche von JavaPairRDD > zu JavaRDD zu erhalten, die innerhalb foreachRDD das Ergebnis JavaRDD Werte = rdd.flatMap ((FlatMapFunction >, String>) tuple2 - > { final Liste l = Lists.newArrayList(); tuple2._2(). ForEach (l :: add); return l; }); sind mit demselben Schlüssel verwandt? – Daniela

+0

Ich habe deine Frage wahrscheinlich missverstanden. Ich werde meine Antwort bearbeiten, hoffe, es wird diesmal besser. – evgenii