2017-01-17 5 views
1

Ich habe versucht, Beispiele auf Flink Windowing, und um das Timing des Fensters zu überprüfen Ich habe einen Zeitstempel zum Stream-Ereignis hinzugefügt. Und ich fand heraus, dass die Dauer des Fensters weniger als die Fensterlänge war. Auch wenn ich ein gleitendes Fenster verwenden und das Ereignis ändern sollte, bekomme ich das modifizierte Ereignis in das nächste Fenster.Windowing nicht die Fensterlänge abgeschlossen

Wenn ich die Fensterlänge angabe, wartet es nicht auf das Fenster zu vervollständigen? Und die überlappenden Ereignisse zwischen gleitenden Fenstern beziehen sich auf dieselbe Instanz? (Ich bin mir bewusst, dass Ströme sind unveränderlich Strukturen)

public class WindowDemo { 

public static void main(String[] args) { 
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); 

    Properties prop=PropertyLoader.loadPropertiesForConsumer("WC",0); 
    FlinkKafkaConsumer09<Alarm> consumer= new FlinkKafkaConsumer09<Alarm>("topic_smartEmse", new AlarmSchema(), prop); 
    DataStream<Alarm> inputStream= env.addSource(consumer); 

    inputStream= inputStream.flatMap(new FlatMapFunction<Alarm, Alarm>() { 

     @Override 
     public void flatMap(Alarm value, Collector<Alarm> out) 
       throws Exception { 
      System.out.println("flatMap Started at "+System.currentTimeMillis()); 
      value.setUserDefined10("IN TIME "+System.currentTimeMillis()); 
      out.collect(value); 
      System.out.println("flatMap Ended at "+System.currentTimeMillis()); 
     } 
    }); 

    KeyedStream<Alarm, String> keyedStream= inputStream.keyBy(new KeySelector<Alarm, String>(){ 

     @Override 
     public String getKey(Alarm value) throws Exception { 
      System.out.println("getKey Started at "+System.currentTimeMillis()); 
      return "XX"; 
     }}); 

    DataStream<Alarm> dataStream= keyedStream.timeWindow(Time.of(90, TimeUnit.SECONDS)).apply(new WindowFunction<Alarm, Alarm, String, TimeWindow>() { 

     @Override 
     public void apply(String key, TimeWindow window, 
       Iterable<Alarm> input, Collector<Alarm> out) 
       throws Exception { 
      System.out.println("timeWindow Started at "+System.currentTimeMillis()); 
      int count=0; 
      System.out.println("Key : "+key); 
      System.out.println("Values : "+input); 
      Iterator<Alarm> itr= input.iterator(); 
      while (itr.hasNext()){ 
       Alarm alarm= itr.next(); 
       alarm.setUserDefined1(""+count++); 

       out.collect(alarm); 
      } 
      System.out.println("timeWindow ended at "+System.currentTimeMillis()); 

     } 
    }); 

    dataStream= dataStream.flatMap(new FlatMapFunction<Alarm, Alarm>() { 

     @Override 
     public void flatMap(Alarm value, Collector<Alarm> out) 
       throws Exception { 
      value.setUserDefined11("OUT TIME "+System.currentTimeMillis()); 
      out.collect(value); 
     } 
    }); 
    dataStream.printToErr(); 
    try { 
     env.execute(); 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } 
} 
} 

Antwort

1

Wenn ich habe Sie richtig Ihre Sorge ist, dass die Fenster auswertet (gelten genannt wird) vor der vorgegebenen Zeit fertig war. Ich habe den gleichen Effekt für die erste Auswertung des Fensters bemerkt. Es scheint, als wäre der Zeitschlitz irgendwie ausgerichtet. Ich begann die Verarbeitung um 19.09.13 Uhr und das erste Mal war das Fenster ausgewertet um 19:10:30 Uhr, also nach 77 Sekunden. Nach diesem ersten Anruf wurde das Fenster nicht genau, sondern alle 90 Sekunden geschlossen.

Für die TumblingProcessingTimeWindows (die Sie verwenden) Es scheint, dieser Code zu sein:

public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> { 

    private long size; 

    private TumblingProcessingTimeWindows(long size) { 
     this.size = size; 
    } 

    @Override 
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { 

     final long now = context.getCurrentProcessingTime(); 
     // here goes the alignment 
     long start = now - (now % size); 
     return Collections.singletonList(new TimeWindow(start, start + size)); 
    } 

ist das sinnvoll für Sie?

+0

Der Code, den Sie freigegeben haben, macht eine manuelle Korrektur für die Zeitausrichtung. Ich frage mich, ob dies hätte getan werden sollen, indem man die Zeitmerkmale implizit setzt (was ich versucht habe). –

+0

Dies ist nicht mein Code. Seine aus den flink-Quellen: https://github.com/apache/flink/blob/release-1.1.4-rc1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/ windowing/assigners/TumblingProcessingTimeWindows.java – TobiSH

+0

Okay, das macht Sinn. Ich wollte auch wissen, ob Änderungen, die von einem (gleitenden) Fenster gemacht werden, für andere Fenster sichtbar wären? –

Verwandte Themen