2017-04-22 7 views
0

Ich bin neu in Kafka .I begann auf Kafka tun ich bin unten Problem konfrontiert bitte helfen Sie mir, dieses eine im Voraus zu lösen. Zuerst schreibe ich Producer API, es funktioniert gut, aber während Consumer API Nachrichten nicht angezeigt werden.Kafka Consumer-API funktioniert nicht richtig

Mein Code ist wie folgt:

import java.util.Arrays; 
import java.util.Properties; 

import org.apache.kafka.clients.consumer.KafkaConsumer; 
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.clients.consumer.ConsumerRecord; 


public class ConsumerGroup { 
    public static void main(String[] args) throws Exception { 

     String topic = "Hello-Kafka"; 
     String group = "myGroup"; 
     Properties props = new Properties(); 
     props.put("bootstrap.servers", "XXX.XX.XX.XX:9092"); 
     props.put("group.id", group); 
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); 
     try { 

      consumer.subscribe(Arrays.asList(topic)); 
      System.out.println("Subscribed to topic " + topic); 


      ConsumerRecords<String, String> records = consumer.poll(100); 

      System.out.println("records ::" + records); 
      System.out.println(records.toString()); 
      for (ConsumerRecord<String, String> record : records) { 
       System.out.println("Record::" + record.offset()); 
       System.out.println(record.key()); 
       System.out.println(record.value()); 
      } 
      consumer.commitSync(); 

     } catch (Exception e) { 
      e.printStackTrace(); 
     } finally { 
      consumer.commitSync(); 
      consumer.close(); 
     } 
    } 
} 

Antwort ::

abonniert Thema Hallo-Kafka records :: [email protected] org. [email protected]

hier nicht den Druck des Offset, Schlüssel, Wert Steuerung kommt nicht für (ConsumerRecord Datensatz: die Datensätze) { , dass für Schleife es selbst bitte helfen Sie mir.

+0

Haben Sie einige Nachrichten zum Thema produziert? Scheint, dass Ihr Thema keine Nachrichten hat – divyesh

Antwort

0

Sie versuchen, leere Datensätze zu drucken, daher wird nur records.toString() in Ihrem Code gedruckt, der im Wesentlichen der Name der Klasse ist.
Ich habe einige Änderungen in Ihrem Code vorgenommen und es funktioniert. Schau mal, ob das hilft.

public class ConsumerGroup { 
    public static void main(String[] args) throws Exception { 

     String topic = "Hello-Kafka"; 
     String group = "myGroup"; 
     Properties props = new Properties(); 
     props.put("bootstrap.servers", "xx.xx.xx.xx:9092"); 
     props.put("group.id", group); 
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); 
     try { 

      consumer.subscribe(Arrays.asList(topic)); 
      System.out.println("Subscribed to topic " + topic); 

      while(true){ 
       ConsumerRecords<String, String> records = consumer.poll(1000); 
       if(records.isEmpty()){ 

       } 
       else{ 
       System.out.println("records ::" + records); 
       System.out.println(records.toString()); 
       for (ConsumerRecord<String, String> record : records) { 
        System.out.println("Record::" + record.offset()); 
        System.out.println(record.key()); 
        System.out.println(record.value()); 
       } 
       consumer.commitSync(); 
       } 
      } 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } finally { 
      consumer.commitSync(); 
      consumer.close(); 
     } 
    } 
} 
+0

vielen Dank .now es funktioniert. Vor kurzem habe ich einen kleinen Fehler gemacht. – Narasimha