2017-04-17 6 views
1

Ich habe einen Kafka Producer, den ich in Java geschrieben habe. Es scheint nicht richtig zu funktionieren, obwohl es im Grunde ein Schnitt und eine Vergangenheit von Beispielcode ist. Ich würde erwarten, dass die Ausgabe 10 Nachrichten an meinen Cluster sein wird. Stattdessen erhalte ich die Meldung Erfolgreich ausgegeben, aber nichts geht wirklich an meinen Cluster. Ich bin mir nicht sicher, wo ich mit der Fehlersuche beginnen soll.Kafka Producer scheint nicht richtig zu funktionieren

import java.util.Properties; 
import org.apache.kafka.clients.producer.Producer; 
import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerRecord; 

public class SimpleProducer { 

public static void main(String[] args) throws Exception{ 

    String topicName = "test_topic"; 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", "skynet.local:6667");  
    props.put("acks", "all"); 
    props.put("retries", 0); 
    props.put("batch.size", 16384); 
    props.put("linger.ms", 1); 
    props.put("buffer.memory", 33554432); 
    props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); 

    Producer<String, String> producer = new KafkaProducer<String, String>(props); 

    for(int i = 0; i < 10; i++) 
     producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i))); 
     System.out.println("Message sent successfully"); 
     producer.close(); 
} 
} 
+0

Senden ist ein asynchroner Anruf. Versuchen Sie also, auf die zurückgegebene Zukunft zu warten, bevor Sie Ihre Nachricht drucken. Noch besser, Sie können die Zukunft für Debugging-Zwecke drucken. Ich empfehle auch acks gleich 1 zum Testen. – dawsaw

+0

Also habe ich acks auf 1 gesetzt und die Bildschirmausgabe komplett auskommentiert. Es dauert immer noch sehr lange und scheint keine Nachrichten zu senden. –

+0

Ich führe das innerhalb von Eclipse. Spielt das eine Rolle? –

Antwort

2

Da einige Umgebungen uncleared, so werde ich Ihre Frage beantwortet Basis auf Ihrem Kafka Server versuchen, arbeitet an Port 6667 bereits.

kann Ihr Code sein muss, auf 2 palces einstellen (jemand mich verbessern kann):

props.put("linger.ms", 1); // set to 0 let Producer can send message immediately 

und hier producer.close(); aus for Schleife fallen:

Producer<String, String> producer = new KafkaProducer<String, String>(props, new StringSerializer(), new StringSerializer()); 
for(int i = 0; i < 10; i++) { 
    Future<RecordMetadata> f = producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i))); 
    System.out.println(f.get()); // don't do that in your Production, here just for debugging purpose. 
} 
producer.close(); 

Eine weitere Sache, Sie können kafka-console-consumer.sh und kafka-console-producer.sh vor Ihren Tests ausführen, um Ihren Kafka-Server zu bestätigen, und Ihr SimpleProducer funktioniert bereits. Kafka 0.10.x Konfigurationsparameter unter Kafka Producer Configuration Parameters

+0

Ich markiere dies als die Antwort, weil es sich herausgestellt hat, dass ja das Schließen des Produzenten in der Tat aus der Schleife entfernt werden musste, aber ich musste auch meine Host-Datei aktualisieren, so dass IPs aufgelöst werden würden. VIELEN DANK! –

+0

@BobWakefield, und '' 'new KafkaProducer (Requisiten, neuer StringSerializer(), neuer StringSerializer())' '', um Serializer zu Ihrem KafkaProducer-Konstruktor hinzuzufügen, könnte besser sein. –

Verwandte Themen