2016-10-20 5 views
2

Ich habe eine Anforderung, Nachrichten im Namen einer Reihe von faulen Konsumenten zu konsumieren, die nur REST-APIs verfügbar macht. Daher plane ich, Sink Connectors zu haben, die Nachrichten von Kafka-Themen abrufen und HTTP-POST-Operationen für die exponierten APIs ausführen.Kafka connect throttling

Einer der wichtigsten Faktoren für die Berücksichtigung ist die Drosselung. Welchen Mechanismus schlagen Sie vor, um die Sink-Aufgaben zu drosseln, um die Tier-SLA der APIs zu erfüllen? Ich verstehe, dass Kafka eine Client-Quota-Funktion hat. Was ist jedoch der optimale Mechanismus, um API-Anfragen/min oder sec zu verfolgen, die es ermöglichen würden, die Client-Quote dynamisch anzupassen?

Antwort

5

Ich denke, der beste Weg, um die Begrenzung der Rate für Ihre REST-API zu implementieren, wäre in Ihrem Connector-Code durch Blockieren bei Bedarf in SinkTask.put(). Vielleicht möchten Sie darüber nachdenken, ob die Ratenbegrenzung auf der Ebene Ihrer 0 ausreicht, oder Sie benötigen sie global (komplexer seit der Koordination).

Der Vorteil der Verwendung von Kafka-Kontingenten, die Sie in Betracht gezogen haben, ist, dass der verteilte Aspekt für Sie behandelt wird, aber ich glaube, dass diese aktuell nur in Bytes übertragen werden können.

+0

Das ist richtig. Kafka-Client-Kontingente werden bei übertragenen Bytes konfiguriert, während die REST-API-Drosselung bei Nein ist. von Anfragen. In meinem Fall würde ich mehrere Sink-Aufgaben in mehreren Arbeitern in der Cloud ausführen. Wenn Sie global sagen, beziehen Sie sich auf die Zählung von Nein. von Anfragen in etwas wie einem Cache (redis)? Gibt es eine elegante Möglichkeit, dies über Kafka connect zu realisieren? – bhalochele

+1

Ja, verwenden Sie etwas wie Redis oder Memcached, um den gemeinsamen Status zu erhalten, den Sie für Raten-begrenzende API-Anfragen benötigen. Wenn Sie mit einer möglicherweise zu konservativen Ratenbegrenzung einverstanden sind, können Sie stattdessen einen Guava 'RateLimiter' für jede' SinkTask' initialisieren, die Sie mit der 'put()' Methode erhalten. Die Anzahl der Erlaubnisse, die Sie dem 'RateLimiter' geben können, könnte' (overall_limit/num_tasks) 'sein (Sie könnten das Limit auf Task-Ebene weitergeben, wenn Sie die Task-Konfiguration von' SinkConnector.taskConfigs() ') generieren. – shikhar

+0

Wenn wir Guava RateLimiter verwenden, glaube ich, dass wir die gleiche RateLimiter-Instanz für die Sink-Aufgaben benötigen, die über mehrere Worker hinweg über mehrere Racks laufen. Das heißt, Task 1 und Task 2 sollten auf eine einzelne Instanz von RateLimiter zugreifen können, auf der die Methode acquire() aufgerufen wird. Übrigens, Sie vermuten Guava RateLimiter als Alternative zu einem Caching-Mechanismus? – bhalochele