2016-05-31 12 views
3

Karte Ich streame für kafka Thema wie unten;Wie kafka Topic Namen und entsprechende Datensätze in Spark-Streaming-

JavaPairInputDStream<String, String> directKafkaStream = 
    KafkaUtils.createDirectStream(jssc, 
            String.class, 
            String.class, 
            StringDecoder.class, 
            StringDecoder.class, 
            kafkaParams, 
            topicSet); 

directKafkaStream.print(); 

Der Ausgang gibt wie unten für ein Thema:

(null,"04/15/2015","18:44:14") 
(null,"04/15/2015","18:44:15") 
(null,"04/15/2015","18:44:16") 
(null,"04/15/2015","18:44:17") 

Wie kann ich Thema Namen und Datensätze abzubilden.
ab: Thema ist „Calldata“, sollte es so etwas wie unten und so weiter

(callData,"04/15/2015","18:44:14") 
(callData,"04/15/2015","18:44:15") 
(callData,"04/15/2015","18:44:16") 
(callData,"04/15/2015","18:44:17") 
+0

Was bedeutet * "Themenamen und Datensätze" *? –

+0

gerade aktualisiert die Frage .. – Alka

Antwort

3

Wie kann ich Themennamen und Aufzeichnungen Karte?

Um die Partitionsinformationen zu extrahieren, you'll need to use the overload which accepts a Function receiving MessageAndMetadata<K, V> und den Typ, den Sie transformieren möchten, zurückgeben.

Es sieht wie folgt aus:

Map<TopicAndPartition, Long> map = new HashMap<>(); 
map.put(new TopicAndPartition("topicname", 0), 1L); 

JavaInputDStream<Map.Entry> stream = KafkaUtils.createDirectStream(
     javaContext, 
     String.class, 
     String.class, 
     StringDecoder.class, 
     StringDecoder.class, 
     Map.Entry.class, // <--- This is the record return type from the transformation. 
     kafkaParams, 
     map, 
     messageAndMetadata -> 
      new AbstractMap.SimpleEntry<>(messageAndMetadata.topic(), 
              messageAndMetadata.message())); 

Hinweis habe ich Map.Entry als Java Ersatz für eine Tuple2 in Scala. Sie können Ihre eigene Klasse mit einer Partition und Message Eigenschaft auch bereitstellen und die für die Transformation verwenden. Beachten Sie, dass der Typ des kafka-Eingabestreams nun JavaInputDStream<Map.Entry> ist, da dies die Transformation zurückgibt.

+0

16/05/31 10:44:20 FEHLER kafka.DirectKafkaInputDStream: ArrayBuffer (org.apache.spark.SparkException: Konnte nicht finden, Leader-Offsets für Set()) – Alka

+0

was sollte der Wert sein der Map hier.Es sagt Map .Was soll ich hier setzen – Alka

+0

@Alka Sollte der Name Ihrer Warteschlange, Partition und Offset sein. Ich habe es mit einem Beispiel bearbeitet. –

Verwandte Themen