2017-11-23 5 views
0

Ich teste einen elementaren Transaktionsfall, aber es bleibt auf producer.initTransaction(); Kann irgendeine Konfiguration falsch sein?Apache Kafka - Producer Pause auf initTransaction

public static void main(String[] argv) throws Exception { 
    String topicName = "helloworldpartitioned"; 
    // Configure the Producer 
    Properties configProperties = new Properties(); 
    configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
    configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
    configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
    configProperties.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "1000"); 
    configProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "test"); 
//  configProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, Boolean.TRUE); 
    configProperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, PRODUCER_TRANSACTIONAL_ID); 

    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configProperties); 
    producer.initTransactions(); 
    producer.beginTransaction(); 
     String line = "TestMessage"; 
     System.out.println("Inizializzo la transazione"); 
     ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topicName, line); 
     producer.send(rec); 
    // Thread.sleep(5000); 
    producer.commitTransaction(); 
    producer.flush(); 
    producer.close(); 
} 

Antwort

0

Versuchen Sie:

producer.initTransactions(); 
producer.beginTransaction(); 
    String line = "TestMessage"; 
    System.out.println("Inizializzo la transazione"); 
    ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topicName, line); 
    producer.send(rec); 


// Thread.sleep(5000); 
producer.commitTransaction(); 
producer.flush(); 
producer.close(); 
+0

bevor es war so, aber es bleibt immer in der ersten Zeile –

+0

In diesem Beispiel https://gist.github.com/apurvam/f94353a22d4cbe62a7d6abb0464f3891 verwendete er ' beginTransaction 'vor' producer.send (rec) '. Aber er verwendete auch 'producer.sendOffsets (getUncommittedOffsets());' und dann 'producer.endTransaction();'. Vielleicht kannst du so etwas benutzen. –

+0

Und ich verstehe nicht, was meinst du in deinem Kommentar? –