2017-07-17 7 views
0

Ich benutze Kafka Consumer, um von mehreren Themen zu lesen, und ich brauche eine von ihnen, um höhere Priorität zu haben. Die Verarbeitung nimmt viel Zeit in Anspruch, und es gibt immer viele Nachrichten in Themen mit niedriger Priorität, aber die Nachrichten von anderen müssen so schnell wie möglich verarbeitet werden.Kafka Consumer - Topic (s) mit höherer Priorität

Es ist eine ähnliche Frage wie Does Kafka support priority for topic or message?, aber diese verwendet alte API.

In neuen API (0.10.1.1), gibt es Methoden,

KafkaConsumer::pause(Collection) 
KafkaConsumer::resume(Collection) 

Aber es ist mir nicht klar, wie effektiv zu erkennen, dass es neue Nachrichten in hohem Priorität Thema und es ist notwendig Verbrauch zu pausieren von den anderen Themen.

Irgendwelche Ideen/Beispiele?

+1

Sie können überprüfen, ob endOffsets für die Partitionen, die Sie überwachen, größer als die letzten festgeschriebenen Offsets für diese Partitionen sind. Wie das genau funktioniert, wird implementationsspezifisch sein, aber das wird dich wissen lassen, ob es mehr Nachrichten zu konsumieren gibt, bevor du abrufst. – dawsaw

Antwort

1

Schließlich löste ich, dass, wie dawsaw geraten - in Verarbeitungsschleife, speichere ich für alle Themen/Partitionen ich gelesen:

  • beginningOffsets
  • endOffsets
  • begangen - verwenden kann ich nicht Stellung da ich Themen abonniere, nicht auf Partitionen.

Wenn (endOffset - commited) > 0 für jede Priorität Thema, nenne ich consumer.pause() für nicht prioritäre Themen und diejenigen wieder fortgesetzt, nachdem (endOffset - commited) == 0 für alle vorrangigen Themen.

+0

Kannst du bitte deine Strategie teilen, um das Problem zu lösen? Angenommen, wir haben (insgesamt 10 Gb) Nachrichten mit niedriger Priorität und einige Nachrichten mit hoher Priorität. Wir haben mehrere Konsumenten und mehrere Produzenten. Auch wenn wir die Konsumenten pausieren, müssen wir auch die Produzenten aller anderen Themen pausieren, um Ihre Idee zu verwirklichen. Recht? Hattest du irgendwelche Erfahrungen dazu, weil es in einem 100 Service und 10s Themen-Ökosystem fast unmöglich scheint? - Und ja, ich habe Ihre diesbezügliche andere Frage zu diesem Thema gelesen. Danke – JSBach

+0

Nein - es gibt keine Notwendigkeit, irgendeinen Produzenten anzuhalten - die Idee ist, dass Sie einzelne Verbraucher zu mehreren Themen abonniert haben (einige dieser Themen haben hohe Priorität und andere normale Priorität). Bevor Sie nach neuen Nachrichten suchen, müssen Sie die Verzögerung (en) für das Thema mit hoher Priorität überprüfen. Wenn einer dieser Verzögerungen nicht Null ist, bedeutet dies, dass Sie das Abonnement für Themen mit normaler Priorität unterbrechen müssen, nicht für die "Stahl" -Zeit Ihres Verbrauchers. Nachdem Sie alle Nachrichten von Hi-Priority-Themen verarbeitet haben, können Sie die Normaln-Priorität-Themen erneut fortsetzen. – miran

+0

Danke. Ich kann mich nicht gerade trauen. Aber es riecht schlecht nach größeren Systemen. Sobald die Damm-Türen für eine große Menge von Daten geöffnet sind, muss ich hin und wieder prüfen, ob ich Ressourcen mit dieser Warteschlange mit niedriger Priorität verschwenden werde. Warum sollte ich? Recht. Sowieso. Danke nochmals – JSBach

0

Ich denke, dass Sie eine Mischung aus position() und committed() -Methoden könnten. Die Methode position() ruft den Offset des nächsten Datensatzes ab, der abgerufen wird, und die Methode committed() ruft den letzten festgeschriebenen Offset für die angegebene Partition ab (wie in der Dokumentation beschrieben). Bevor Sie mit der niedrigeren Priorität abfragen, können Sie die Position() und committed() für die höhere Priorität überprüfen. Wenn position() höher als committed() ist, können Sie die niedrigere Priorität und poll() auf der höheren Priorität() pausieren() und dann die niedrigere Priorität wieder aufnehmen.