Ich möchte feststellen, ob zwei Ereignisse in einem definierten Zeitrahmen auf der Grundlage von zwei Ereignissen auftreten, die denselben Bezeichner haben. Zum Beispiel sieht ein DoorEvent
wie folgt aus:Stateful Komplexe Ereignisverarbeitung mit Apache flink
<doorevent>
<door>
<id>1</id>
<status>open</status>
</door>
<timestamp>12345679</timestamp>
</doorevent>
<doorevent>
<door>
<id>1</id>
<status>close</status>
</door>
<timestamp>23456790</timestamp>
</doorevent>
Meine DoorEvent
Java-Klasse im Beispiel unten hat die gleiche Struktur.
Ich möchte erkennen, dass Tür mit ID 1 innerhalb von 5 Minuten nach dem Öffnen schließt. Ich versuche, die Apache flink CEP-Bibliothek für diesen Zweck zu verwenden. Der eingehende Stream enthält alle offenen und geschlossenen Nachrichten von beispielsweise 20 Türen.
Pattern<String, ?> pattern = Pattern.<String>begin("door_open").where(
new SimpleCondition<String>() {
private static final long serialVersionUID = 1L;
public boolean filter(String doorevent) {
DoorEvent event = new DoorEvent().parseInstance(doorevent, DataType.XML);
if (event.getDoor().getStatus().equals("open")){
// save state of door as open
return true;
}
return false;
}
}
)
.followedByAny("door_close").where(
new SimpleCondition<String>() {
private static final long serialVersionUID = 1L;
public boolean filter(String doorevent) throws JsonParseException, JsonMappingException, IOException {
DoorEvent event = new DoorEvent().parseInstance(doorevent, DataType.XML);
if (event.getDoor().getStatus().equals("close")){
// check if close is of previously opened door
return true;
}
return false;
}
}
)
.within(Time.minutes(5));
Wie speichere ich den Zustand der Tür 1 so offen in den door_open
, so dass in dem door_close
Schritt weiß ich, dass Tür 1 ist das eines geschlossen ist, und es ist nicht eine andere Tür?