Ich versuche eine einfache Java API zu erstellen, um Kafka Producer und Consumer zu testen. Wenn ich den Producer und den Consumer auf separaten Terminals auf meinem Mac betreibe, funktioniert das gut. Aber als ich versuchte, Kafka-Server mit Java-API-Code zu verbinden, immer diese Fehlermeldung:Apache Kafka Producer Consumer API Problem
Exception in thread "main" java.lang.NullPointerException bei kafkatest2.ProducerTest.main (ProducerTest.java:34)
Producer Code:
package kafkatest2;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ProducerTest {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = null;
try {
producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
String msg = "Message " + i;
producer.send(new ProducerRecord<String, String>("tested", msg));
System.out.println("Sent:" + msg);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("last");
producer.close();
}
}
}
Consumer-Code:
package kafkatest2;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class ConsumerTest {
public static void main(String[] args) {
Properties props = new Properties();
props.put("zookeeper.connect", "127.0.0.1:2181");
props.put("group.id", "test-consumer-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer1 = new KafkaConsumer<>(props);
kafkaConsumer1.subscribe(Arrays.asList("tested"));
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer1.poll(10);
for (ConsumerRecord<String, String> record : records) {
System.out.println("Partition: " + record.partition() + " Offset: " + record.offset() + " Value: " + record.value() + " ThreadID: " + Thread.currentThread().getId());
}
}
}
}
Bitte lassen Sie mich wissen, was ich vermisse? Gibt es ein Problem mit den Konfigurationswerten?
Danke, Vipul
Danke Michael für Ihre Kommentare. Aber das Problem ist, warum Produzent = neuer KafkaProducer <> (Requisiten); ist Wurffehler. ??? Ich kann das nicht lösen. –
Der Producer-Code, den Sie oben eingefügt haben, funktioniert für mich (getestet mit Kafka 0.11.0.1). Wenn Sie immer noch Probleme haben, empfehlen wir Ihnen, log4j Logging zu aktivieren. Für den Verbraucher müssen Sie 'zoekeeper.connect' entfernen und durch' bootstrap.servers' ersetzen, wie Sie es im Producer haben. –
Ja, es funktioniert jetzt gut. Danke –