2017-07-19 1 views
3

Die neue Kafka-Version (0.11) unterstützt genau einmal Semantik.Bedeutung von sendOffsetsToTransaction in Kafka 0.11

https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging

Ich habe wie dieses einen Produzenten Setup mit KAFKA Transaktionscode in Java bekam.

producer.initTransactions(); 
    try { 
     producer.beginTransaction(); 
     for (ProducerRecord<String, String> record : payload) { 
      producer.send(record); 
     } 

     Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() { 
      { 
       put(new TopicPartition(TOPIC, 0), new OffsetAndMetadata(42L, null)); 
      } 
     }; 
     producer.sendOffsetsToTransaction(groupCommit, "groupId"); 
     producer.commitTransaction(); 
    } catch (ProducerFencedException e) { 
     producer.close(); 
    } catch (KafkaException e) { 
     producer.abortTransaction(); 
    } 

Ich bin mir nicht ganz sicher, wie die sendOffsetsToTransaction verwenden und den den beabsichtigten Anwendungsfall davon. AFAIK, Consumer Groups ist eine Multithread-Lesefunktion auf Konsumentenseite.

javadoc sagt

"Sendet eine Liste der verbrauchten Offsets an den Verbraucher Gruppenkoordinator und markiert auch jene Offsets im Rahmen der aktuellen Transaktion. Werden diese Offsets nur verbraucht angesehen werden, wenn die Transaktion erfolgreich begangen wird. Diese Diese Methode sollte verwendet werden, wenn Sie im Batch konsumieren und Nachrichten gemeinsam erstellen müssen, normalerweise in einem consume-transform-produce-Muster. "

Wie würde produzieren eine Liste der konsumierten Offsets pflegen? Was ist der Sinn?

Antwort

0

Dies ist nur relevant für Workflows, in denen Sie konsumieren und dann Nachrichten basierend auf diesem verbrauchen. In solchen Fällen können Sie mit dieser Funktion Ihre Offsets nur festschreiben, wenn die nachgeschaltete Erzeugertransaktion erfolgreich ist.

Ohne Transaktionen verwenden Sie Consumer#commitSync() oder Consumer#commitAsync(). Wenn Sie diese Consumer-Methoden jedoch verwenden, um Offsets festzulegen, bevor Sie die Daten verwenden, die Sie mit einem Producer konsumiert haben, haben Sie Offsets festgelegt, bevor Sie wissen, ob der Producer seine Datensätze erfolgreich gesendet hat. Anstatt also Ihre Offsets mit dem Consumer zu beglaubigen, können Sie Producer#sendOffsetsToTransaction() auf dem Downstream-Producer verwenden. Dies sendet die Offsets an den Transaktionsmanager, der die Transaktion bearbeitet. Nur wenn die gesamte Transaktion erfolgreich ist, werden die Offsets festgeschrieben.

() Hinweis:: Wenn Sie Offsets zum Commit senden, sollten Sie 1 zum zuletzt gelesenen Offset hinzufügen, damit zukünftige Lesevorgänge von dem nicht gelesenen Offset fortgesetzt werden. Dies gilt unabhängig davon, ob Sie mit committen der Verbraucher oder der Hersteller. Siehe: KafkaProducer sendOffsetsToTransaction need offset+1 to successfully commit current offset).