
I'm using Kafka through the Consumer API (v. 0.10.0.0) and trying to get a consumer to join the cluster. Kafka runs in Docker, using the image from http://wurstmeister.github.io/kafka-docker/.

This is the simple test I'm running against it:

import static java.util.Arrays.asList;

import java.util.Properties;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;

@Test
public void test2() {

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    // fresh, random group id on every run
    props.put("group.id", RandomStringUtils.randomAlphabetic(8));
    // read from the beginning of the topic when there is no committed offset
    props.put("auto.offset.reset", "earliest");
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

    Properties props1 = new Properties();
    props1.put("bootstrap.servers", "localhost:9092");
    props1.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props1.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    KafkaProducer<String, String> producer = new KafkaProducer<>(props1);

    consumer.subscribe(asList(TEST_TOPIC));

    // produce a single record to partition 0 and make sure it reaches the broker
    producer.send(new ProducerRecord<>(TEST_TOPIC, 0, "key", "value message"));
    producer.flush();

    // poll until the record comes back, then commit and stop
    boolean done = false;
    while (!done) {
        ConsumerRecords<String, String> msg = consumer.poll(1000);
        if (msg.count() > 0) {
            for (ConsumerRecord<String, String> rec : msg) {
                System.out.println(rec.value());
            }
            consumer.commitSync();
            done = true;
        }
    }

    consumer.close();
    producer.close();
}

The topic name and the consumer group id are randomly generated on every run.
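For context, the per-run randomization presumably looks roughly like the sketch below; the TEST_TOPIC constant is not shown in the original test, so its exact definition is an assumption, and the topic only exists because the broker auto-creates it on first use (assuming auto.create.topics.enable is left at its default of true).

    // Assumed setup: fresh names on every run so no state from earlier runs is reused.
    private static final String TEST_TOPIC = RandomStringUtils.randomAlphabetic(6); // e.g. "ByjSIH" in the log below
    // The group id is generated the same way inside the test itself:
    // props.put("group.id", RandomStringUtils.randomAlphabetic(8));                // e.g. "RHAdpuiv"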

The behaviour is very erratic: sometimes the test works, and sometimes the call to .poll() starts looping with the following output repeating:

2017-04-20 12:01:46 DEBUG NetworkClient:476 - Completed connection to node 1003 
2017-04-20 12:01:46 DEBUG NetworkClient:640 - Sending metadata request {topics=[ByjSIH]} to node 1003 
2017-04-20 12:01:46 DEBUG Metadata:180 - Updated cluster metadata version 3 to Cluster(nodes = [192.168.100.80:9092 (id: 1003 rack: null)], partitions = [Partition(topic = ByjSIH, partition = 0, leader = 1003, replicas = [1003,], isr = [1003,]]) 
2017-04-20 12:01:46 DEBUG AbstractCoordinator:476 - Sending coordinator request for group RHAdpuiv to broker 192.168.100.80:9092 (id: 1003 rack: null) 
2017-04-20 12:01:46 DEBUG AbstractCoordinator:489 - Received group coordinator response ClientResponse(receivedTimeMs=1492686106738, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer[email protected]2bea5ab4, request=RequestSend(header={api_key=10,api_version=0,correlation_id=3,client_id=consumer-1}, body={group_id=RHAdpuiv}), createdTimeMs=1492686106738, sendTimeMs=1492686106738), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) 
2017-04-20 12:01:46 DEBUG NetworkClient:640 - Sending metadata request {topics=[ByjSIH]} to node 1003 
2017-04-20 12:01:46 DEBUG Metadata:180 - Updated cluster metadata version 4 to Cluster(nodes = [192.168.100.80:9092 (id: 1003 rack: null)], partitions = [Partition(topic = ByjSIH, partition = 0, leader = 1003, replicas = [1003,], isr = [1003,]]) 
2017-04-20 12:01:46 DEBUG AbstractCoordinator:476 - Sending coordinator request for group RHAdpuiv to broker 192.168.100.80:9092 (id: 1003 rack: null) 
2017-04-20 12:01:46 DEBUG AbstractCoordinator:489 - Received group coordinator response ClientResponse(receivedTimeMs=1492686106840, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer[email protected]3d8314f0, request=RequestSend(header={api_key=10,api_version=0,correlation_id=5,client_id=consumer-1}, body={group_id=RHAdpuiv}), createdTimeMs=1492686106839, sendTimeMs=1492686106839), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) 
2017-04-20 12:01:46 DEBUG NetworkClient:640 - Sending metadata request {topics=[ByjSIH]} to node 1003 
2017-04-20 12:01:46 DEBUG Metadata:180 - Updated cluster metadata version 5 to Cluster(nodes = [192.168.100.80:9092 (id: 1003 rack: null)], partitions = [Partition(topic = ByjSIH, partition = 0, leader = 1003, replicas = [1003,], isr = [1003,]]) 
2017-04-20 12:01:46 DEBUG AbstractCoordinator:476 - Sending coordinator request for group RHAdpuiv to broker 192.168.100.80:9092 (id: 1003 rack: null) 
2017-04-20 12:01:46 DEBUG AbstractCoordinator:489 - Received group coordinator response ClientResponse(receivedTimeMs=1492686106941, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer[email protected]2df32bf7, request=RequestSend(header={api_key=10,api_version=0,correlation_id=7,client_id=consumer-1}, body={group_id=RHAdpuiv}), createdTimeMs=1492686106940, sendTimeMs=1492686106940), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) 
2017-04-20 12:01:47 DEBUG NetworkClient:640 - Sending metadata request {topics=[ByjSIH]} to node 1003 
2017-04-20 12:01:47 DEBUG Metadata:180 - Updated cluster metadata version 6 to Cluster(nodes = [192.168.100.80:9092 (id: 1003 rack: null)], partitions = [Partition(topic = ByjSIH, partition = 0, leader = 1003, replicas = [1003,], isr = [1003,]]) 
2017-04-20 12:01:47 DEBUG AbstractCoordinator:476 - Sending coordinator request for group RHAdpuiv to broker 192.168.100.80:9092 (id: 1003 rack: null) 
2017-04-20 12:01:47 DEBUG AbstractCoordinator:489 - Received group coordinator response ClientResponse(receivedTimeMs=1492686107042, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer[email protected]530612ba, request=RequestSend(header={api_key=10,api_version=0,correlation_id=9,client_id=consumer-1}, body={group_id=RHAdpuiv}), createdTimeMs=1492686107041, sendTimeMs=1492686107041), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) 
2017-04-20 12:01:47 DEBUG NetworkClient:640 - Sending metadata request {topics=[ByjSIH]} to node 1003 
2017-04-20 12:01:47 DEBUG Metadata:180 - Updated cluster metadata version 7 to Cluster(nodes = [192.168.100.80:9092 (id: 1003 rack: null)], partitions = [Partition(topic = ByjSIH, partition = 0, leader = 1003, replicas = [1003,], isr = [1003,]]) 
2017-04-20 12:01:47 DEBUG AbstractCoordinator:476 - Sending coordinator request for group RHAdpuiv to broker 192.168.100.80:9092 (id: 1003 rack: null) 
2017-04-20 12:01:47 DEBUG AbstractCoordinator:489 - Received group coordinator response ClientResponse(receivedTimeMs=1492686107144, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer[email protected]2a40cd94, request=RequestSend(header={api_key=10,api_version=0,correlation_id=11,client_id=consumer-1}, body={group_id=RHAdpuiv}), createdTimeMs=1492686107144, sendTimeMs=1492686107144), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) 
2017-04-20 12:01:47 DEBUG NetworkClient:640 - Sending metadata request {topics=[ByjSIH]} to node 1003 

Does anyone know what is going on? It seems like a fairly simple setup and test to me...

Answer


I found the reason myself. I had been running a consumer against a topic with only one partition and then simply killed the consumer process, so there was no clean shutdown.

In that situation the broker keeps the member's slot reserved for that consumer until its session expires. Trying to connect with another consumer before then produces this error, and it keeps failing until the session has expired.
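This is also why a clean shutdown avoids the problem in the first place. Below is a minimal sketch, not from the original post (the topic and group names are placeholders): wiring consumer.wakeup() into a JVM shutdown hook lets the consumer leave the group immediately instead of leaving a stale member behind until the session times out.

    import static java.util.Arrays.asList;

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.errors.WakeupException;

    public class CleanShutdownConsumer {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "my-group");                  // placeholder group id
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            final Thread mainThread = Thread.currentThread();

            // On Ctrl+C / kill, interrupt the blocking poll() instead of dying mid-session.
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    consumer.wakeup();                          // thread-safe; makes poll() throw WakeupException
                    try {
                        mainThread.join();                      // wait until close() below has run
                    } catch (InterruptedException ignored) {
                    }
                }
            });

            try {
                consumer.subscribe(asList("test-topic"));       // placeholder topic name
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> rec : records) {
                        System.out.println(rec.value());
                    }
                }
            } catch (WakeupException e) {
                // expected during shutdown
            } finally {
                consumer.close();                               // leaves the group, so the member slot is freed right away
            }
        }
    }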

To resolve it, you can:

- switch to a different group id
- wait for the session to expire
- restart the broker

The first two can be handled from the consumer configuration, as sketched below.
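This is only a hedged illustration; the values are assumptions, and the broker bounds the session timeout via its group.min.session.timeout.ms / group.max.session.timeout.ms settings.

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    // Option 1: use a group id the dead consumer never registered with,
    // so there is no stale member slot to wait for.
    props.put("group.id", RandomStringUtils.randomAlphabetic(8));
    // Option 2 (shortens the wait rather than avoiding it): configure a smaller
    // session timeout up front, so a crashed member is evicted from the group sooner.
    props.put("session.timeout.ms", "6000");
    props.put("heartbeat.interval.ms", "2000"); // usually about a third of the session timeout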

If someone with more knowledge can explain this better, please do.