2016-07-10 5 views
3

Aus irgendeinem Grund warten meine Akka-Streams immer auf eine zweite Nachricht, bevor sie den ersten "ausstrahlen" (?).Akka Reactive Streams immer eine Nachricht hinter

Hier ist ein Beispiel Code, der mein Problem zeigt.

val rx = Source((1 to 100).toStream.map { t => 
    Thread.sleep(1000) 
    println(s"doing $t") 
    t 
}) 
rx.runForeach(println) 

ergibt Ausgang:

doing 1 
doing 2 
1 
doing 3 
2 
doing 4 
3 
doing 5 
4 
doing 6 
5 
... 

Was ich will:

doing 1 
1 
doing 2 
2 
doing 3 
3 
doing 4 
4 
doing 5 
5 
doing 6 
6 
... 
+1

Akka-Stream hat immer einen Puffer von einem Element vor jeder Berechnungsstufe. Wahrscheinlich siehst du das. – eiennohito

Antwort

5

Die Art und Weise, wie Ihr Code jetzt eingerichtet ist, transformieren Sie vollständig die Source, bevor Sie Elemente stromabwärts emittieren dürfen. Sie können dieses Verhalten (wie @slouc angegeben) deutlich erkennen, indem Sie die toStream für den Zahlenbereich entfernen, der die Quelle darstellt. Wenn Sie das tun, werden Sie sehen, dass die Source zuerst vollständig transformiert wird, bevor sie auf Downstream-Nachfrage reagiert. Wenn Sie tatsächlich eine Source in ein Sink ausgeführt werden soll und einen Transformationsschritt in der Mitte haben, dann können Sie versuchen und Struktur Dinge wie diese:

val transform = 
    Flow[Int].map{ t => 
    Thread.sleep(1000) 
    println(s"doing $t") 
    t 
    } 

Source((1 to 100).toStream). 
    via(transform). 
    to(Sink.foreach(println)). 
    run 

Wenn Sie diese Änderung vornehmen, dann werden Sie den gewünschten Effekt erzielen Dies bedeutet, dass ein Element, das stromabwärts fließt, den ganzen Weg durch den Fluss verarbeitet wird, bevor das nächste Element beginnt, verarbeitet zu werden.

+0

Ich dachte, ich wäre in der Lage, mein Problem zu vereinfachen und es gelöst zu haben, aber ich denke, ich habe es zu stark vereinfacht. Deine Antwort behebt das Problem, das ich gepostet habe, aber ich muss vielleicht eine neue Frage mit weiteren Details stellen. – SGHAF

1

Sie .toStream() verwenden, was bedeutet, dass die ganze Sammlung faul ist. Ohne es wäre Ihre Ausgabe zuerst hundert "tuend" s gefolgt von Zahlen von 1 bis 100. Jedoch wertet Stream nur das erste Element aus, das den Ausgang "doing 1" gibt, wo es stoppt. Das nächste Element wird bei Bedarf ausgewertet.

Jetzt konnte ich keine Details dazu in den Dokumenten finden, aber ich nehme an, dass runForeach eine Implementierung hat, die das nächste Element vor dem Aufruf der Funktion auf dem aktuellen Element übernimmt. Bevor also println auf Element n aufgerufen wird, untersucht es zuerst Element n + 1 (z. B. überprüft, ob es existiert), was zu einer "tuing n + 1" -Nachricht führt. Dann führt es Ihre println Funktion auf dem aktuellen Element aus, was zu der Nachricht "n" führt.

Müssen Sie wirklich map() vor Ihnen runForeach? Ich meine, brauchen Sie zwei Reisen durch die Daten? Ich weiß, ich bin wahrscheinlich das Offensichtliche, aber wenn Sie nur Ihre Daten in einer so gehen verarbeiten:

val rx = Source((1 to 100).toStream) 
rx.runForeach({ t => 
    Thread.sleep(1000) 
    println(s"doing $t") 
    // do something with 't', which is now equal to what "doing" says 
}) 

dann haben Sie kein Problem, was bei der Auswertung ist.

+0

Ich habe das Problem tatsächlich vereinfacht, als ich es gepostet habe. Der Stream kommt von einem Java BufferedReader (den ich von einem Socket bekomme). Quelle (Stream.continually (myBufferedReader.readLine) .map (t => (System.nanoTime, t))) ' Ich tat dies ursprünglich, um zu versuchen," Zeitstempel ", wenn meine Nachrichten tatsächlich angekommen sind meine Maschine und noch einmal, als sie endlich bearbeitet wurden. Danke für die Antwort, obwohl. Das ist mir jetzt mehr klar! – SGHAF

Verwandte Themen