2016-10-20 1 views
2

Ich arbeite mit einfachen Kafka Consumer in einem meiner Projekte und meine gewünschte Logik ist, wenn der Verbraucher einige Nachrichten nicht verarbeiten konnte, wird es die letzte korrekt verarbeitete Nachricht übernehmen und dann bei der nächsten Umfrage wird es fortgesetzt von fehlgeschlagener Nachricht.Kafka-Consumer-Wiederherstellung nach fehlgeschlagener Nachrichtenverarbeitung

ich versuchte, jede Nachricht manuell mit folgendem Code zu begehen:

public void fetchMessages() { 
    ConsumerRecords<String, MyObject> messages = kafkaConsumer.poll(10000); 
    for (ConsumerRecord message : messages) { 
     logger.info("Reading kafka message, topic ["+kafkaTopic+"], partition ["+message.partition()+"], offset ["+message.offset()+"]"); 
     try { 
      MyObject myObject = (MyObject) message.value(); 
      logger.info("Handling message," + myObject); 
      handleMessage(myObject); 
      commitMessage(message); 
     } catch (Exception e) { 
      logger.error("Error handling message");    throw e; 
     } 
    } 
} 


private void commitMessage(ConsumerRecord message) { 
     long    nextOffset  = message.offset() + 1; 

     TopicPartition topicPartition = new TopicPartition(kafkaTopic,message.partition()); 
     OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(nextOffset); 

     Map<TopicPartition,OffsetAndMetadata> offsetAndMetadataMap = new HashMap<>(); 
     offsetAndMetadataMap.put(topicPartition,offsetAndMetadata); 

     logger.info("Commiting processed kafka message, topic ["+kafkaTopic+"], partition ["+message.partition()+"], next offset ["+nextOffset+"]"); 
     kafkaConsumer.commitSync(offsetAndMetadataMap); 
} 

Aber zum Beispiel, wenn I-3-Nachrichten holen, von denen jedem von anderer Partition, I den ersten erfolgreichen Handhabung und dann gescheitert zweite Nachricht zu verarbeiten, Ich verlasse gerade ConsumerRecord s für Schleife und ich erwarte, um 2 Nachrichten zu erhalten, die ich noch nicht in der nächsten poll Iteration begangen habe. Stattdessen empfängt der Verbraucher einfach weiterhin neue Nachrichten und kehrt nie zu fehlgeschlagenen Nachrichten zurück.

Auch versucht, seek auf fehlgeschlagene Nachricht anzuwenden und dann die Schleife zu beenden, aber es funktioniert auf 1 Partition und nicht auf viele arbeiten.

kafkaConsumer.seek(new TopicPartition(kafkaTopic,message.partition()),message.offset());  

Einige Details:

  • Thema hat 12 Partitionen
  • Ein Verbraucher für alle Partitionen
  • Verbraucherumfrage Schleife ein in Minute
  • enable.auto.commit ausführt: false

Was ist falsch mit meinem Code oder mit meine Logik?

+0

Ich glaube, Sie wollen Sie nicht hinzufügen ‚1‘ auf den Offset Ich engagiere mich. Stattdessen möchten Sie den korrekt verbrauchten Offset festschreiben. Das würde erklären, dass eine der fehlgeschlagenen Nachrichten in Ihrem Beispiel nicht wiedergegeben wird ... – jimijazz

+0

@jimijazz Ich glaube nicht, dass Sie Recht haben. Bitte werfen Sie einen Blick auf die Kafka Consumer API - commitSync-Methode. ** Der festgeschriebene Offset sollte die nächste Nachricht sein, die Ihre Anwendung konsumiert, zB lastProcessedMessageOffset + 1. ** [link] (https://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/ Kunden/Verbraucher/KafkaConsumer.html) – mixermt

+0

Ich denke, du hast Recht @mixermt, danke für das darauf hin ... Ich sehe nicht die Logik davon. Würden nachfolgende Aufrufe von 'poll()' den ersten Offset in jedem Stapel vermissen, da er bereits von der vorherigen Iteration als festgeschrieben markiert wurde? – jimijazz

Antwort

1

Ich habe gefunden, wie Suche funktioniert, und bei fehlgeschlagener Nachricht muss ich alle Offsets für alle Partitionen des aktuellen Verbrauchers suchen.

private void seekAllPartitions() { 
    logger.info("Processing of some kafka message was failed, seeking all partitions to last committed"); 
    List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(kafkaTopic); 
    for (PartitionInfo partitionInfo : partitionInfos) { 
     TopicPartition topicPartition = new TopicPartition(kafkaTopic, partitionInfo.partition()); 
     OffsetAndMetadata committedForPartition = kafkaConsumer.committed(topicPartition); 
     if (committedForPartition != null) { 
      kafkaConsumer.seek(topicPartition,committedForPartition.offset()); 
     } 
    } 
} 

Null-Check für committedForPartition benötigt wird, wenn zuletzt auf einige Teilung einiger Verbrauchergruppe ausgeglichen wurde noch nicht festgelegt (unbekannt)

Verwandte Themen