2017-07-20 4 views
0

Ich habe ein Problem, das ich nicht wirklich herausfinden kann. So habe ich einen kafka Strom, der einige Daten wie folgt enthält:Flink: Datei mit Kafka Stream verbinden

{"adId":"9001", "eventAction":"start", "eventType":"track", "eventValue":"", "timestamp":"1498118549550"} 

Und ich will ‚AdID‘ mit einem anderen Wert ersetzen ‚BookingID‘. Dieser Wert befindet sich in einer CSV-Datei, aber ich kann nicht wirklich herausfinden, wie es funktioniert.

Hier ist meine Mapping CSV-Datei:

9001;8 
9002;10 

So ideal meine Ausgabe würde jede Stunde mindestens einmal diese Datei so etwas wie

{"bookingId":"8", "eventAction":"start", "eventType":"track", "eventValue":"", "timestamp":"1498118549550"} 

sein kann erfrischen, so dass es Änderungen abholen sollte zu ihm.

Im Moment habe ich diesen Code, die nicht für mich arbeiten:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
env.enableCheckpointing(30000); // create a checkpoint every 30 seconds 
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); 

DataStream<String> adToBookingMapping = env.readTextFile(parameters.get("adToBookingMapping")); 

DataStream<Tuple2<Integer,Integer>> input = adToBookingMapping.flatMap(new Tokenizer()); 

//Kafka Consumer 
Properties properties = new Properties(); 
properties.setProperty("bootstrap.servers", parameters.get("bootstrap.servers")); 
properties.setProperty("group.id", parameters.get("group.id")); 

FlinkKafkaConsumer010<ObjectNode> consumer = new FlinkKafkaConsumer010<>(parameters.get("inbound_topic"), new JSONDeserializationSchema(), properties); 

consumer.setStartFromGroupOffsets(); 

consumer.setCommitOffsetsOnCheckpoints(true); 

DataStream<ObjectNode> logs = env.addSource(consumer); 

DataStream<Tuple4<Integer,String,Integer,Float>> parsed = logs.flatMap(new Parser()); 

// output -> bookingId, action, impressions, sum 
DataStream<Tuple4<Integer, String,Integer,Float>> joined = runWindowJoin(parsed, input, 3); 


public static DataStream<Tuple4<Integer, String, Integer, Float>> runWindowJoin(DataStream<Tuple4<Integer, String, Integer, Float>> parsed, 
     DataStream<Tuple2<Integer, Integer>> input,long windowSize) { 

    return parsed.join(input) 
      .where(new ParsedKey()) 
      .equalTo(new InputKey()) 
      .window(TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS))) 
      //.window(TumblingEventTimeWindows.of(Time.milliseconds(30000))) 
      .apply(new JoinFunction<Tuple4<Integer, String, Integer, Float>, Tuple2<Integer, Integer>, Tuple4<Integer, String, Integer, Float>>() { 

       private static final long serialVersionUID = 4874139139788915879L; 

       @Override 
       public Tuple4<Integer, String, Integer, Float> join(
           Tuple4<Integer, String, Integer, Float> first, 
           Tuple2<Integer, Integer> second) { 
        return new Tuple4<Integer, String, Integer, Float>(second.f1, first.f1, first.f2, first.f3); 
       } 
      }); 
} 

Der Code nur einmal ausgeführt und dann stoppt, so dass es nicht konvertiert neue Einträge in kafka die CSV-Datei. Irgendwelche Ideen, wie ich den Stream von Kafka mit den neuesten Werten aus meiner CSV-Datei verarbeiten könnte?

Mit freundlichen Grüßen

darkownage

Antwort

0

Ihr Ziel mit einem langsam ändernden Katalogdaten dampfend beitreten zu sein scheint (das heißt eine Seiteneingang). Ich glaube nicht, dass die Operation join hier nützlich ist, weil sie die Katalogeinträge nicht über Windows hinweg speichert. Außerdem ist die Textdatei eine beschränkte Eingabe, deren Zeilen einmal gelesen werden.

Verwenden Sie connect, um einen verbundenen Stream zu erstellen, und speichern Sie die Katalogdaten als verwalteten Status, um Nachschlagevorgänge durchzuführen. Die Parallelität des Bedieners müsste 1 sein.

Sie können eine bessere Lösung finden, indem Sie "Seiteneingaben" erforschen und die Lösungen betrachten, die Leute heute verwenden. Siehe FLIP-17 und Dean Wampler's talk at Flink Forward.