2017-06-15 1 views
0

Also ich habe zwei verschiedene KStream s etwa so:Join Werte von zwei verschiedenen KStreams

Stream 1: (String Schlüssel, Object Wert1)

Stream 2: (String Schlüssel, Object Wert2)

Ich möchte ihnen beitreten, so dass ich mit einem Stream, der wie folgt aussieht (Object value1, Object value2).

Was ist der sauberste Weg, dies zu tun?

+0

Es gibt ein paar Arten kommen, je nachdem, was Sie erreichen wollen. Dies ist in informativen Artikel, der Ihnen weiter helfen könnte: https://cwiki.apache.org/confluence/display/KAFKA+Streams+Join+Semantics – jvwilge

Antwort

1

Eine Möglichkeit besteht darin, die beiden Datenströme so zu verbinden, dass der Wert des resultierenden Datenstroms eine Containerklasse ist, die beide ursprünglichen Werte enthält. Ordnen Sie dann den Stream zu, um die Werte aus dem Container zu entfernen, und verwenden Sie einen der Schlüssel als Schlüssel.

Code:

KStream<String, Object> stream1; 
KStream<String, Object> stream2; 

KStream<Object, Object> joinedStream = stream1 
     .join(stream2, (value1, value2) -> new MyValueContainer(value1, value2)) 
     .map((key, container) -> new KeyValue<Object, Object>(container.getValue1(), container.getValue2())); 
+0

in der Join Ich denke, dass Sie neue MyValueContainer (value1, value2). –

+0

@MichalBorowiecki Ich wollte das tun - danke, dass du es aufgezeigt hast –

Verwandte Themen