3

Ich bin ein Apache Kafka Verbraucher zu einem anderen bereits laufenden Kafka abonnieren. Nun, mein Problem ist, dass, wenn mein Produzent Nachrichten an einen Server sendet ... mein Konsument sie nicht empfängt. Here I Producer Code geben,Consumer keine Nachricht in Apache Kafka

 Properties properties = new Properties(); 
     properties.put("metadata.broker.list","Running kafka ip addr:9092"); 
     properties.put("serializer.class","kafka.serializer.StringEncoder"); 
     ProducerConfig producerConfig = new ProducerConfig(properties); 
     kafka.javaapi.producer.Producer<String,String> producer = new kafka.javaapi.producer.Producer<String, String>(producerConfig); 
     String filePath="filepath"; 
     File rootFile= new File(filePath); 
     Collection<File> allFiles = FileUtils.listFiles(rootFile, CanReadFileFilter.CAN_READ, TrueFileFilter.INSTANCE); 
     for(File file : allFiles) { 
      StringBuilder sb = new StringBuilder(); 
      sb.append(file); 
      KeyedMessage<String, String> message =new KeyedMessage<String, String>(TOPIC,sb.toString()); 
      System.out.println("sending msg from producer.."+sb.toString()); 
      producer.send(message); 
     } 
      producer.close(); 

Hier Code Consumer,

  properties.put("bootstrap.servers","Running zookeaper ip addr:2181"); 
     properties.put("group.id","test-group"); 
     properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     properties.put("enable.auto.commit", "false"); 

     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); 
     consumer.subscribe(Collections.singletonList(topicName)); 
     while (true) { 
       ConsumerRecords<String, String> records = consumer.poll(100); 
       for (ConsumerRecord<String, String> record : records) 
       { 
        System.out.println("topic = "+record.topic()); 
        System.out.println("topic = "+record.partition()); 
        System.out.println("topic = "+record.offset()); 
       } 
       try { 
        consumer.commitSync(); 
       } catch (CommitFailedException e) { 
        System.out.printf("commit failed", e) ; 
       } 
      } 

Ich benutze diese Abhängigkeit:

<dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.11</artifactId> 
     <version>0.10.0.0</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka-clients</artifactId> 
     <version>0.10.1.0</version> 
    </dependency> 

ich alle Informationen aus dieser Verbindung erhalten:
https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

Wenn wir Verbraucher betrieben, erhielten wir keine Benachrichtigung von der Verbraucherseite. Bitte gib mir eine Idee.

+0

versuchen Sie herauszufinden, wo das Problem ist: auf Verbraucher- oder Herstellergröße. Dafür: Offsets im Thema überprüfen. Es kann von der Kommandozeile – Natalia

+0

ausgeführt werden Sie führen es als eine JAR-Datei im Cluster? .. Bitte überprüfen Sie Ihren Port zookeeper. –

+0

@Natalia: Ich kann die Nachrichten über den Produzenten posten. Ich kann sehen, dass die Nummer der Nachricht zusammen mit der Protokollgröße zunimmt. Aber Offset wird nicht erhöht ... –

Antwort

0

Für Hersteller:

properties.put("metadata.broker.list","Running kafka ip addr:9092"); 

Ich denke, sollte diese "bootstrap.servers" sein.

Für Verbraucher:

properties.put("bootstrap.servers","Running zookeaper ip addr:2181"); 

bootstrap.servers an einen Broker verweisen muss, nicht ZK.

Das "Problem" ist, dass der Konsument nur auf einen Broker wartet, aber nicht ausfällt, wenn kein Broker am angegebenen Host/Port vorhanden ist.

+0

Wir führen beide Broker und Tierpfleger in der gleichen IP. Es ist eine Einzelknoteninstallation. Daher gab es die gleiche ip.Do muss ich Zoowärter und Broker in verschiedenen vms ausführen? –

+0

Sie müssen nicht auf verschiedenen Servern laufen - aber es wird empfohlen. Wie auch immer, ZK und Broker verwenden verschiedene Ports, und '2181' ist ZK Standard Port - also denke ich, dass Sie brauchen, um Broker Port (Standard:' 9092') –

+0

@ Matthias J. Sax - tat in der gleichen, aber immer noch nicht zu bekommen jede Nachricht in Consumer-Seite –

0

Ich bin ein newb bei Kafka und Java, aber ich werde den folgenden Ansatz vorschlagen,

  • Stellen Sie sicher, dass der Hersteller tatsächlich zum Thema schreibt /usr/bin/kafka-avro-console-consumer --new-consumer --bootstrap-server localhost:9092 --topic KumarTopic --from-beginning mit dem folgenden Befehl.
  • Wenn dies der Fall ist, müssen Sie sich wahrscheinlich auf Ihren Verbrauchercode konzentrieren. Die Guides von Confluent sind sehr hilfreich.
+0

@ Zigmaphi-Thnaks für einen Kommentar, habe ich bereits überprüft. Der Produzent schreibt das Thema perfekt und der Kunde läuft auch, bekommt aber immer noch keine Nachricht –

Verwandte Themen