2017-03-21 5 views
0

Ich habe einige sehr einfache Beispiel mit Join-Streams, wo zwei Themen haben die einfache Schlüssel Wert Struktur (Integer/String) und es funktioniert perfekt.Kafka beitreten Streams Filter für Schlüssel

Darf ich fragen, wie kann ich so etwas wie:

SELECT * FROM stream1, stream2 
WHERE stream1.key = stream2.key AND (stream1.key > 50 && stream1.key < 100) AND (stream2.key > 50 AND stream2.key < 100) 

Kafka so etwas wie dies erlaubt?

Schließlich, was ich tun möchte, ist 2-Streams verbunden filtern, wo Schlüssel GenericRecord sein wird, und es wird sieht irgendwie:

SELECT * FROM stream1, stream2 
WHERE stream1.genericRecordkey.someId. = stream2.genericRecordkey.someId 

Mein Test Beispiel:

public void joinKStreamToKStreamWhereKeyValueIsIntegerString() throws Exception { 
    String uniqueKey = new Object() { 
    }.getClass().getEnclosingMethod().getName(); 

    long timestamp = new Date().getTime(); 

    String firstTopic = String.format("%1$s_1_%2$s", uniqueKey, timestamp); 
    String secondTopic = String.format("%1$s_2_%2$s", uniqueKey, timestamp); 
    String outputTopic = String.format("%1$s_output_%2$s", uniqueKey, timestamp); 
    String appIdConfig = String.format("%1$s_app_id_%2$s", uniqueKey, timestamp); 
    String groupIdConfig = String.format("%1$s_group_id_%2$s", uniqueKey, timestamp); 

    List<KeyValue<Integer, String>> ikv1 = Arrays.asList(
      new KeyValue<>(1, "Bruce Eckel"), 
      new KeyValue<>(2, "Robert Lafore"), 
      new KeyValue<>(3, "Andrew Tanenbaum") 
    ); 

    List<KeyValue<Integer, String>> ikv2 = Arrays.asList(
      new KeyValue<>(3, "Modern Operating System"), 
      new KeyValue<>(1, "Thinking in Java"), 
      new KeyValue<>(3, "Computer Architecture"), 
      new KeyValue<>(4, "Programming in Scala") 
    ); 

    List<KeyValue<Integer, String>> expectedResults = Arrays.asList(
      new KeyValue<>(3, "Andrew Tanenbaum/Modern Operating System"), 
      new KeyValue<>(1, "Bruce Eckel/Thinking in Java"), 
      new KeyValue<>(3, "Andrew Tanenbaum/Computer Architecture") 
    ); 

    Integer partitions = 1; 
    Integer replication = 1; 
    Properties topicConfig = new Properties(); 

    TopicUtils.createTopic(firstTopic, partitions, replication, topicConfig); 
    TopicUtils.createTopic(secondTopic, partitions, replication, topicConfig); 
    TopicUtils.createTopic(outputTopic, partitions, replication, topicConfig); 

    final Serde<String> stringSerde = Serdes.String(); 
    final Serde<Integer> integerSerde = Serdes.Integer(); 

    Properties streamsConfiguration = new Properties(); 
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appIdConfig); 
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG); 
    streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_CONNECT_CONFIG); 
    streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName()); 
    streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); 
     // The commit interval for flushing records to state stores and downstream must be lower than 
     // this integration test's timeout (30 secs) to ensure we observe the expected processing results. 
     streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000); 
     streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 

     // Use a temporary directory for storing state, which will be automatically removed after the test. 
     streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); 

     KStreamBuilder builder = new KStreamBuilder(); 

     KStream<Integer, String> firstStream = builder.stream(integerSerde, stringSerde, firstTopic); 
     KStream<Integer, String> secondStream = builder.stream(integerSerde, stringSerde, secondTopic); 

     KStream<Integer, String> outputStream = firstStream.join(secondStream, (l, r) -> { 
      return l + "/" + r; 
     }, JoinWindows.of(TimeUnit.SECONDS.toMillis(5)), integerSerde, stringSerde, stringSerde); 

     outputStream.to(integerSerde, stringSerde, outputTopic); 

     KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); 

     streams.start(); 

     Properties pCfg1 = new Properties(); 
     pCfg1.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG); 
     pCfg1.put(ProducerConfig.ACKS_CONFIG, "all"); 
     pCfg1.put(ProducerConfig.RETRIES_CONFIG, 0); 
     pCfg1.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); 
     pCfg1.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
     IntegrationTestUtils.produceKeyValuesSynchronously(firstTopic, ikv1, pCfg1); 

     Properties pCfg2 = new Properties(); 
     pCfg2.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG); 
     pCfg2.put(ProducerConfig.ACKS_CONFIG, "all"); 
     pCfg2.put(ProducerConfig.RETRIES_CONFIG, 0); 
     pCfg2.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); 
     pCfg2.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
     IntegrationTestUtils.produceKeyValuesSynchronously(secondTopic, ikv2, pCfg2); 

     Properties consumerConfig = new Properties(); 
     consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG); 
     consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig); 
     consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
     consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); 
     consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 

     List<KeyValue<Integer, String>> actualResults = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, expectedResults.size()); 

     streams.close(); 

     assertThat(actualResults).containsExactlyElementsOf(expectedResults); 
    } 

Hoffe, dass ich gut erklärt und Danke für jede Hilfe.

Antwort

1

Sie können einfach eine filter anwenden, bevor Sie den Beitritt tun.

outputStream = firstStream.filter(...).join(secondStream.filter(...), ...); 

Wenn Sie auf stream1.genericRecordkey.someId anschließen möchten, müssen Sie someId erste und legen Sie es als Schlüssel extrahieren:

firstStream.selectKey((k,v) -> v.someId)).join(secondStream.selectKey((k,v) -> v.someId), ...); 

Für weitere Informationen die Dokumentation finden Sie unter: http://docs.confluent.io/current/streams/developer-guide.html

+0

Bis zum Ende habe nicht verstanden, wie ich generische Schlüssel und Werte zwischen zwei Themen filtern kann, ich meine .filter (keyFromTopic1.something == keyFromTopic2.something && valueFromTopic1.something == valueFromTopic 2.etwas) Können Sie bitte detaillierteren Code beschreiben? – EVO

+0

Ihr ursprüngliches Beispiel zeigte nur "einfache" Filterprädikate wie 'stream1.key> 50'. Für den komplexen Teil müssten Sie zuerst dem Schlüssel beitreten und nach dem Join einen zusätzlichen Filter (für den Wertfilter-Teil) anwenden. –

Verwandte Themen