ich tue zwischen zwei einfachen KTables verbinden und lassen vermuten, dass ich zwei Themen mit den folgenden Daten haben, Details siehe:Kafka Streams falsch verbunden Ergebnisse von zwei einfachen KTables
Erstes Thema
1 | Victor C.
1 | Vadim C.
2 | Vasile P.
3 | Vitalie C.
4 | Oleg C.
Zweites Thema
1 | Programmierer
2 | Administrator
3 | Manager
SQL Verhalten ist offensichtlich klar, wenn ich die folgende Abfrage mache und die Ausgabe ist klar verständlich für mich:
SELECT * FROM firstTable
INNER JOIN secondTable
ON firstTable.ID = secondTable.ID
1 | Victor C. | 1 | Programmierer
1 | Vadim C. | 1 | Programmierer
2 | Vasile P. | 2 | Administrator
3 | Vitalie C. | 3 |
Manager
Also, ich arround Kafka spielen und ich versuchte das gleiche Verhalten aber die Ergebnisse von verbundenen Strömen zu tun verwirrten vollständig mein Geist
die Code-Schnipsel Details:
@Test
public void joinKTableToKTableWhereKeyValueIsIntegerAndString() throws Exception {
InternalTestConfiguration itc = getInternalTestConfiguration();
List <String> topics = Arrays.asList(itc.getFirstTopic(), itc.getSecondTopic(), itc.getProjectionTopic(), itc.getFirstKeyedTopic(), itc.getSecondKeyedTopic());
KafkaStreams streams = null;
try {
Integer partitions = 1;
Integer replication = 1;
RestUtils.createTopics(topics, partitions, replication, new Properties());
List < KeyValue < Integer, String >> employees = Arrays.asList(
new KeyValue < > (1, "Victor C."),
new KeyValue < > (1, "Vadim C."),
new KeyValue < > (2, "Vasile P."),
new KeyValue < > (3, "Vitalie C."),
new KeyValue < > (4, "Oleg C.")
);
List < KeyValue < Integer, String >> specialities = Arrays.asList(
new KeyValue < > (1, "Programmer"),
new KeyValue < > (2, "Administrator"),
new KeyValue < > (3, "Manager")
);
List < KeyValue < Integer, String >> expectedResults = Arrays.asList(
new KeyValue < > (1, "Victor C./Programmer"),
new KeyValue < > (1, "Vadim C./Programmer"),
new KeyValue < > (2, "Vasile P./Administrator"),
new KeyValue < > (3, "Vitalie C../Manager")
);
final Serde <Integer> keySerde = Serdes.Integer();
final Serde <String> valueSerde = Serdes.String();
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, itc.getAppIdConfig());
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_CONNECT_CONFIG);
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
//streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
KStreamBuilder builder = new KStreamBuilder();
KTable < Integer, String > firstKTable = builder.table(keySerde, valueSerde, itc.getFirstTopic(), itc.getFirstStore());
KTable < Integer, String > secondKTable = builder.table(keySerde, valueSerde, itc.getSecondTopic(), itc.getSecondStore());
KTable < Integer, String > projectionKTable = firstKTable.join(secondKTable, (l, r) - > {
return l + "/" + r;
});
projectionKTable.to(keySerde, valueSerde, itc.getProjectionTopic());
streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();
Properties cfg1 = new Properties();
cfg1.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
cfg1.put(ProducerConfig.ACKS_CONFIG, "all");
cfg1.put(ProducerConfig.RETRIES_CONFIG, 0);
cfg1.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
cfg1.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
IntegrationTestUtils.produceKeyValuesSynchronously(itc.getFirstTopic(), employees, cfg1);
Properties cfg2 = new Properties();
cfg2.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
cfg2.put(ProducerConfig.ACKS_CONFIG, "all");
cfg2.put(ProducerConfig.RETRIES_CONFIG, 0);
cfg2.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
cfg2.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
IntegrationTestUtils.produceKeyValuesSynchronously(itc.getSecondTopic(), specialities, cfg2);
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, itc.getGroupIdConfig());
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
List < KeyValue < Integer, String >> actualResults = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, itc.getProjectionTopic(), expectedResults.size());
assertThat(actualResults).containsExactlyElementsOf(expectedResults);
} finally {
if (streams != null) {
streams.close();
}
RestUtils.deleteTopics(topics);
}
}
I erwarten die gleichen Ergebnisse wie SQL haben, aber das ist nicht wahr.
Ergebnisse mit //streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0)“disabled
1, Vadim C./Programmer
2, Vasile P./Administrator
3, Vitalie C./Manager
1, Vadim C./Programmer
2, Vasile P./Administrator
3, Vitalie C./ Manager
Ergebnisse mit streamsConfiguration.put (StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); aktiviert
1, Vadim C./Programmer
2, Vasile P./Administrator
3, Vitalie C./Manager
Wie auch immer der Ergebnisse nicht beide die gleiche wie SQL sind, Bitte helfen Sie mir, dies zu verstehen, weil ich bereits mein Selbst töte :(