2017-06-08 2 views
1

I leftjoin eine KStream mit einem K-Tabelle Fügen, aber ich habe keine Ausgabe an den Ausgangs Thema finden Sie unter:eine K-Tabelle mit einem KStream und nichts kommt in der Ausgabe Thema

val stringSerde: Serde[String] = Serdes.String() 
    val longSerde: Serde[java.lang.Long] = Serdes.Long() 
    val genericRecordSerde: Serde[GenericRecord] = new GenericAvroSerde() 

    val builder = new KStreamBuilder() 

    val networkImprStream: KStream[Long, GenericRecord] = builder 
    .stream(dfpGcsNetworkImprEnhanced) 

    // Create a global table for advertisers. The data from this global table 
    // will be fully replicated on each instance of this application. 
    val advertiserTable: GlobalKTable[java.lang.Long, GenericRecord]= builder.globalTable(advertiserTopicName, "advertiser-store") 

    // Join the network impr stream to the advertiser global table. As this is global table 
    // we can use a non-key based join with out needing to repartition the input stream 
    val networkImprWithAdvertiserNameKStream: KStream[java.lang.Long, GenericRecord] = networkImprStream.leftJoin(advertiserTable, 
    (_, networkImpr) => { 
     println(networkImpr) 
     networkImpr.get("advertiserId").asInstanceOf[java.lang.Long] 
    }, 
    (networkImpr: GenericRecord, adertiserIdToName: GenericRecord) => { 
     println(networkImpr) 
     networkImpr.put("advertiserName", adertiserIdToName.get("name")) 
     networkImpr 
    } 
) 

    networkImprWithAdvertiserNameKStream.to(networkImprProcessed) 

    val streams = new KafkaStreams(builder, streamsConfiguration) 
    streams.cleanUp() 
    streams.start() 
    // usually the stream application would be running forever, 
    // in this example we just let it run for some time and stop since the input data is finite. 
    Thread.sleep(15000L) 

Wenn ich das umgehen beitreten und Ich gebe das eingegebene Thema direkt an die Ausgabe aus, ich sehe Nachrichten ankommen. Ich habe den Join bereits zu einem linken Join geändert und einige printlns hinzugefügt, um zu sehen, wann der Schlüssel extrahiert wird (es wird jedoch nichts auf der Konsole gedruckt). Außerdem benutze ich jedes Mal das kafka streams reset tool, also von Anfang an. Mir gehen hier die Ideen aus. Außerdem habe ich Testzugriff auf den Speicher hinzugefügt und es funktioniert und enthält Schlüssel aus dem Stream (obwohl dies keine Ausgabe wegen der linken Verknüpfung verbieten sollte).

Antwort

2

In meinem Quellstream ist der Schlüssel null. Obwohl ich diesen Schlüssel nicht verwende, um der Tabelle beizutreten, darf dieser Schlüssel nicht null sein. Um einen Zwischenstream mit einem Dummy-Key zu erstellen, funktioniert es. Also auch ich habe eine globale K-Tabelle hier die Einschränkungen für die Schlüssel für die Stream-Nachrichten gelten auch hier: http://docs.confluent.io/current/streams/developer-guide.html#kstream-ktable-join

Eingabedatensätze für den Strom mit einer Null-Taste oder einem Nullwert ignoriert und löst keines der Join .

Verwandte Themen