2016-08-31 2 views
2

Ich möchte eine Sequenz von Elementen erstellen, indem ich actorRefSource von akka Streams verwende. Diese Quelle wird kontinuierlich mit Daten gespeist. Nach Abschluss der Berechnung wird der Stream mit einer Giftpille beendet.Akka Streams ActorRefSource Reihenfolge der Nachrichten

Das folgende vereinfachte Beispiel zeigt meine Absicht:

val source = Source.actorRef[Int](1000, OverflowStrategy.fail) 
    .mapMaterializedValue{ ref => 
     for(i <- 1 to 1000) { 
     ref ! i 
     } 

     ref ! PoisonPill 
    } 

    source.runWith(Sink.seq).foreach(s => println("count: "+s.size)) 

ich den Bach erwarte alle 1000 Elemente zu verarbeiten und dann beenden wegen der Giftpille empfangen werden. Unglücklicherweise endet der Stream normalerweise viel früher. Beispielausgaben sind:

Warten einige Zeit vor dem Senden der Giftpille, z. 1000 ms führt dazu, dass alle Zahlen verarbeitet werden.

Eine Idee, wie sichergestellt werden kann, dass alle Artikel verarbeitet wurden, bevor die Poison Pill eingeht, wäre sehr willkommen.

Antwort

2

Siehe the documentation for Source.actorRef: PoisonPill löscht den Puffer vor dem Beenden des Streams nicht.

+0

In Ordnung, um sicherzustellen, dass alle Elemente vor dem Herunterfahren verarbeitet werden, sollte ich stattdessen eine Instanz von akka.actor.Status.Succes übergeben? Danke für die Aufklärung! – Calardan

Verwandte Themen