2017-07-05 4 views
0

für die Aufrechterhaltung meiner Verbraucher (sehr lange Verarbeitung variabler Länge) Ich implementiere eine leere Poll() Aufruf in einem Hintergrund-Thread, die den Broker Rebalancing halten, wenn ich zu viel Zeit zwischen verbringen Umfragen(). Ich habe mein Poll-Intervall sehr lang gewählt, aber ich möchte es nicht immer für immer längere Verarbeitung erhöhen.Kafka Proper Weg zum Polling keine Datensätze

Was ist der richtige Weg, um keine Datensätze abzufragen? Derzeit rufe ich poll() auf und suche dann erneut nach den frühesten Offsets für jede Partition, die im Pollaufruf() zurückgegeben wurde, damit sie vom Hauptthread gelesen werden können, nachdem die vorherigen Nachrichten verarbeitet wurden.

ConsumerRecords<String, String> msgs = kafkaConsumer.poll(timeout); 
Map<Integer, Long> partitionToOffsets = getEarliestPartitionOffsets(msgs); // helper method 
seekToOffsets(partitionToOffsets); 
+0

Welche Kafka Version sind Sie? Seit 0.10.1.0 ist es [nicht mehr notwendig, in einem separaten Thread zu pollen] (https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats + von + a + Hintergrund + Thread). –

Antwort

2

Der richtige Weg, lange Verarbeitungszeit (und die Vermeidung von Verbrauchern Neuverteilung) zu handhaben ist KafkaConsumer.pause()/KafkaConsumer.resume() Methoden zu verwenden. Hier können Sie mehr darüber lesen:

+0

Oh cool, danke. Weißt du, was passiert, wenn der Benutzer nach der Pause() aber vor dem Fortsetzen() abstürzt? – kyl

+0

Ich denke, ich habe es herausgefunden, pausieren Sie nur Ursachen Umfragen() für diesen bestimmten Verbraucher, nichts zurückzugeben, Partitionen immer noch neu zugewiesen, wenn der Verbraucher abstürzt. – kyl

Verwandte Themen