2016-06-19 7 views
3

Ich habe eine Spark-Streaming-Anwendung, die mehrere JSON-Nachrichten pro Sekunde empfängt, von denen jede eine ID hat, die ihre Quelle identifiziert.Wie gruppieren Schlüssel/Werte nach Partition in Spark?

Mit dieser ID als Schlüssel kann ich MapPartitionsToPair ausführen und so einen JavaPairDStream erstellen, mit einem RDD der Schlüssel/Wert-Paare, ein Schlüsselwertpaar pro Partition (wenn ich also beispielsweise 5 JSON-Nachrichten erhielt, Ich bekomme eine RDD mit 5 Partitionen, jede mit der ID der Nachricht als Schlüssel und der JSON-Nachricht selbst als Wert).

Was ich jetzt tun möchte, ist, möchte ich alle Werte, die den gleichen Schlüssel haben, in der gleichen Partition gruppieren. Wenn ich zum Beispiel 3 Partitionen mit Schlüssel 'a' und 2 Partitionen mit Schlüssel 'b' habe, möchte ich eine neue RDD mit 2 Partitionen anstelle von 5 erstellen, wobei jede Partition alle Werte enthält, die ein Schlüssel hat, einen für "a" und eins für "b".

Wie kann ich das erreichen? Dies ist mein Code so weit:

JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream(args[0], Integer.parseInt(args[1]), 
      StorageLevels.MEMORY_AND_DISK_SER); 

JavaPairDStream<String,String> streamGiveKey= streamData2.mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, String>() { 
     @Override 
     public Iterable<Tuple2<String, String>> call(Iterator<String> stringIterator) throws Exception { 

      ArrayList<Tuple2<String,String>>a= new ArrayList<Tuple2<String, String>>(); 

      while (stringIterator.hasNext()){ 
       String c=stringIterator.next(); 
       if(c==null){ 
        return null; 

       } 

       JsonMessage retMap = new Gson().fromJson(c,JsonMessage.class); 
       String key= retMap.getSid(); 
       Tuple2<String,String> b= new Tuple2<String,String>(key,c); 
       a.add(b); 

       System.out.print(b._1+"_"+b._2); 
       // } 
       //break; 
      } 


      return a; 
     } 
    }); 

// ich eine JavaPairDStream, in dem jeder Partition ein Schlüssel/Wert-Paar enthält.

Ich versuchte grouByKey(), aber egal zu verwenden, was die Anzahl der Nachrichten waren, bekam ich immer eine Partitionsnummer 2.

Wie soll ich das tun? Vielen Dank.

+0

Warum würden Sie pro Partition 1 Element gefallen hat? Welches Problem versuchen Sie zu lösen? – maasg

Antwort

4

Sie können das

groupByKey(Integer numPartitions) 

und stellen Sie die numPartitions gleich der Anzahl der verschiedenen Tasten, die Sie haben.

Aber .. Sie müssen wissen, , wie viele verschiedene Schlüssel haben Sie vorne. Hast du diese Informationen? Wahrscheinlich nicht. Also dann ... müsstest du etwas extra (/ redundante) Arbeit machen. Z.B. verwenden

countByKey 

als der erste Schritt. Das ist schneller als groupByKey - also zumindest warst du nicht die Gesamtverarbeitungszeit.

Update Das OP fragte, warum sie standardmäßig 2 Partitionen bekommen.

Der Standard verwendet eine groupByKeydefaultPartitioner() Methode

groupByKey(defaultPartitioner(self)) 
  • , die mit dem größten Kardinalität der Partitioner von der übergeordneten Partition auswählt.

- oder es wird verwenden spark.default.parallelism

+0

Danke, das löst definitiv mein Problem. Aber nur eine Frage: Weißt du, warum 'groupByKey()' standardmäßig 2 Partitionen zurückgibt? Egal, wie viele Eingaben ich pro Batch-Intervall oder Ausgaben, die ich habe, senden, scheint es, dass groupByKey unabhängig von all dem ist. Es gibt nur 2 zurück, wenn ich 'getNumPartitions' mache –