für die Aufrechterhaltung meiner Verbraucher (sehr lange Verarbeitung variabler Länge) Ich implementiere eine leere Poll() Aufruf in einem Hintergrund-Thread, die den Broker Rebalancing halten, wenn ich zu viel Zeit zwischen verbringen Umfragen(). Ich habe mein Poll-Intervall sehr lang gewählt, aber ich möchte es nicht immer für immer längere Verarbeitung erhöhen.Kafka Proper Weg zum Polling keine Datensätze
Was ist der richtige Weg, um keine Datensätze abzufragen? Derzeit rufe ich poll() auf und suche dann erneut nach den frühesten Offsets für jede Partition, die im Pollaufruf() zurückgegeben wurde, damit sie vom Hauptthread gelesen werden können, nachdem die vorherigen Nachrichten verarbeitet wurden.
ConsumerRecords<String, String> msgs = kafkaConsumer.poll(timeout);
Map<Integer, Long> partitionToOffsets = getEarliestPartitionOffsets(msgs); // helper method
seekToOffsets(partitionToOffsets);
Welche Kafka Version sind Sie? Seit 0.10.1.0 ist es [nicht mehr notwendig, in einem separaten Thread zu pollen] (https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats + von + a + Hintergrund + Thread). –