2017-03-29 3 views
0

ich tue zwischen zwei einfachen KTables verbinden und lassen vermuten, dass ich zwei Themen mit den folgenden Daten haben, Details siehe:Kafka Streams falsch verbunden Ergebnisse von zwei einfachen KTables

Erstes Thema

1 | Victor C.

1 | Vadim C.

2 | Vasile P.

3 | Vitalie C.

4 | Oleg C.

Zweites Thema

1 | Programmierer

2 | Administrator

3 | Manager

SQL Verhalten ist offensichtlich klar, wenn ich die folgende Abfrage mache und die Ausgabe ist klar verständlich für mich:

SELECT * FROM firstTable 
INNER JOIN secondTable 
ON firstTable.ID = secondTable.ID 

1 | Victor C. | 1 | Programmierer

1 | Vadim C. | 1 | Programmierer

2 | Vasile P. | 2 | Administrator

3 | Vitalie C. | 3 |

Manager

Also, ich arround Kafka spielen und ich versuchte das gleiche Verhalten aber die Ergebnisse von verbundenen Strömen zu tun verwirrten vollständig mein Geist

die Code-Schnipsel Details:

@Test 
    public void joinKTableToKTableWhereKeyValueIsIntegerAndString() throws Exception { 
     InternalTestConfiguration itc = getInternalTestConfiguration(); 
     List <String> topics = Arrays.asList(itc.getFirstTopic(), itc.getSecondTopic(), itc.getProjectionTopic(), itc.getFirstKeyedTopic(), itc.getSecondKeyedTopic()); 
     KafkaStreams streams = null; 

     try { 
      Integer partitions = 1; 
      Integer replication = 1; 
      RestUtils.createTopics(topics, partitions, replication, new Properties()); 

      List < KeyValue < Integer, String >> employees = Arrays.asList(
       new KeyValue < > (1, "Victor C."), 
       new KeyValue < > (1, "Vadim C."), 
       new KeyValue < > (2, "Vasile P."), 
       new KeyValue < > (3, "Vitalie C."), 
       new KeyValue < > (4, "Oleg C.") 
     ); 

      List < KeyValue < Integer, String >> specialities = Arrays.asList(
       new KeyValue < > (1, "Programmer"), 
       new KeyValue < > (2, "Administrator"), 
       new KeyValue < > (3, "Manager") 
     ); 

      List < KeyValue < Integer, String >> expectedResults = Arrays.asList(
       new KeyValue < > (1, "Victor C./Programmer"), 
       new KeyValue < > (1, "Vadim C./Programmer"), 
       new KeyValue < > (2, "Vasile P./Administrator"), 
       new KeyValue < > (3, "Vitalie C../Manager") 
     ); 

      final Serde <Integer> keySerde = Serdes.Integer(); 
      final Serde <String> valueSerde = Serdes.String(); 

      Properties streamsConfiguration = new Properties(); 
      streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, itc.getAppIdConfig()); 
      streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG); 
      streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_CONNECT_CONFIG); 
      streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName()); 
      streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); 
      streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000); 
      //streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); 
      streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
      streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); 

      KStreamBuilder builder = new KStreamBuilder(); 

      KTable < Integer, String > firstKTable = builder.table(keySerde, valueSerde, itc.getFirstTopic(), itc.getFirstStore()); 
      KTable < Integer, String > secondKTable = builder.table(keySerde, valueSerde, itc.getSecondTopic(), itc.getSecondStore()); 
      KTable < Integer, String > projectionKTable = firstKTable.join(secondKTable, (l, r) - > { 
       return l + "/" + r; 
      }); 

      projectionKTable.to(keySerde, valueSerde, itc.getProjectionTopic()); 

      streams = new KafkaStreams(builder, streamsConfiguration); 

      streams.start(); 

      Properties cfg1 = new Properties(); 
      cfg1.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG); 
      cfg1.put(ProducerConfig.ACKS_CONFIG, "all"); 
      cfg1.put(ProducerConfig.RETRIES_CONFIG, 0); 
      cfg1.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); 
      cfg1.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
      IntegrationTestUtils.produceKeyValuesSynchronously(itc.getFirstTopic(), employees, cfg1); 

      Properties cfg2 = new Properties(); 
      cfg2.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG); 
      cfg2.put(ProducerConfig.ACKS_CONFIG, "all"); 
      cfg2.put(ProducerConfig.RETRIES_CONFIG, 0); 
      cfg2.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); 
      cfg2.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
      IntegrationTestUtils.produceKeyValuesSynchronously(itc.getSecondTopic(), specialities, cfg2); 

      Properties consumerConfig = new Properties(); 
      consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG); 
      consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, itc.getGroupIdConfig()); 
      consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
      consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); 
      consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 

      List < KeyValue < Integer, String >> actualResults = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, itc.getProjectionTopic(), expectedResults.size()); 

      assertThat(actualResults).containsExactlyElementsOf(expectedResults);    
     } finally { 
      if (streams != null) { 
       streams.close(); 
      } 

      RestUtils.deleteTopics(topics); 
     } 
    } 

I erwarten die gleichen Ergebnisse wie SQL haben, aber das ist nicht wahr.

Ergebnisse mit //streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0)“disabled

1, Vadim C./Programmer

2, Vasile P./Administrator

3, Vitalie C./Manager

1, Vadim C./Programmer

2, Vasile P./Administrator

3, Vitalie C./ Manager

Ergebnisse mit streamsConfiguration.put (StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); aktiviert

1, Vadim C./Programmer

2, Vasile P./Administrator

3, Vitalie C./Manager

Wie auch immer der Ergebnisse nicht beide die gleiche wie SQL sind, Bitte helfen Sie mir, dies zu verstehen, weil ich bereits mein Selbst töte :(

Antwort

2

In Ihrem SQL-Vergleich scheint die Nummer kein Primärschlüssel zu sein, da sowohl Victor C. als auch Vadim C. zugeordnet sind 1

Dies funktioniert nicht in der KTable, wenn die Nummer der Schlüssel der Nachricht ist - Vadim C. überschreibt Victor C.. Aus diesem Grund haben Sie nur drei verschiedene Personen in der Ausgabe.

Der zweite Teil Ihrer Frage zum Caching-Verhalten der KTables. Wenn das Caching aktiviert ist (Ihr erstes Beispiel), werden die Joins ausgelöst, wenn die Caches geleert werden (standardmäßig 30 Sekunden). Es gibt auch an issue mit Duplikaten, wenn das Caching aktiviert ist. Wenn Sie das Caching deaktivieren, tritt das nicht auf, also ist das die "richtige" Ausgabe ohne Duplikate.

Ich vor kurzem blogged about join behaviour in Kafka 0.10.1 (also nicht die neueste Version, die ein paar Semantik geändert). Vielleicht ist das hilfreich für dich.

Verwandte Themen