2017-09-25 3 views
0

Ich versuche eine einfache Java API zu erstellen, um Kafka Producer und Consumer zu testen. Wenn ich den Producer und den Consumer auf separaten Terminals auf meinem Mac betreibe, funktioniert das gut. Aber als ich versuchte, Kafka-Server mit Java-API-Code zu verbinden, immer diese Fehlermeldung:Apache Kafka Producer Consumer API Problem

Exception in thread "main" java.lang.NullPointerException bei kafkatest2.ProducerTest.main (ProducerTest.java:34)

Producer Code:

package kafkatest2; 
import java.util.Properties; 
import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.Producer; 
import org.apache.kafka.clients.producer.ProducerRecord; 
public class ProducerTest { 
    public static void main(String[] args) { 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", "localhost:9092"); 
    props.put("acks", "all"); 
    props.put("retries", 0); 
    props.put("batch.size", 16384); 
    props.put("linger.ms", 1); 
    props.put("buffer.memory", 33554432); 
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); 
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); 

    Producer<String, String> producer = null; 
    try { 
      producer = new KafkaProducer<>(props); 
     for (int i = 0; i < 10; i++) { 
     String msg = "Message " + i; 
     producer.send(new ProducerRecord<String, String>("tested", msg)); 
     System.out.println("Sent:" + msg); 
     } 
    } catch (Exception e) { 
     e.printStackTrace(); 

    } finally { 
    System.out.println("last"); 
     producer.close(); 
    } 

    } 

} 

Consumer-Code:

package kafkatest2; 

import java.util.Arrays; 
import java.util.Properties; 

import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 

public class ConsumerTest { 

    public static void main(String[] args) { 
    Properties props = new Properties(); 
    props.put("zookeeper.connect", "127.0.0.1:2181"); 
    props.put("group.id", "test-consumer-group"); 
    props.put("enable.auto.commit", "true"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("auto.offset.reset", "earliest"); 
    props.put("session.timeout.ms", "30000"); 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 

    KafkaConsumer<String, String> kafkaConsumer1 = new KafkaConsumer<>(props); 
    kafkaConsumer1.subscribe(Arrays.asList("tested")); 
    while (true) { 
     ConsumerRecords<String, String> records = kafkaConsumer1.poll(10); 
     for (ConsumerRecord<String, String> record : records) { 
     System.out.println("Partition: " + record.partition() + " Offset: " + record.offset() + " Value: " + record.value() + " ThreadID: " + Thread.currentThread().getId()); 
     } 
    } 

    } 

} 

Bitte lassen Sie mich wissen, was ich vermisse? Gibt es ein Problem mit den Konfigurationswerten?

Danke, Vipul

Antwort

0

Der Stack-Trace zeigt eine NPE in Ihrem Erzeugercode:

Exception in thread "main" java.lang.NullPointerException at 
kafkatest2.ProducerTest.main(ProducerTest.java:34) 

Dies kann passieren, wenn producer = new KafkaProducer<>(props); eine Ausnahme auslöst. Wenn dies der Fall ist, wird beim Eingeben des finally-Blocks die lokale Variable producer nicht definiert, so dass das Aufrufen von producer.close() eine NPE auslöst. Schließen Sie den Anruf einfach um, um ihn in einem if (producer != null) Block zu schließen.

+0

Danke Michael für Ihre Kommentare. Aber das Problem ist, warum Produzent = neuer KafkaProducer <> (Requisiten); ist Wurffehler. ??? Ich kann das nicht lösen. –

+0

Der Producer-Code, den Sie oben eingefügt haben, funktioniert für mich (getestet mit Kafka 0.11.0.1). Wenn Sie immer noch Probleme haben, empfehlen wir Ihnen, log4j Logging zu aktivieren. Für den Verbraucher müssen Sie 'zoekeeper.connect' entfernen und durch' bootstrap.servers' ersetzen, wie Sie es im Producer haben. –

+0

Ja, es funktioniert jetzt gut. Danke –