1

Ich versuche einen Weg zu finden, meine Kafka-Nachrichten neu zu ordnen und bestellte Nachrichten an ein neues Thema zu senden mit Apache Beam in Verbindung mit Google DataFlow.Apache Beam Kombinieren Werte gruppiert

Ich habe Kafka Verlag, der String-Nachrichten im folgenden Format sendet: {system_timestamp}-{event_name}?{parameters}

zum Beispiel:

1494002667893-client.message?chatName=1c&messageBody=hello 
1494002656558-chat.started?chatName=1c&chatPatricipants=3 

Was ich will ist Ereignisse tun Nachbestellung basierend auf {System-Zeitstempel} Teil der Nachricht und innerhalb eines 5-Sekunden-Fensters, da unsere Publisher nicht garantieren, dass Nachrichten gemäß dem Wert {system-timestamp} gesendet werden.

Ich habe ein Mock Sortierer Funktion geschrieben, die Ereignisse, die von Kafka (mit KafkaIO Quelle) sortiert:

static class SortEventsFunc extends DoFn<KV<String, Iterable<String>>, KV<String, Iterable<String>>> { 

    @ProcessElement 
    public void processElement(ProcessContext c) { 
     KV<String, Iterable<String>> element = c.element(); 

     System.out.println(""); 
     System.out.print("key: " + element.getKey() + ";"); 

     Iterator<String> it = element.getValue().iterator(); 
     List<String> list = new ArrayList<>(); 
     while (it.hasNext()) { 
      String val = it.next(); 
      System.out.print("value: " + val); 
      list.add(val); 
     } 
     Collections.sort(list, Comparator.naturalOrder()); 
     c.output(KV.of(element.getKey(), list)); 
    } 
} 

public static void main(String[] args) { 
    PipelineOptions options = PipelineOptionsFactory.create(); 

    DirectOptions directOptions = options.as(DirectOptions.class); 
    directOptions.setRunner(DirectRunner.class); 

    // Create the Pipeline object with the options we defined above. 
    Pipeline pipeline = Pipeline.create(options); 
    pipeline 
     // read from Kafka 
     .apply(KafkaIO.<String,String>read() 
      .withBootstrapServers("localhost:9092") 
      .withTopics(new ArrayList<>((Arrays.asList("events")))) 
      .withKeyDeserializer(StringDeserializer.class) 
      .withValueDeserializer(StringDeserializer.class) 
      .withoutMetadata()) 
     // apply window 
     .apply(Window.<KV<String,String>>into(
       FixedWindows.of(Duration.standardSeconds(5L)))) 
     // group by key before sorting 
     .apply(GroupByKey.<String, String>create()) // return PCollection<KV<String, Iterable<String>> 
     // sort events 
     .apply(ParDo.of(new SortEventsFunc())) 
     //combine KV<String, Iterable<String>> input to KafkaIO acceptable KV<String, String> format 
     .apply(Combine.perKey()) //:TODO somehow convert KV<String, Iterable<String>> to KV<String, String> 
     // write ordered events to Kafka 
     .apply(KafkaIO.<String, String>write() 
       .withBootstrapServers("localhost:9092") 
       .withTopic("events-sorted") 
       .withKeySerializer(StringSerializer.class) 
       .withValueSerializer(StringSerializer.class) 
      ); 
    pipeline.run(); 
} 

Also habe ich gruppiert Nachrichten mit GroupByKey.<String, String>create() verwandeln, indem nach sortrin Ereignisse muss ich irgendwie um sie von KV<String, Iterable<String>> in KafkaIO KV<String, String> or KV<Void, String> Werte zu konvertieren. Also alles, was ich tun möchte, ist durch die Gruppierung Transformationsschlüssel und einfach jeder Wert als separate Nachricht an KafkaIO Schreiber erstellt ignorieren zu ignorieren.

Ich erforschte Combine#perKey verwandeln, aber es nimmt SerializableFunction, dass nur alle Werte auf einen String kombinieren können. (Mit etwas Trennzeichen), als Ergebnis, das ich nur einen Wert als eine verkettete Zeichenfolge übergeben, anstatt jeden Wert (das wurde gelesen von KafkaIO#read()) KafkaIO Schriftsteller.

Antwort

1

Es ist eigentlich ganz einfach! Der Trick hier ist, dass Sie c.output so oft wie Sie wünschen, innerhalb der @ProcessElement Methode anrufen können.

In diesem Fall definieren Sie einfach eine DoFn<KV<String, Iterable<String>>, KV<String, String>>, iterieren Sie über die c.element().getValue() Sammlung, und rufen Sie c.output für jede von ihnen.

Verwandte Themen