2017-09-12 3 views
1

Ich möchte feststellen, ob zwei Ereignisse in einem definierten Zeitrahmen auf der Grundlage von zwei Ereignissen auftreten, die denselben Bezeichner haben. Zum Beispiel sieht ein DoorEvent wie folgt aus:Stateful Komplexe Ereignisverarbeitung mit Apache flink

<doorevent> 
    <door> 
    <id>1</id> 
    <status>open</status> 
    </door> 
    <timestamp>12345679</timestamp> 
</doorevent> 

<doorevent> 
    <door> 
    <id>1</id> 
    <status>close</status> 
    </door> 
    <timestamp>23456790</timestamp> 
</doorevent> 

Meine DoorEvent Java-Klasse im Beispiel unten hat die gleiche Struktur.

Ich möchte erkennen, dass Tür mit ID 1 innerhalb von 5 Minuten nach dem Öffnen schließt. Ich versuche, die Apache flink CEP-Bibliothek für diesen Zweck zu verwenden. Der eingehende Stream enthält alle offenen und geschlossenen Nachrichten von beispielsweise 20 Türen.

Pattern<String, ?> pattern = Pattern.<String>begin("door_open").where(
    new SimpleCondition<String>() { 
     private static final long serialVersionUID = 1L; 
     public boolean filter(String doorevent) { 
      DoorEvent event = new DoorEvent().parseInstance(doorevent, DataType.XML); 
      if (event.getDoor().getStatus().equals("open")){ 
       // save state of door as open 
       return true; 
      } 
      return false;       
     } 
    } 
) 
.followedByAny("door_close").where(
    new SimpleCondition<String>() { 
      private static final long serialVersionUID = 1L; 
      public boolean filter(String doorevent) throws JsonParseException, JsonMappingException, IOException { 
       DoorEvent event = new DoorEvent().parseInstance(doorevent, DataType.XML); 
       if (event.getDoor().getStatus().equals("close")){ 
        // check if close is of previously opened door 
        return true; 
       } 
       return false; 
      } 
     } 
) 
.within(Time.minutes(5)); 

Wie speichere ich den Zustand der Tür 1 so offen in den door_open, so dass in dem door_close Schritt weiß ich, dass Tür 1 ist das eines geschlossen ist, und es ist nicht eine andere Tür?

Antwort

2

Wenn Sie Flink 1.3.0 und höher sein wirklich straightforard was Sie

Ihr Muster in etwa so aussehen würde tun wollen:

Pattern.<DoorEvent>begin("first") 
     .where(new SimpleCondition<DoorEvent>() { 
      private static final long serialVersionUID = 1390448281048961616L; 

      @Override 
      public boolean filter(DoorEvent event) throws Exception { 
      return event.getDoor().getStatus().equals("open"); 
      } 
     }) 
     .followedBy("second") 
     .where(new IterativeCondition<DoorEvent>() { 
      private static final long serialVersionUID = -9216505110246259082L; 

      @Override 
      public boolean filter(DoorEvent secondEvent, Context<DoorEvent> ctx) throws Exception { 

      if (!secondEvent.getDoor().getStatus().equals("close")) { 
       return false; 
      } 

      for (DoorEvent firstEvent : ctx.getEventsForPattern("first")) { 
       if (secondEvent.getDoor().getEventID().equals(firstEvent.getDoor().getEventId())) { 
       return true; 
       } 
      } 
      return false; 
      } 
     }) 
     .within(Time.minutes(5)); 

Also im Grunde kann man IterativeConditions verwenden und den Kontext zu erhalten für die ersten Muster, die übereinstimmen und über diese Liste iterieren, während Sie für das eine vergleichen, das Sie brauchen, und fortfahren, wie Sie wollen.

IterativeConditions sind teuer und sollten dementsprechend

Weitere Informationen über die Bedingungen prüfen, hier bei Flink - Conditions

behandelt werden