2016-03-21 11 views
1

Ich habe eine RDD und möchte mehr RDD hinzufügen. Wie kann ich das in Spark machen? Ich habe Code wie folgt. Ich möchte RDD aus dem dStream, den ich habe, zurückgeben.Wie RD RD zu mehr RDD in Spark hinzufügen?

JavaDStream<Object> newDStream = dStream.map(this); 
JavaRDD<Object> rdd = context.sparkContext().emptyRDD(); 
return newDStream.wrapRDD(context.sparkContext().emptyRDD()); 

ich von Apache Spark zur Verfügung gestellt nicht viel Dokumentation über wrapRDD Methode der JavaDStream Klasse finden.

Antwort

1

Sie JavaStreamingContext.queueStream verwenden können, und füllen Sie es mit einem Queue<RDD<YourType>>:

public JavaInputDStream<Object> FillDStream() { 
    LinkedList<RDD<Object>> rdds = new LinkedList<RDD<Object>>(); 
    rdds.add(context.sparkContext.emptyRDD()); 
    rdds.add(context.sparkContext.emptyRDD()); 

    JavaInputDStream<Object> filledDStream = context.queueStream(rdds); 
    return filledStream; 
} 
+0

Kann ich eine JavaRDD-Liste in eine einzelne JavaRDD konvertieren? –

+0

Ja. Sie können 'JavaRDD.union' verwenden. –

+0

Union wird mir Dstream geben, aber ich möchte JavaRDD als das ist der Rückgabetyp meiner Methode. –

1

Da RDD ist unveränderlich, was Sie tun können, ist sparkContext.parallize verwenden, um eine neue RDD und geben die neue zu erstellen.

List<Object> objectList = new ArrayList<Object>; 
objectList.add("your content"); 

JavaRDD<Object> objectRDD = sparkContext.parallize(objectList); 
JavaRDD<Object> newRDD = oldRDD.union(objectRDD); 
Verwandte Themen