0
Ich teste einen elementaren Transaktionsfall, aber es bleibt auf producer.initTransaction(); Kann irgendeine Konfiguration falsch sein?Apache Kafka - Producer Pause auf initTransaction
public static void main(String[] argv) throws Exception {
String topicName = "helloworldpartitioned";
// Configure the Producer
Properties configProperties = new Properties();
configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProperties.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "1000");
configProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "test");
// configProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, Boolean.TRUE);
configProperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, PRODUCER_TRANSACTIONAL_ID);
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configProperties);
producer.initTransactions();
producer.beginTransaction();
String line = "TestMessage";
System.out.println("Inizializzo la transazione");
ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topicName, line);
producer.send(rec);
// Thread.sleep(5000);
producer.commitTransaction();
producer.flush();
producer.close();
}
bevor es war so, aber es bleibt immer in der ersten Zeile –
In diesem Beispiel https://gist.github.com/apurvam/f94353a22d4cbe62a7d6abb0464f3891 verwendete er ' beginTransaction 'vor' producer.send (rec) '. Aber er verwendete auch 'producer.sendOffsets (getUncommittedOffsets());' und dann 'producer.endTransaction();'. Vielleicht kannst du so etwas benutzen. –
Und ich verstehe nicht, was meinst du in deinem Kommentar? –