1

Gegeben: Ich habe zwei Themen in Kafka sagen wir Thema A und Thema B. Der Kafka Stream liest einen Datensatz aus Thema A, verarbeitet es und produziert mehrere Datensätze (sagen wir recordA und recordB) entsprechend dem konsumierten Datensatz. Jetzt stellt sich die Frage, wie ich das mit Kafka Streams erreichen kann.Kafka Streams: Ein Datensatz zu mehreren Datensätzen

KStream<String, List<Message>> producerStreams[] = recordStream.mapValues(new ValueMapper<Message, List<Message>>() { 
     @Override 
     public List<Message> apply(final Message message) { 
      return consumerRecordHandler.process(message); 
     } 
    }).*someFunction*() 

Hier ist der Datensatz gelesen Nachricht; Nach der Verarbeitung wird eine Liste von Nachrichten zurückgegeben. Wie kann ich diese Liste in zwei Producer-Streams aufteilen? Jede Hilfe wird geschätzt.

Antwort

5

Ich bin nicht sicher, wenn ich die Frage richtig verstehe, und ich verstehe auch nicht die Antwort von @Abhishek :(

Wenn Sie einen Eingabestream haben, und Sie möchten, Null, eine oder mehrere Ausgabe Datensätze pro Eingabe Datensätze, Sie würden eine flatMap() oder flatMapValues() (abhängig davon, ob Sie den Schlüssel ändern möchten oder nicht)

anwenden

Sie fragen auch nach "Wie kann ich diese Liste in zwei Producer-Streams aufteilen?" Wenn Sie einen Stream in mehrere aufteilen möchten, können Sie branch() verwenden.

Weitere Einzelheiten verweise ich auf die docs: http://docs.confluent.io/current/streams/developer-guide.html#stateless-transformations

+0

@ user2538255 Fühlen Sie sich frei zu folgen, wenn meine Antwort unklar ist. –

+0

Das ist genau was ich mache. Nachdem ich auf Abhisheks Antwort gegoogelt hatte, landete ich auf diesem Beispiel https://github.com/conflutinc/examples/blob/kafka-0.10.0.0-cp-3.0.0/kafka-streams/src/test/java/io/ konfluent/Beispiele/Streams/WordCountLambdaIntegrationTest.java – user2538255

+0

Habe die richtige Antwort akzeptiert :) Danke :) – user2538255

2

Was ist Ihr Schlüssel (Typ)? Ich vermute es ist nicht String. Nach der Ausführung der mapValues haben Sie dies - KStream<K,List<Message>>. Wenn K nicht String dann kann someFunction() ein map sein, die K in String umwandeln (wenn seine ist, haben Sie bereits das Ergebnis haben) und lassen Sie den List<Message> (den Wert) unberührt, da das Ergebnis Ihrer beabsichtigten Ende ist

+0

Ja, das works..thanks eine Tonne! – user2538255

Verwandte Themen