2017-05-28 2 views
1

In einem einfachen Test erwarte ich, dass der Flow Zahlen für eine Sekunde generiert und druckt. Ich möchte Strömungsvorgänge mit Gegendruck testen und benötige eine Source, die den Gegendruck nicht berücksichtigt.Warum wird der Stream sofort gestoppt und wie kann ich ihn verhindern?

... with FreeSpec ... { 

    implicit val system = ActorSystem(this.getClass.getSimpleName) 
    private val matSettings: ActorMaterializerSettings = 
ActorMaterializerSettings(system).withDebugLogging(true).withFuzzing(true) 
    implicit val mat = ActorMaterializer(matSettings.withInputBuffer(1, 1)) 

"must print numbers for a second" in { 

    val source: Source[Double, ActorRef] = 
    Source.actorRef(100, OverflowStrategy.fail).map(_ => Random.nextDouble()) 

    val sink: Sink[Double, Future[Done]] = Sink.foreach(println) 

    val actorRef: ActorRef = Flow[Double].to(sink).runWith(source) 

    system.scheduler.schedule(0.micro, 1.milli, actorRef, "tick")(system.dispatcher) 

    Thread.sleep(1000) 
    println("done") 
} 

Allerdings scheint der Schauspieler sofort zu stoppen, nachdem der Fluss materialisiert wurde, nicht eine einzelne Nachricht wird geliefert und nur zwei gesendet werden. Wo verkenne ich, was hier passiert und wie bekomme ich das erwartete Ergebnis? Das Protokoll:

08:24:07.077 DEBUG a.a.LocalActorRefProvider$SystemGuardian - now supervising Actor[akka://BuffersAndTicksSpec/system/UnhandledMessageForwarder#1836419858] 
08:24:07.078 DEBUG akka.event.EventStream - subscribing Actor[akka://BuffersAndTicksSpec/system/UnhandledMessageForwarder#1836419858] to channel class akka.actor.UnhandledMessage 
08:24:07.079 DEBUG akka.event.EventStream - Default Loggers started 
08:24:07.079 DEBUG a.e.LoggingBus$$anonfun$startDefaultLoggers$2$$anon$3 - started ([email protected]f15e8) 
08:24:07.079 DEBUG akka.event.EventStream - unsubscribing StandardOutLogger from all channels 
08:24:07.080 DEBUG a.a.LocalActorRefProvider$SystemGuardian - now supervising Actor[akka://BuffersAndTicksSpec/system/deadLetterListener#-876496689] 
08:24:07.080 DEBUG akka.event.EventStream - subscribing Actor[akka://BuffersAndTicksSpec/system/deadLetterListener#-876496689] to channel class akka.actor.DeadLetter 
08:24:07.080 DEBUG akka.event.DeadLetterListener - started ([email protected]) 
08:24:07.081 DEBUG a.a.LocalActorRefProvider$SystemGuardian - now supervising Actor[akka://BuffersAndTicksSpec/system/eventStreamUnsubscriber-1#722751434] 
08:24:07.081 DEBUG akka.event.EventStreamUnsubscriber - registering unsubscriber with [email protected] 
08:24:07.081 DEBUG akka.event.EventStream - initialized unsubscriber to: Actor[akka://BuffersAndTicksSpec/system/eventStreamUnsubscriber-1#722751434], registering 3 initial subscribers with it 
08:24:07.082 DEBUG akka.event.EventStreamUnsubscriber - started ([email protected]) 
08:24:07.082 DEBUG akka.event.EventStreamUnsubscriber - watching Actor[akka://BuffersAndTicksSpec/system/log1-Slf4jLogger#28712451] in order to unsubscribe from EventStream when it terminates 
08:24:07.082 DEBUG akka.event.EventStreamUnsubscriber - watching Actor[akka://BuffersAndTicksSpec/system/UnhandledMessageForwarder#1836419858] in order to unsubscribe from EventStream when it terminates 
08:24:07.082 DEBUG akka.event.slf4j.Slf4jLogger - now watched by Actor[akka://BuffersAndTicksSpec/system/eventStreamUnsubscriber-1#722751434] 
08:24:07.083 DEBUG a.e.LoggingBus$$anonfun$startDefaultLoggers$2$$anon$3 - now watched by Actor[akka://BuffersAndTicksSpec/system/eventStreamUnsubscriber-1#722751434] 
08:24:07.083 DEBUG akka.event.EventStreamUnsubscriber - watching Actor[akka://BuffersAndTicksSpec/system/deadLetterListener#-876496689] in order to unsubscribe from EventStream when it terminates 
08:24:07.083 DEBUG akka.event.DeadLetterListener - now watched by Actor[akka://BuffersAndTicksSpec/system/eventStreamUnsubscriber-1#722751434] 
08:24:07.164 DEBUG a.a.LocalActorRefProvider$Guardian - now supervising Actor[akka://BuffersAndTicksSpec/user/StreamSupervisor-0#-544371192] 
08:24:07.164 DEBUG akka.stream.impl.StreamSupervisor - started ([email protected]) 
08:24:07.173 WARN a.stream.impl.ActorMaterializerImpl - Fuzzing mode is enabled on this system. If you see this warning on your production system then set akka.stream.materializer.debug.fuzzing-mode to off. 
08:24:07.270 DEBUG akka.stream.impl.StreamSupervisor - now supervising Actor[akka://BuffersAndTicksSpec/user/StreamSupervisor-0/flow-0-0-unknown-operation#478879902] 
08:24:07.276 DEBUG a.s.i.fusing.ActorGraphInterpreter - started ([email protected]) 
08:24:07.276 DEBUG akka.stream.impl.StreamSupervisor - now supervising Actor[akka://BuffersAndTicksSpec/user/StreamSupervisor-0/flow-0-1-actorRefSource#1199332529] 
08:24:07.283 DEBUG akka.stream.impl.ActorRefSourceActor - started ([email protected]) 
08:24:07.289 INFO akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://BuffersAndTicksSpec/deadLetters] to Actor[akka://BuffersAndTicksSpec/user/StreamSupervisor-0/flow-0-1-actorRefSource#1199332529] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
08:24:07.290 DEBUG akka.stream.impl.ActorRefSourceActor - stopped 
08:24:07.291 DEBUG a.s.i.fusing.ActorGraphInterpreter - stopped 
08:24:07.297 INFO akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://BuffersAndTicksSpec/deadLetters] to Actor[akka://BuffersAndTicksSpec/user/StreamSupervisor-0/flow-0-1-actorRefSource#1199332529] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
done 
+0

Die Thread.sleep blockiert Fäden zu geben und etwas in der Aktorik blockieren könnten. Es wäre besser, wenn Sie die 'Zukunft' herausbringen und darauf' warten' könnten. – Stephen

Antwort

5

Das Problem hier ist mit der Art Ihrer Source.actorRef. Auch wenn ein Schauspieler Any Art von Nachricht erhalten kann, wenn Sie es in eine Source umhüllen, müssen Sie ihm einen Typ geben (um Akka Streams stark zu tippen).

Beispiel:

val source: Source[Int, ActorRef] = Source.actorRef[Int](100, OverflowStrategy.fail) 

Was unter der Haube passiert ist, dass Ihr Source wird versuchen, jede eingehende Nachricht an Int zu werfen.

In Ihrem Fall ist die Source.actorRef nicht explizit getippt, daher wird Nothing vom Compiler abgeleitet. (Dies wird durch die Tatsache maskiert, dass Sie die map Bühne verketten, wo alles zu einem Double wird). Alle eingehenden Tick-Nachrichten werden in Nothing umgewandelt, was zu ClassCastException führt.

Die Lösung ist Ihr Source.actorRef Bühne

val source: Source[Double, ActorRef] = 
     Source.actorRef[String](100, OverflowStrategy.fail).map(_ => Random.nextDouble()) 
+0

Hey, das war ein kniffliger, funktioniert jetzt. Vielen Dank, Stefano! – kostja

Verwandte Themen