2017-11-10 5 views
0


In meinem aktuellen Projekt haben wir eine Datenpipeline mit Kafka, Kafka Connect, Elasticsearch erstellt. Die Daten enden zu einem Thema „Signal-Thema“ und sind von der Form
Gemeinsame Themen mit Kafka Streams verbinden?

KeyValue<id:String, obj:Signal> 

Im jetzt versuchen Kafka Streams einführen können Elasticsearch einige Verarbeitung der Daten in seinem Weg von Kafka zu tun.

Mein erstes Ziel ist es, in der Lage zu sein, die Daten mit verschiedenen Arten von Seiteninformationen zu verbessern. Ein typisches Szenario wäre, ein anderes Feld an die Daten anzuhängen, basierend auf einigen bereits in den Daten vorhandenen Informationen. Zum Beispiel enthalten die Daten ein "rawevent" -Feld und basierend darauf möchte ich eine "event-description" hinzufügen und dann zu einem anderen Thema ausgeben.

Was wäre der "richtige" Weg, dies zu implementieren?

Ich dachte an maby die Seitendaten auf einem separaten Thema in kafka

KeyValue<rawEvent:String, eventDesc:String> 

und mit Strömen Verbindung der beiden Themen hat, aber ich bin nicht sicher, wie das zu bewerkstelligen.
Wäre das möglich? Alle Beispiele, auf die ich gestoßen bin, scheinen es erforderlich zu machen, dass die Schlüssel der Datenquellen identisch sind und da ich nicht sicher bin, ob es möglich ist. Wenn jemand einen Ausschnitt dafür hat, wie das gemacht werden könnte, wäre es großartig.

Vielen Dank im Voraus.

Antwort

1

Sie haben zwei Möglichkeiten:

  1. Sie rawEvent von Signal und setzen als neue Key die Verbindung gegen einen KTable<rawEvent:String, eventDesc:String> zu tun extrahieren kann. So etwas wie KStream#selectKey(...)#join(KTable...)
  2. Sie können KStream-GlobalKTable Join tun: Dies ermöglicht das Extrahieren eines Nicht-Schlüssel-Join-Attributs aus dem KStream (in Ihrem Fall rawEvent), das verwendet wird, um eine GlobalKTable-Suche durchzuführen, um den Join zu berechnen.

Beachten Sie, dass beide unterschiedliche Semantik als KStream-K-Tabelle liefern verbindet join auf Zeit synchronisiert, während ein KStream-GlobalKTable verbinden ist nicht synchronisiert. In diesem Blogbeitrag finden Sie weitere Details: https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/

Verwandte Themen