2017-12-07 1 views
0

Ich möchte Ereignisse in EvenTime mit gleitenden Fenstern verarbeiten. Das Gleitintervall beträgt 24 Stunden und das Inkrement 30 Minuten. Das Problem besteht darin, dass unter dem Code 48 Berechnungen für jedes Ereignis erstellt werden. In unserem Fall kommen die Ereignisse in Ordnung, so dass wir nur das letzte Fenster zur Auswertung benötigen.Evaluieren Sie nur das neueste Fenster für ereigniszeitbasierte gleitende Fenster

Danke,

Dejan

public static void processEventsa(
     DataStream<Tuple2<String, MyEvent>> events) throws Exception { 

    events.assignTimestampsAndWatermarks(new MyWatermark()). 
      keyBy(0). 
      timeWindow(Time.hours(windowSizeHour), Time.seconds(windowSlideSeconds)). 
      apply(new WindowFunction<Tuple2<String, MyEvent>, Tuple2<String, MyEvent>, Tuple, TimeWindow>() { 
       @Override 
       public void apply(Tuple key, TimeWindow window, Iterable<Tuple2<String, MyEvent>> input, 
              Collector<Tuple2<String, MyEvent>> out) throws Exception { 

        for (Tuple2<String, MyEvent> record : input) { 



        } 
       } 
      }); 
} 

public class MyWatermark implements 
     AssignerWithPunctuatedWatermarks<Tuple2<String, MyEvent>> { 

    @Override 
    public long extractTimestamp(Tuple2<String, MyEvent> event, long previousElementTimestamp) { 
     return event.f1.eventTime; 
    } 

    @Override 
    public Watermark checkAndGetNextWatermark(Tuple2<String, MyEvent> event, long previousElementTimestamp) { 
     return new Watermark(event.f1.eventTime); 
    } 
} 

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 

Antwort

0

Das Problem war in Wasserzeichen. AssignerWithPeriodicWatermarks sollte verwendet werden

public class MyWatermark implements 
     AssignerWithPeriodicWatermarks<Tuple2<String, MyEvent>> { 

    private final long maxTimeLag = 5000; 

    @Override 
    public long extractTimestamp(Tuple2<String, MyEvent> event, long previousElementTimestamp) { 
     try { 
      return event.f1.eventTime; 
     } 
     catch(NullPointerException ex) {} 

     return System.currentTimeMillis() - maxTimeLag; 
    } 

    @Override 
    public Watermark getCurrentWatermark() { 
     return new Watermark(System.currentTimeMillis() - maxTimeLag); 
    } 
}