Ich habe versucht, Beispiele auf Flink Windowing, und um das Timing des Fensters zu überprüfen Ich habe einen Zeitstempel zum Stream-Ereignis hinzugefügt. Und ich fand heraus, dass die Dauer des Fensters weniger als die Fensterlänge war. Auch wenn ich ein gleitendes Fenster verwenden und das Ereignis ändern sollte, bekomme ich das modifizierte Ereignis in das nächste Fenster.Windowing nicht die Fensterlänge abgeschlossen
Wenn ich die Fensterlänge angabe, wartet es nicht auf das Fenster zu vervollständigen? Und die überlappenden Ereignisse zwischen gleitenden Fenstern beziehen sich auf dieselbe Instanz? (Ich bin mir bewusst, dass Ströme sind unveränderlich Strukturen)
public class WindowDemo {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
Properties prop=PropertyLoader.loadPropertiesForConsumer("WC",0);
FlinkKafkaConsumer09<Alarm> consumer= new FlinkKafkaConsumer09<Alarm>("topic_smartEmse", new AlarmSchema(), prop);
DataStream<Alarm> inputStream= env.addSource(consumer);
inputStream= inputStream.flatMap(new FlatMapFunction<Alarm, Alarm>() {
@Override
public void flatMap(Alarm value, Collector<Alarm> out)
throws Exception {
System.out.println("flatMap Started at "+System.currentTimeMillis());
value.setUserDefined10("IN TIME "+System.currentTimeMillis());
out.collect(value);
System.out.println("flatMap Ended at "+System.currentTimeMillis());
}
});
KeyedStream<Alarm, String> keyedStream= inputStream.keyBy(new KeySelector<Alarm, String>(){
@Override
public String getKey(Alarm value) throws Exception {
System.out.println("getKey Started at "+System.currentTimeMillis());
return "XX";
}});
DataStream<Alarm> dataStream= keyedStream.timeWindow(Time.of(90, TimeUnit.SECONDS)).apply(new WindowFunction<Alarm, Alarm, String, TimeWindow>() {
@Override
public void apply(String key, TimeWindow window,
Iterable<Alarm> input, Collector<Alarm> out)
throws Exception {
System.out.println("timeWindow Started at "+System.currentTimeMillis());
int count=0;
System.out.println("Key : "+key);
System.out.println("Values : "+input);
Iterator<Alarm> itr= input.iterator();
while (itr.hasNext()){
Alarm alarm= itr.next();
alarm.setUserDefined1(""+count++);
out.collect(alarm);
}
System.out.println("timeWindow ended at "+System.currentTimeMillis());
}
});
dataStream= dataStream.flatMap(new FlatMapFunction<Alarm, Alarm>() {
@Override
public void flatMap(Alarm value, Collector<Alarm> out)
throws Exception {
value.setUserDefined11("OUT TIME "+System.currentTimeMillis());
out.collect(value);
}
});
dataStream.printToErr();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
Der Code, den Sie freigegeben haben, macht eine manuelle Korrektur für die Zeitausrichtung. Ich frage mich, ob dies hätte getan werden sollen, indem man die Zeitmerkmale implizit setzt (was ich versucht habe). –
Dies ist nicht mein Code. Seine aus den flink-Quellen: https://github.com/apache/flink/blob/release-1.1.4-rc1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/ windowing/assigners/TumblingProcessingTimeWindows.java – TobiSH
Okay, das macht Sinn. Ich wollte auch wissen, ob Änderungen, die von einem (gleitenden) Fenster gemacht werden, für andere Fenster sichtbar wären? –