2017-12-05 2 views
1

Ich habe eine Kafka Stream App, die ein einzelnes Thema als KTable liest, wandelt jedes Element in 0-n Elemente um und schreibt sie alle in ein anderes Thema. Der Fluss sieht wie folgt aus (vereinfacht durch eine recht Bit):Kafka Streams Mapping KTable Wert in separate Werte

('a', '123') -> ('a', '1,2,3') -> ('a1', '1'), ('a2', '2'), ('a3', '3') 

Ist das machbar mit Kafka Stream-DSL? Alle verwendeten Themen sind compact, deshalb möchte ich eine Tabelle simulieren und nie alte Werte loswerden.

tl; dr; Wie transformiere ich eine Nachricht in mehrere Nachrichten?

Antwort

2

Nicht sicher, ob Sie das mit der DSL ausdrücken können. Aber es ist ziemlich einfach diese mit Prozessor-API zu tun:

builder.stream("input-topic").transform(...).to("output-topic"); 

Sie befestigen einen Schlüssel-Wert-Speicher Ihrer transform() und zeichne die folgenden für jeden Eingang:

  1. überprüfen, ob es einen entsprechenden Schlüssel -Wertpaar im Laden
    • Wenn ja (dh store.get()!=null), nehmen Sie den alten Wert aus dem Geschäft und teilen Sie ihn; ersetzen den Wert für jede „split record“ mit null und emittieren alle jene Datensätze
  2. wenn Eingabesatz value!=null, den Eingabedatensatz setzen in den Laden, wie sie ist, und teilen den Eingabedatensatz und emittieren die einzelnen Ausgangsdatensätzen
  3. wenn Eingabesatz value==null, löschen key aus dem Laden

Schauen Sie sich die Dokumentation für weitere Informationen: https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration

Verwandte Themen