Ich möchte Ereignisse in EvenTime mit gleitenden Fenstern verarbeiten. Das Gleitintervall beträgt 24 Stunden und das Inkrement 30 Minuten. Das Problem besteht darin, dass unter dem Code 48 Berechnungen für jedes Ereignis erstellt werden. In unserem Fall kommen die Ereignisse in Ordnung, so dass wir nur das letzte Fenster zur Auswertung benötigen.Evaluieren Sie nur das neueste Fenster für ereigniszeitbasierte gleitende Fenster
Danke,
Dejan
public static void processEventsa(
DataStream<Tuple2<String, MyEvent>> events) throws Exception {
events.assignTimestampsAndWatermarks(new MyWatermark()).
keyBy(0).
timeWindow(Time.hours(windowSizeHour), Time.seconds(windowSlideSeconds)).
apply(new WindowFunction<Tuple2<String, MyEvent>, Tuple2<String, MyEvent>, Tuple, TimeWindow>() {
@Override
public void apply(Tuple key, TimeWindow window, Iterable<Tuple2<String, MyEvent>> input,
Collector<Tuple2<String, MyEvent>> out) throws Exception {
for (Tuple2<String, MyEvent> record : input) {
}
}
});
}
public class MyWatermark implements
AssignerWithPunctuatedWatermarks<Tuple2<String, MyEvent>> {
@Override
public long extractTimestamp(Tuple2<String, MyEvent> event, long previousElementTimestamp) {
return event.f1.eventTime;
}
@Override
public Watermark checkAndGetNextWatermark(Tuple2<String, MyEvent> event, long previousElementTimestamp) {
return new Watermark(event.f1.eventTime);
}
}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);