2016-10-31 3 views
2

Ich versuche einen Kafka-Producer zu machen, der eine Zeichenfolge "Dieses Programm läuft" an ein Kafka-Thema sendet. Ich bin mir nicht sicher, warum es nicht funktioniert. Unten ist der folgende Code. Ich bin keine Cloudera-Distribution.Kafka Producer Send String funktioniert nicht

Es gibt

16/10/viele Informationen über den Kafka, ssl, passowrd, client.id usw. auch:

package kafka_test; 

import java.util.Properties; 

import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerConfig; 
import org.apache.kafka.clients.producer.ProducerRecord; 
import org.apache.kafka.common.serialization.StringSerializer; 

public class DataMovement { 

    public static void main(String[] args) { 

     String kafkaTopic = args[0]; 

     Properties props = new Properties(); 
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "server:9092"); 
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 

     KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); 
     ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(kafkaTopic, null, "This program is running."); 

     producer.send(producerRecord); 
     producer.close(); 

    } 
} 

Ich habe keine Fehlermeldung, sondern eine Auszeit bekommen 31 10:25:46 INFO utils.AppInfoParser: Kafka Version: 0.9.0.1 16/10/31 10:25:46 INFO utils.AppInfoParser: Kafka commitId: commidid 16/10/31 10:26:46 INFO Produzent .KafkaProducer: Schließen des Kafka-Producers mit TimeoutMillis = 9223372036854775807 ms.

+0

Erhalten Sie irgendeine Ausnahme? Bevor Sie den Producer schließen, können Sie noch einige Zeit mit Thread.sleep (2000) warten. Sobald der Producer geschlossen ist, wird Kafka die Nachricht nicht an topic senden. – Shankar

+0

Wenn Sie keinen Nachrichtenschlüssel festlegen, können Sie ProducerRecord auch mit zwei Argumenten, einem Themennamen und einer Nachricht verwenden. – Shankar

+0

Es ist ein Timeout-Fehler. Ich habe einen Schnitt gemacht. Okay, lass mich versuchen, den Schlüssel zu wechseln. Lass mich schlafen gehen. – Defcon

Antwort

0

Der Code funktioniert gut. Problem war der Server: 9092 sollte die Adresse des designierten Brokers des Kafka-Clusters sein (ich hatte es gezielt an den aktiven Broker gerichtet, sie sind unterschiedlich).

+0

Können Sie klären? Jeder aktive Worker sollte als Bootstrap-Server arbeiten - er leitet den Client einfach um (mithilfe von MetadataResponse), um bei Bedarf mit einem anderen Server zu kommunizieren. –

+0

Wir hatten zwei Broker, von denen jeder seinen eigenen Knoten hatte. Aus irgendeinem Grund füllt sich die Nachricht nicht, wenn Sie auf einen von ihnen verweisen (wir können dies als Beispiel für broker_node_2 bezeichnen). Als wir die Sendedaten an broker_node_1 verwiesen haben, funktionierte es. Nicht sicher der Grund. Könnte die Art sein, wie der Cluster eingerichtet wurde. – Defcon

Verwandte Themen