Ich benutze Kafka mit der Consumer API (v. 0.10.0.0). Kafka ist in Docker läuft das Bild von http://wurstmeister.github.io/kafka-docker/Kafka Consumer beitreten Cluster
Auch diesen einfachen Test Ich laufe mit:
@Test
public void test2() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", RandomStringUtils.randomAlphabetic(8));
props.put("auto.offset.reset.config", "earliest");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
Properties props1 = new Properties();
props1.put("bootstrap.servers", "localhost:9092");
props1.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props1.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer1 = new KafkaProducer<>(props1);
KafkaProducer<String, String> producer = producer1;
consumer.subscribe(asList(TEST_TOPIC));
producer.send(new ProducerRecord<>(TEST_TOPIC, 0, "key", "value message"));
producer.flush();
boolean done = false;
while (!done) {
ConsumerRecords<String, String> msg = consumer.poll(1000);
if (msg.count() > 0) {
Iterator<ConsumerRecord<String, String>> msgIt = msg.iterator();
while (msgIt.hasNext()) {
ConsumerRecord<String, String> rec = msgIt.next();
System.out.println(rec.value());
}
consumer.commitSync();
done = true;
}
}
consumer.close();
producer.close();
}
Thema Namen und Verbraucher-ID werden in zufälliger Reihenfolge bei jeder Ausführung erzeugt.
Das Verhalten ist sehr unberechenbar ... Manchmal wird es funktionieren, manchmal wird es Schleife startet beim Aufruf .poll() mit der folgenden Wiederholungs Ausgabe:
2017-04-20 12:01:46 DEBUG NetworkClient:476 - Completed connection to node 1003
2017-04-20 12:01:46 DEBUG NetworkClient:640 - Sending metadata request {topics=[ByjSIH]} to node 1003
2017-04-20 12:01:46 DEBUG Metadata:180 - Updated cluster metadata version 3 to Cluster(nodes = [192.168.100.80:9092 (id: 1003 rack: null)], partitions = [Partition(topic = ByjSIH, partition = 0, leader = 1003, replicas = [1003,], isr = [1003,]])
2017-04-20 12:01:46 DEBUG AbstractCoordinator:476 - Sending coordinator request for group RHAdpuiv to broker 192.168.100.80:9092 (id: 1003 rack: null)
2017-04-20 12:01:46 DEBUG AbstractCoordinator:489 - Received group coordinator response ClientResponse(receivedTimeMs=1492686106738, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer[email protected]2bea5ab4, request=RequestSend(header={api_key=10,api_version=0,correlation_id=3,client_id=consumer-1}, body={group_id=RHAdpuiv}), createdTimeMs=1492686106738, sendTimeMs=1492686106738), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}})
2017-04-20 12:01:46 DEBUG NetworkClient:640 - Sending metadata request {topics=[ByjSIH]} to node 1003
2017-04-20 12:01:46 DEBUG Metadata:180 - Updated cluster metadata version 4 to Cluster(nodes = [192.168.100.80:9092 (id: 1003 rack: null)], partitions = [Partition(topic = ByjSIH, partition = 0, leader = 1003, replicas = [1003,], isr = [1003,]])
2017-04-20 12:01:46 DEBUG AbstractCoordinator:476 - Sending coordinator request for group RHAdpuiv to broker 192.168.100.80:9092 (id: 1003 rack: null)
2017-04-20 12:01:46 DEBUG AbstractCoordinator:489 - Received group coordinator response ClientResponse(receivedTimeMs=1492686106840, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer[email protected]3d8314f0, request=RequestSend(header={api_key=10,api_version=0,correlation_id=5,client_id=consumer-1}, body={group_id=RHAdpuiv}), createdTimeMs=1492686106839, sendTimeMs=1492686106839), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}})
2017-04-20 12:01:46 DEBUG NetworkClient:640 - Sending metadata request {topics=[ByjSIH]} to node 1003
2017-04-20 12:01:46 DEBUG Metadata:180 - Updated cluster metadata version 5 to Cluster(nodes = [192.168.100.80:9092 (id: 1003 rack: null)], partitions = [Partition(topic = ByjSIH, partition = 0, leader = 1003, replicas = [1003,], isr = [1003,]])
2017-04-20 12:01:46 DEBUG AbstractCoordinator:476 - Sending coordinator request for group RHAdpuiv to broker 192.168.100.80:9092 (id: 1003 rack: null)
2017-04-20 12:01:46 DEBUG AbstractCoordinator:489 - Received group coordinator response ClientResponse(receivedTimeMs=1492686106941, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer[email protected]2df32bf7, request=RequestSend(header={api_key=10,api_version=0,correlation_id=7,client_id=consumer-1}, body={group_id=RHAdpuiv}), createdTimeMs=1492686106940, sendTimeMs=1492686106940), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}})
2017-04-20 12:01:47 DEBUG NetworkClient:640 - Sending metadata request {topics=[ByjSIH]} to node 1003
2017-04-20 12:01:47 DEBUG Metadata:180 - Updated cluster metadata version 6 to Cluster(nodes = [192.168.100.80:9092 (id: 1003 rack: null)], partitions = [Partition(topic = ByjSIH, partition = 0, leader = 1003, replicas = [1003,], isr = [1003,]])
2017-04-20 12:01:47 DEBUG AbstractCoordinator:476 - Sending coordinator request for group RHAdpuiv to broker 192.168.100.80:9092 (id: 1003 rack: null)
2017-04-20 12:01:47 DEBUG AbstractCoordinator:489 - Received group coordinator response ClientResponse(receivedTimeMs=1492686107042, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer[email protected]530612ba, request=RequestSend(header={api_key=10,api_version=0,correlation_id=9,client_id=consumer-1}, body={group_id=RHAdpuiv}), createdTimeMs=1492686107041, sendTimeMs=1492686107041), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}})
2017-04-20 12:01:47 DEBUG NetworkClient:640 - Sending metadata request {topics=[ByjSIH]} to node 1003
2017-04-20 12:01:47 DEBUG Metadata:180 - Updated cluster metadata version 7 to Cluster(nodes = [192.168.100.80:9092 (id: 1003 rack: null)], partitions = [Partition(topic = ByjSIH, partition = 0, leader = 1003, replicas = [1003,], isr = [1003,]])
2017-04-20 12:01:47 DEBUG AbstractCoordinator:476 - Sending coordinator request for group RHAdpuiv to broker 192.168.100.80:9092 (id: 1003 rack: null)
2017-04-20 12:01:47 DEBUG AbstractCoordinator:489 - Received group coordinator response ClientResponse(receivedTimeMs=1492686107144, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer[email protected]2a40cd94, request=RequestSend(header={api_key=10,api_version=0,correlation_id=11,client_id=consumer-1}, body={group_id=RHAdpuiv}), createdTimeMs=1492686107144, sendTimeMs=1492686107144), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}})
2017-04-20 12:01:47 DEBUG NetworkClient:640 - Sending metadata request {topics=[ByjSIH]} to node 1003
Wer weiß, was los ist? Es scheint eine ziemlich einfache Einrichtung/Test für mich ...