2017-07-20 1 views
-4

Ich versuche, in Kafka-Thema durch JAVA zu schreiben, wie ich das Thema erstellt habe, aber einige Daten in diesem Thema einfügen möchte.In Thema in Kafka durch Java schreiben Code

Vielen Dank im Voraus.

+2

Was meinst du mit "Ich versuche"? Hast du einen Code, der nicht funktioniert? Kannst du es zeigen? Ansonsten gibt es viele Codebeispiele online zur Verwendung eines KafkaProducers, keine Notwendigkeit für eine Stack Overflow-Frage. – ppatierno

Antwort

0

Hier ist ein Beispiel für einen synchronen Producer. Es sollte (zu und einigen früheren Versionen) mit Kafka 0,11 arbeiten:

import org.apache.kafka.clients.producer.*; 
import org.apache.kafka.common.serialization.LongSerializer; 
import org.apache.kafka.common.serialization.StringSerializer; 
import java.util.Properties; 

public class MyKafkaProducer { 

    private final static String TOPIC = "my-example-topic"; 
    private final static String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094"; 

    private static Producer<Long, String> createProducer() { 
     Properties props = new Properties(); 
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); 
     props.put(ProducerConfig.CLIENT_ID_CONFIG, "MyKafkaProducer"); 
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); 
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 
     return new KafkaProducer<>(props); 
    } 

    static void runProducer(final int sendMessageCount) throws Exception { 
     final Producer<Long, String> producer = createProducer(); 

     try { 
      for (long index = 1; index <= sendMessageCount; index++) { 
       final ProducerRecord<Long, String> record = new ProducerRecord<>(TOPIC, index, "Message " + index); 
       RecordMetadata metadata = producer.send(record).get(); 
       System.out.printf("sent record(key=%s value='%s')" + " metadata(partition=%d, offset=%d)\n", 
        record.key(), record.value(), metadata.partition(), metadata.offset()); 
      } 
     } finally { 
      producer.flush(); 
      producer.close(); 
     } 
    } 

    public static void main(String[] args) throws Exception { 
     if (args.length == 0) { 
      runProducer(5); 
     } else { 
      runProducer(Integer.parseInt(args[0])); 
     } 
    } 
} 

Sie müssen möglicherweise einige der hartcodierte Einstellungen ändern.

Referenz: http://cloudurable.com/blog/kafka-tutorial-kafka-producer/index.html

Verwandte Themen