Ich habe eine Spark-Streaming-Anwendung, die mehrere JSON-Nachrichten pro Sekunde empfängt, von denen jede eine ID hat, die ihre Quelle identifiziert.Wie gruppieren Schlüssel/Werte nach Partition in Spark?
Mit dieser ID als Schlüssel kann ich MapPartitionsToPair
ausführen und so einen JavaPairDStream erstellen, mit einem RDD der Schlüssel/Wert-Paare, ein Schlüsselwertpaar pro Partition (wenn ich also beispielsweise 5 JSON-Nachrichten erhielt, Ich bekomme eine RDD mit 5 Partitionen, jede mit der ID der Nachricht als Schlüssel und der JSON-Nachricht selbst als Wert).
Was ich jetzt tun möchte, ist, möchte ich alle Werte, die den gleichen Schlüssel haben, in der gleichen Partition gruppieren. Wenn ich zum Beispiel 3 Partitionen mit Schlüssel 'a' und 2 Partitionen mit Schlüssel 'b' habe, möchte ich eine neue RDD mit 2 Partitionen anstelle von 5 erstellen, wobei jede Partition alle Werte enthält, die ein Schlüssel hat, einen für "a" und eins für "b".
Wie kann ich das erreichen? Dies ist mein Code so weit:
JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream(args[0], Integer.parseInt(args[1]),
StorageLevels.MEMORY_AND_DISK_SER);
JavaPairDStream<String,String> streamGiveKey= streamData2.mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, String>() {
@Override
public Iterable<Tuple2<String, String>> call(Iterator<String> stringIterator) throws Exception {
ArrayList<Tuple2<String,String>>a= new ArrayList<Tuple2<String, String>>();
while (stringIterator.hasNext()){
String c=stringIterator.next();
if(c==null){
return null;
}
JsonMessage retMap = new Gson().fromJson(c,JsonMessage.class);
String key= retMap.getSid();
Tuple2<String,String> b= new Tuple2<String,String>(key,c);
a.add(b);
System.out.print(b._1+"_"+b._2);
// }
//break;
}
return a;
}
});
// ich eine JavaPairDStream, in dem jeder Partition ein Schlüssel/Wert-Paar enthält.
Ich versuchte grouByKey()
, aber egal zu verwenden, was die Anzahl der Nachrichten waren, bekam ich immer eine Partitionsnummer 2.
Wie soll ich das tun? Vielen Dank.
Warum würden Sie pro Partition 1 Element gefallen hat? Welches Problem versuchen Sie zu lösen? – maasg