2016-06-21 14 views
1

Ich schreibe eine Anwendung, um Ereignisse zu behandeln, die von Geräten ausgelöst werden (1 Million pro Stunde). Einige Ereignisse werden aggregiert (und haben eine lange Zeitspanne (z. B. 48 Stunden)), die ein Begin-Ereignis, einen Status (x-mal) -Ereignisse und ein Endereignis enthalten. Andere sind einzelne Ereignisse, die sofort verarbeitet werden können. Um die Ereignisse mindestens einmal zu garantieren, werde ich auf Akka-Persistenz achten. Andere Teile der Anwendung verwenden bereits akka und kafka.Akka persistente Nachricht (en) löschen

Die Lösung, die ich anstrebte, sollte eine persistente Karte enthalten, in der die Ereignisse von ihrer eventId leicht ausgewählt werden können. Die Reihenfolge ist weniger wichtig. Nach Abschluss der Verarbeitung eines Ereignisses kann es aus der Karte entfernt werden (und sollte nicht mehr beibehalten werden).

In den docs/examples found fand ich Queue-Beispiele, die die Purge-Anforderung pro Ereignis erfüllen, aber kämpfen mit der einfachen Suche (Warteschlange muss geloopt werden, um das Ereignis zu finden). Und um die einfache Suche zu erfüllen, dachte ich an eine Karte, die das PersistentActor-Merkmal und einige db darunter verwendet. Ereignisse werden jedoch durch Sequenznummer bereinigt (was Ereignisse entfernen würde, die mehr Verarbeitung benötigen/auf andere Ereignisse warten). Ein anderes Merkmal, das untersucht wird, ist die AtLeastOnceDelivery, mit der Lieferbestätigung, die die Anforderungen erfüllt, aber diese blockiert die Wiederherstellung, bis alle Ereignisse verarbeitet sind.

Irgendwelche Gedanken darüber, wie man in Akka einen persistenten Korb für Ereignisse implementiert? (Ich benutze scala btw)

Antwort

1

Würde so etwas zu Ihren Bedürfnissen passen? Es ist wahrscheinlich nicht genau Ihre Logik, aber im Grunde erhält es ein neues Ereignis, beharrt darauf, dass es das Ereignis erhalten hat und speichert das Ereignis dann mit der ID in der Karte. Dann an einem Punkt (nicht sicher, wie Sie die Ereignisverarbeitung auslösen) erhält es den Befehl, ein Ereignis mit einer bestimmten ID zu behandeln. Es besteht weiterhin die Tatsache, dass es das Ereignis behandeln sollte, behandelt dann das Ereignis und entfernt es von der Karte. Auf diese Weise wird die Karte beim Neustart wiederhergestellt und Sie haben alle Ereignisse, die noch nicht verarbeitet wurden, über die ID zugänglich.

class PersistentMapActor extends PersistentActor { 

    private var eventMap: Map[ Int, Event ] = Map[ Int, Event ]() 

    override def receiveRecover: Receive = { 
      case NewEventSaved(payload: Event) => 
        eventMap = eventMap + ((payload.eventId, payload)) 
      case EventHandled(eventId) => 
        eventMap = eventMap - eventId 
    } 

    override def receiveCommand: Receive = { 
      case SaveNewEvent(payload) => 
        persist(NewEventSaved(payload)) { persistedNewEvent => 
          //Add new event to map 
          eventMap = eventMap + ((payload.eventId, payload)) 
        } 
      case HandleEvent(eventId) => 
        val event = eventMap.get(eventId) 

        event.foreach { e => 
          persist(EventHandled(eventId)) { persistedEvent => 
            //Do stuff with the event 
            //Remove the event from the map 
            eventMap = eventMap - eventId 
          } 
        } 
    } 

    override def persistenceId: String = "PersistentMapActor" 
} 

object PersistentMapActor { 

    case class Event(eventId: Int, someField: String) 

    case class SaveNewEvent(payload: Event) 

    case class NewEventSaved(payload: Event) 

    case class HandleEvent(eventId: Int) 

    case class EventHandled(eventId: Int) 

} 
+0

Danke für die Beantwortung. Ich habe Ihre Lösung implementiert und einen Hintergrundjob für die Persistency-Schicht hinzugefügt. Der geplante Hintergrundjob prüft auf behandelte Ereignisse und entfernt sie aus der Datenbank. – motormuis

+1

Ich bin froh, dass ich helfen konnte. Darf ich fragen, warum Sie die behandelten Ereignisse löschen? – thwiegan

+0

Da das Speichern der Daten keine Vorteile hat, sind nur Nachteile in Bezug auf die Kosten verbunden. Jedes Ereignis ist ungefähr 20kb, 1 Million Ereignisse pro Stunde. Wir haben nicht so viel Speicherplatz, um es jahrelang laufen zu lassen (und mit Markenprodukten der Hochverfügbarkeit ist es immer noch teuer (Geschäftsanforderungen)). Die Ereignisse sind nutzlos für sich selbst, das heißt, wo die Ereignisverarbeitung stattfindet, Aggregation/Gruppierung von Ereignissen und nur berechnete Ereignisse werden in einer Datenbank gespeichert. – motormuis