2014-05-11 6 views
7

Ich schreibe Kafka Consumer für hochvolumige hohe Geschwindigkeit verteilten Anwendung. Ich habe nur ein Thema, aber eingehende Nachrichten bewerten ist sehr hoch. Mehrere Partitionen, die mehr Konsumenten bedienen, wären für diesen Anwendungsfall geeignet. Der beste Weg zu konsumieren ist, mehrere Stream-Reader zu haben. Laut der Dokumentation oder der verfügbaren Stichproben basiert die Anzahl der KafkaStreams, die der ConsumerConnector herausgibt, auf der Anzahl der Themen. Ich frage mich, wie man mehr als einen KafkaStream-Leser [basierend auf der Partition] bekommt, so dass ich einen Thread pro Stream überbrücken kann oder das Lesen von KafkaStream in mehreren Threads das gleichzeitige Lesen von mehreren Partitionen macht.Apache Kafka - KafkaStream auf Thema/partition

Alle Einblicke werden sehr geschätzt.

+0

SimpleConsumer Verwendung ist keine Option? –

Antwort

14

Möchten was ich von Mailing-Liste gefunden teilen:

Die Zahl, die Sie in den Themen Karte Kontrollen passieren, wie viele Ströme ein Thema in unterteilt. In Ihrem Fall, wenn Sie 1 übergeben, werden alle Daten der 10 Partitionen in 1 Stream eingespeist. Wenn Sie 2 übergeben, erhält jeder der 2 Streams Daten von 5 Partitionen. Wenn Sie 11 übergeben, erhalten 10 von ihnen jeweils Daten von 1 Partition und 1 Stream wird nichts bekommen.

Normalerweise müssen Sie jeden Strom in einem eigenen Thread iterieren. Dies liegt daran, dass jeder Stream für immer blockieren kann, wenn kein neues Ereignis vorliegt.

Beispiel Schnipsel:

topicCount.put(msgTopic, new Integer(partitionCount)); 
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = connector.createMessageStreams(topicCount); 
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(msgTopic); 

for (final KafkaStream stream : streams) { 
    ReadTask task = new ReadTask(stream, msgTopic); 
    task.addObserver(this.msgObserver); 
    tasks.add(task); executor.submit(task); 
} 

Referenz: http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201201.mbox/%[email protected].com%3E

+0

Beispiel Schnipsel topicCount.put (msgTopic, new Integer (partitionCount)); Map >> consumerStreams = connector.createMessageStreams (Topiccount); List > Ströme = consumerStreams.get (msgTopic); für (final KafkaStream stream: Streams) { ReadTask Aufgabe = new ReadTask (stream, msgTopic); task.addObserver (this.msgObserver); tasks.add (Aufgabe); executor.submit (Aufgabe); } –

3

Der empfohlene Weg, dies zu tun ist, einen Thread-Pool haben, so Java-Organisation für Sie behandeln kann und für jeden Strom der createMessageStreamsByFilter Methode gibt Sie verbrauchen es in einem lauffähigen. Zum Beispiel:

int NUMBER_OF_PARTITIONS = 6; 
Properties consumerConfig = new Properties(); 
consumerConfig.put("zk.connect", "zookeeper.mydomain.com:2181"); 
consumerConfig.put("backoff.increment.ms", "100"); 
consumerConfig.put("autooffset.reset", "largest"); 
consumerConfig.put("groupid", "java-consumer-example"); 
consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerConfig)); 

TopicFilter sourceTopicFilter = new Whitelist("mytopic|myothertopic"); 
List<KafkaStream<Message>> streams = consumer.createMessageStreamsByFilter(sourceTopicFilter, NUMBER_OF_PARTITIONS); 

ExecutorService executor = Executors.newFixedThreadPool(streams.size()); 
for(final KafkaStream<Message> stream: streams){ 
    executor.submit(new Runnable() { 
     public void run() { 
      for (MessageAndMetadata<Message> msgAndMetadata: stream) { 
       ByteBuffer buffer = msgAndMetadata.message().payload(); 
       byte [] bytes = new byte[buffer.remaining()]; 
       buffer.get(bytes); 
       //Do something with the bytes you just got off Kafka. 
      } 
     } 
    }); 
} 

In diesem Beispiel habe ich für 6 Fäden im Grunde gefragt, weil ich weiß, dass ich 3 Partitionen für jedes Thema haben, und ich aufgelistet zwei Themen in meinem weißen Liste. Sobald wir die Handles der eingehenden Streams haben, können wir über ihren Inhalt iterieren, bei denen es sich um MessageAndMetadata-Objekte handelt. Metadaten sind nur der Name und der Offset des Themas. Wie Sie herausgefunden haben, können Sie es in einem einzigen Thread tun, wenn Sie in meinem Beispiel 6 nach einem Stream fragen, aber wenn Sie eine parallele Verarbeitung benötigen, ist es sinnvoll, einen Executor mit einem Thread für jeden zurückgegebenen Stream zu starten.

+0

Was würde passieren, wenn ich das tue? kafkaConsumerConfig = new ConsumerConfig (...); consumerConnector = Consumer.createJavaConsumerConnector (kafkaConsumerConfig); topicCountMap.put ("mytopic", 1); consumerMap.get ("Mytopic"). get (0); Überprüfen Sie, dass es auf dieser Liste der Kafka-Streams get (0) gibt, also bekomme ich nur 1 Stream. Was passiert, wenn ich Consumer anrufe?createJavaConsumerConnector 10 mal? – stewenson

+0

Sie hatten alle die gleiche Konfiguration und jeder alle Partitionen zu lesen sein wird, damit ich Sie 10 Verbraucher bekommen würde vermuten würde, würde versuchen, ihren Zustand in der gleichen ZK Knoten zu speichern, so dass Sie auf alle davon enden würde mit Verbraucher 1 z die ersten 1K-Nachrichten lesen, dann Verbraucher 2 die gleichen 1K-Nachrichten zu lesen, aber potenziell Verbraucher 1 beenden würde seine Batch-Update ZK Lesen, ein zweites Lesen, Schreiben dann seine Position unser zweites ZK dann aus irgendeinem Grund langsamer Thread kommt und seine Position schreibt zurück zu ZK, was dazu führte, dass der erste Verbraucher die zweite Charge erneut verarbeitete. Im Grunde gibt es viele Konflikte. – feldoh

0
/** 
* @param source : source kStream to sink output-topic 
*/ 
private static void pipe(KStream<String, String> source) { 
    source.to(Serdes.String(), Serdes.String(), new StreamPartitioner<String, String>() { 

     @Override 
     public Integer partition(String arg0, String arg1, int arg2) { 
      return 0; 
     } 
    }, "output-topic"); 
} 

oben Code Rekord bei Partition 1 von Themennamen "output-Thema" schreiben

Verwandte Themen