In einem einfachen Test erwarte ich, dass der Flow Zahlen für eine Sekunde generiert und druckt. Ich möchte Strömungsvorgänge mit Gegendruck testen und benötige eine Source
, die den Gegendruck nicht berücksichtigt.Warum wird der Stream sofort gestoppt und wie kann ich ihn verhindern?
... with FreeSpec ... {
implicit val system = ActorSystem(this.getClass.getSimpleName)
private val matSettings: ActorMaterializerSettings =
ActorMaterializerSettings(system).withDebugLogging(true).withFuzzing(true)
implicit val mat = ActorMaterializer(matSettings.withInputBuffer(1, 1))
"must print numbers for a second" in {
val source: Source[Double, ActorRef] =
Source.actorRef(100, OverflowStrategy.fail).map(_ => Random.nextDouble())
val sink: Sink[Double, Future[Done]] = Sink.foreach(println)
val actorRef: ActorRef = Flow[Double].to(sink).runWith(source)
system.scheduler.schedule(0.micro, 1.milli, actorRef, "tick")(system.dispatcher)
Thread.sleep(1000)
println("done")
}
Allerdings scheint der Schauspieler sofort zu stoppen, nachdem der Fluss materialisiert wurde, nicht eine einzelne Nachricht wird geliefert und nur zwei gesendet werden. Wo verkenne ich, was hier passiert und wie bekomme ich das erwartete Ergebnis? Das Protokoll:
08:24:07.077 DEBUG a.a.LocalActorRefProvider$SystemGuardian - now supervising Actor[akka://BuffersAndTicksSpec/system/UnhandledMessageForwarder#1836419858]
08:24:07.078 DEBUG akka.event.EventStream - subscribing Actor[akka://BuffersAndTicksSpec/system/UnhandledMessageForwarder#1836419858] to channel class akka.actor.UnhandledMessage
08:24:07.079 DEBUG akka.event.EventStream - Default Loggers started
08:24:07.079 DEBUG a.e.LoggingBus$$anonfun$startDefaultLoggers$2$$anon$3 - started ([email protected]f15e8)
08:24:07.079 DEBUG akka.event.EventStream - unsubscribing StandardOutLogger from all channels
08:24:07.080 DEBUG a.a.LocalActorRefProvider$SystemGuardian - now supervising Actor[akka://BuffersAndTicksSpec/system/deadLetterListener#-876496689]
08:24:07.080 DEBUG akka.event.EventStream - subscribing Actor[akka://BuffersAndTicksSpec/system/deadLetterListener#-876496689] to channel class akka.actor.DeadLetter
08:24:07.080 DEBUG akka.event.DeadLetterListener - started ([email protected])
08:24:07.081 DEBUG a.a.LocalActorRefProvider$SystemGuardian - now supervising Actor[akka://BuffersAndTicksSpec/system/eventStreamUnsubscriber-1#722751434]
08:24:07.081 DEBUG akka.event.EventStreamUnsubscriber - registering unsubscriber with [email protected]
08:24:07.081 DEBUG akka.event.EventStream - initialized unsubscriber to: Actor[akka://BuffersAndTicksSpec/system/eventStreamUnsubscriber-1#722751434], registering 3 initial subscribers with it
08:24:07.082 DEBUG akka.event.EventStreamUnsubscriber - started ([email protected])
08:24:07.082 DEBUG akka.event.EventStreamUnsubscriber - watching Actor[akka://BuffersAndTicksSpec/system/log1-Slf4jLogger#28712451] in order to unsubscribe from EventStream when it terminates
08:24:07.082 DEBUG akka.event.EventStreamUnsubscriber - watching Actor[akka://BuffersAndTicksSpec/system/UnhandledMessageForwarder#1836419858] in order to unsubscribe from EventStream when it terminates
08:24:07.082 DEBUG akka.event.slf4j.Slf4jLogger - now watched by Actor[akka://BuffersAndTicksSpec/system/eventStreamUnsubscriber-1#722751434]
08:24:07.083 DEBUG a.e.LoggingBus$$anonfun$startDefaultLoggers$2$$anon$3 - now watched by Actor[akka://BuffersAndTicksSpec/system/eventStreamUnsubscriber-1#722751434]
08:24:07.083 DEBUG akka.event.EventStreamUnsubscriber - watching Actor[akka://BuffersAndTicksSpec/system/deadLetterListener#-876496689] in order to unsubscribe from EventStream when it terminates
08:24:07.083 DEBUG akka.event.DeadLetterListener - now watched by Actor[akka://BuffersAndTicksSpec/system/eventStreamUnsubscriber-1#722751434]
08:24:07.164 DEBUG a.a.LocalActorRefProvider$Guardian - now supervising Actor[akka://BuffersAndTicksSpec/user/StreamSupervisor-0#-544371192]
08:24:07.164 DEBUG akka.stream.impl.StreamSupervisor - started ([email protected])
08:24:07.173 WARN a.stream.impl.ActorMaterializerImpl - Fuzzing mode is enabled on this system. If you see this warning on your production system then set akka.stream.materializer.debug.fuzzing-mode to off.
08:24:07.270 DEBUG akka.stream.impl.StreamSupervisor - now supervising Actor[akka://BuffersAndTicksSpec/user/StreamSupervisor-0/flow-0-0-unknown-operation#478879902]
08:24:07.276 DEBUG a.s.i.fusing.ActorGraphInterpreter - started ([email protected])
08:24:07.276 DEBUG akka.stream.impl.StreamSupervisor - now supervising Actor[akka://BuffersAndTicksSpec/user/StreamSupervisor-0/flow-0-1-actorRefSource#1199332529]
08:24:07.283 DEBUG akka.stream.impl.ActorRefSourceActor - started ([email protected])
08:24:07.289 INFO akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://BuffersAndTicksSpec/deadLetters] to Actor[akka://BuffersAndTicksSpec/user/StreamSupervisor-0/flow-0-1-actorRefSource#1199332529] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
08:24:07.290 DEBUG akka.stream.impl.ActorRefSourceActor - stopped
08:24:07.291 DEBUG a.s.i.fusing.ActorGraphInterpreter - stopped
08:24:07.297 INFO akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://BuffersAndTicksSpec/deadLetters] to Actor[akka://BuffersAndTicksSpec/user/StreamSupervisor-0/flow-0-1-actorRefSource#1199332529] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
done
Die Thread.sleep blockiert Fäden zu geben und etwas in der Aktorik blockieren könnten. Es wäre besser, wenn Sie die 'Zukunft' herausbringen und darauf' warten' könnten. – Stephen