2015-02-11 21 views
9

ich mit dem folgenden Code einen einfachen Kafka Consumer in Java habeKafka Consumer bei .hasNext in Java hängen

public void run() { 
     ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); 
     while (it.hasNext()&& !done){ 
      try { 
       System.out.println("Parsing data"); 
       byte[] data = it.next().message(); 
       System.out.println("Found data: "+data); 
       values.add(data); // array list 
      } catch (InvalidProtocolBufferException e) { 
       e.printStackTrace(); 
      } 
     } 
     done = true; 
    } 

Wenn eine Nachricht geschrieben wird, wird die Daten erfolgreich gelesen, aber wenn es geht es um die Überprüfung .hasNext(), es bleibt ausstehend und kommt nie zurück.

Was könnte das hinhalten?

m_stream ist ein KafkaStream wie folgt erhalten:

Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
topicCountMap.put(topic, new Integer(a_numThreads)); 
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 
executor = Executors.newFixedThreadPool(a_numThreads); 
for (final KafkaStream stream : streams) { 
    // m_stream is one of these streams 
} 

Antwort

11

Die Lösung, die die Eigenschaft

"consumer.timeout.ms"

Jetzt hinzuzufügen war, als das Timeout ein ConsumerTimeoutException erreicht ist, geworfen

Verwandte Themen