Der empfohlene Weg, dies zu tun ist, einen Thread-Pool haben, so Java-Organisation für Sie behandeln kann und für jeden Strom der createMessageStreamsByFilter Methode gibt Sie verbrauchen es in einem lauffähigen. Zum Beispiel:
int NUMBER_OF_PARTITIONS = 6;
Properties consumerConfig = new Properties();
consumerConfig.put("zk.connect", "zookeeper.mydomain.com:2181");
consumerConfig.put("backoff.increment.ms", "100");
consumerConfig.put("autooffset.reset", "largest");
consumerConfig.put("groupid", "java-consumer-example");
consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerConfig));
TopicFilter sourceTopicFilter = new Whitelist("mytopic|myothertopic");
List<KafkaStream<Message>> streams = consumer.createMessageStreamsByFilter(sourceTopicFilter, NUMBER_OF_PARTITIONS);
ExecutorService executor = Executors.newFixedThreadPool(streams.size());
for(final KafkaStream<Message> stream: streams){
executor.submit(new Runnable() {
public void run() {
for (MessageAndMetadata<Message> msgAndMetadata: stream) {
ByteBuffer buffer = msgAndMetadata.message().payload();
byte [] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
//Do something with the bytes you just got off Kafka.
}
}
});
}
In diesem Beispiel habe ich für 6 Fäden im Grunde gefragt, weil ich weiß, dass ich 3 Partitionen für jedes Thema haben, und ich aufgelistet zwei Themen in meinem weißen Liste. Sobald wir die Handles der eingehenden Streams haben, können wir über ihren Inhalt iterieren, bei denen es sich um MessageAndMetadata-Objekte handelt. Metadaten sind nur der Name und der Offset des Themas. Wie Sie herausgefunden haben, können Sie es in einem einzigen Thread tun, wenn Sie in meinem Beispiel 6 nach einem Stream fragen, aber wenn Sie eine parallele Verarbeitung benötigen, ist es sinnvoll, einen Executor mit einem Thread für jeden zurückgegebenen Stream zu starten.
SimpleConsumer Verwendung ist keine Option? –