I leftjoin eine KStream mit einem K-Tabelle Fügen, aber ich habe keine Ausgabe an den Ausgangs Thema finden Sie unter:eine K-Tabelle mit einem KStream und nichts kommt in der Ausgabe Thema
val stringSerde: Serde[String] = Serdes.String()
val longSerde: Serde[java.lang.Long] = Serdes.Long()
val genericRecordSerde: Serde[GenericRecord] = new GenericAvroSerde()
val builder = new KStreamBuilder()
val networkImprStream: KStream[Long, GenericRecord] = builder
.stream(dfpGcsNetworkImprEnhanced)
// Create a global table for advertisers. The data from this global table
// will be fully replicated on each instance of this application.
val advertiserTable: GlobalKTable[java.lang.Long, GenericRecord]= builder.globalTable(advertiserTopicName, "advertiser-store")
// Join the network impr stream to the advertiser global table. As this is global table
// we can use a non-key based join with out needing to repartition the input stream
val networkImprWithAdvertiserNameKStream: KStream[java.lang.Long, GenericRecord] = networkImprStream.leftJoin(advertiserTable,
(_, networkImpr) => {
println(networkImpr)
networkImpr.get("advertiserId").asInstanceOf[java.lang.Long]
},
(networkImpr: GenericRecord, adertiserIdToName: GenericRecord) => {
println(networkImpr)
networkImpr.put("advertiserName", adertiserIdToName.get("name"))
networkImpr
}
)
networkImprWithAdvertiserNameKStream.to(networkImprProcessed)
val streams = new KafkaStreams(builder, streamsConfiguration)
streams.cleanUp()
streams.start()
// usually the stream application would be running forever,
// in this example we just let it run for some time and stop since the input data is finite.
Thread.sleep(15000L)
Wenn ich das umgehen beitreten und Ich gebe das eingegebene Thema direkt an die Ausgabe aus, ich sehe Nachrichten ankommen. Ich habe den Join bereits zu einem linken Join geändert und einige printlns hinzugefügt, um zu sehen, wann der Schlüssel extrahiert wird (es wird jedoch nichts auf der Konsole gedruckt). Außerdem benutze ich jedes Mal das kafka streams reset tool, also von Anfang an. Mir gehen hier die Ideen aus. Außerdem habe ich Testzugriff auf den Speicher hinzugefügt und es funktioniert und enthält Schlüssel aus dem Stream (obwohl dies keine Ausgabe wegen der linken Verknüpfung verbieten sollte).