2017-08-29 2 views
0

Ich versuche, meine ask Anfragen an einen ConsumerActor zu drosseln.Throttle Akka Fragen

val throttler: ActorRef = 
Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew) 
    .throttle(10, 1.second, 1, ThrottleMode.Shaping) 
    .to(Sink.foreach[Any](msg => consumerActor ! msg)) 
    .run() 

mit

aLotOfItems.map(items => 
    val itemsFuture = (throttler ? consumeItems(items)).mapTo[Future[String]] 
    itemsFuture flatMap {x => x} 
}).toVector 

msgs zum consumerActor Dieser schickt aber ich glaube, die Antwort zu verlieren, wie ich mit 2 Artikel versucht, aber die Anfrage gerade hängt.

Ich glaube, ich brauche die tell im Sink.foreach zu einer fragen oder etwas zu ändern, dass eine Antwort

Lösung umgehen kann: Verstanden unter die gewählte Antwort mit zu arbeiten. Ich hatte

val answer = Source(...) (from the selected answer below) 
sender ! answer 

Antwort

1

Das Problem ist hinzuzufügen, dass Sie Antworten von throttler erwar, aber throttler nicht Antworten zu senden und ist nicht in der Lage, dies zu tun, weil es keinen Verweis auf den ursprünglichen Absender hat.

Wenn consumerActor Antworten an den Absender jeder consumeItems(i) Nachricht mit einem Future[String], dann einen Weg zu erreichen, was Sie zu tun versuchen ist, eine Source von aLotOfItems zu erstellen und eine Kombination aus mapAsync und ask verwenden, um Nachrichten an die drosseln Darsteller. Die Antworten des Aktors können in einer gesammelt werden. Etwas wie:

val sink = Sink.seq[String] 

val result = 
    Source(aLotOfItems) 
    .map(consumeItems(_)) 
    .mapAsync(parallelism = 5)(item => (consumerActor ? item).mapTo[Future[String]]) 
    .mapAsync(parallelism = 5)(identity) 
    .runWith(sink) 

// Future[Seq[String]] 
+0

wo würden Sie die 'Quelle' deklarieren? – Jeff

+0

Auch habe ich 'Source (aLotOfItems)' versucht, aber ich habe einen Typkonflikt von 'Erwartet: Iterable [NotInferedT] Actual: Vector [Item]'. – Jeff

+0

Was ist der Typ von 'aLotOfItems'? – chunjef