2017-11-11 4 views
0

Lesen der Dokumentation von akka-Streams, ich bin nicht wirklich klar auf Dinge wie die Reihenfolge der Nachrichten und wenn ich es erzwingen kann. Lassen Sie mich den Kontext meiner Frage mit einem kleinen Code festlegen, den ich für einen Chat-Server geschrieben habe.Reihenfolge der Ereignisse bei der Verwendung von akka Streams

Um die Sache für mich einfach zu machen, benutze ich diese Flow-Form mit einer sehr einfachen Quelle und Senke. Etwas wie das -

val source = Source(List[String]("hi", "hello", "what are you upto", "this is nice")) 
val sink = Sink.foreach[ChatMessage] { 
    case tm: UserTextMessage => println(s"${tm.user.username} :: ${tm.content}") 
    case ul: UserLeft => println(s"${ul.user.username} just left the channel") 
    case uj: UserJoined => println(s"${uj.user.username} just joined the channel") 
    case _ => println(s"do not know what I just received") 
} 

val mychatchannel = new Channel(420, myactorsystem) 

source.via(mychatchannel.chatFlow(User("sushruta"))).runWith(sink) 

Nun, hier kommt meine Sorge. Die Reihenfolge der Ereignisse, die im Terminal gedruckt wird, ist überhaupt nicht in Ordnung. Und ich bin mir nicht sicher, wie ich das beheben soll. Hier ist die Ausgabe, die ich bekomme -

[INFO] [11/10/2017 17:42:20.431] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] sushruta sent a message 
[INFO] [11/10/2017 17:42:20.441] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] received a user joined message 
[INFO] [11/10/2017 17:42:20.443] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] sushruta sent a message 
[INFO] [11/10/2017 17:42:20.444] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] sushruta sent a message 

Die erste Nachricht hi wird vom Ausgang fehlt. Die Nachricht hi scheint gesendet worden zu sein, bevor die UserJoin message gedruckt wurde.

Ich versuchte es zu beheben (und auch etwas Sicherheit rund um Messaging hinzufügen) mit actorRefWithAck (die ich im obigen Code auskommentiert.) Es gibt eine ähnliche Ausgabe.

[INFO] [11/11/2017 06:33:03.731] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] channel initialized and ready to take events 
[INFO] [11/11/2017 06:33:03.735] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] sushruta sent a message 
[INFO] [11/11/2017 06:33:03.736] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] received a user joined message 
[INFO] [11/11/2017 06:33:03.737] [akka-streams-akka.actor.default-dispatcher-4] [akka://akka-streams/user/channel-actor-420] sushruta sent a message 
[INFO] [11/11/2017 06:33:03.737] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] sushruta sent a message 
[INFO] [11/11/2017 06:33:03.738] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] sushruta sent a message 
[INFO] [11/11/2017 06:33:03.738] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] received a UserLeft message 

ist klar, was der Fall zu sein scheint, ist, dass Quelle die Nachrichten sendet, bevor UserJoin Nachricht gesendet wird. Wie kann ich das beheben? Konzeptionell denke ich, ich möchte die UserJoin message gesendet werden, sobald die Quelle materialisiert, aber bevor es tatsächlich über die erste Nachricht sendet. Ist das möglich?

dank

Antwort

0

Denken Sie an Ströme als Wasserleitungen: wenn es Wasser gibt, wird es fließen. Dem Zusammenführungsoperator ist es egal, von welchen Seitenelementen kommen. Wenn Sie diese Eingaben bestellen möchten, müssen Sie dies Akka mitteilen, indem Sie stattdessen Concat verwenden.

+0

vielen Dank. Das hat mir geholfen und das Problem endlich gelöst. – shashydhar

Verwandte Themen