2016-06-28 11 views
0

Mein Akka Stream stoppt nach einem einzelnen Element. Hier ist mein stream:Akka Stream stoppt nach einem Element

val firehoseSource = Source.actorPublisher[FirehoseActor.RawTweet](
    FirehoseActor.props(
    auth = ... 
) 
) 

val ref = Flow[FirehoseActor.RawTweet] 
    .map(r => ResponseParser.parseTweet(r.payload)) 
    .map { t => println("Received: " + t); t } 
    .to(Sink.onComplete({ 
    case Success(_) => logger.info("Stream completed") 
    case Failure(x) => logger.error(s"Stream failed: ${x.getMessage}") 
    })) 
    .runWith(firehoseSource) 

FirehoseActor eine Verbindung zu den Twitter Firehose und Puffer-Nachrichten an eine Warteschlange. Wenn der Akteur eine Nachricht empfängt Request es take S die nächste Element und gibt es:

def receive = { 
    case Request(_) => 
    logger.info("Received request for next firehose element") 
    onNext(RawTweet(queue.take())) 
} 

Das Problem, dass nur ein einzelner tweet wird an die Konsole gedruckt wird. Das Programm beendet nicht oder wirft keine Fehler, und ich habe Logging-Anweisungen herumgespritzt, und keines wird gedruckt.

Ich dachte, die Senke würde weiterhin Druck ausüben, um Elemente durchzuziehen, aber das scheint nicht der Fall zu sein, da keine der Nachrichten in Sink.onComplete gedruckt werden. Ich habe auch versucht, Sink.ignore zu verwenden, aber das nur ein einzelnes Element auch gedruckt. Die Protokollnachricht im Akteur wird nur einmal gedruckt.

Welche Senke muss ich verwenden, um Elemente unbegrenzt durch den Fluss zu ziehen?

Antwort

0

Ah Ich hätte totalDemand in meinem Schauspieler respektiert werden sollen. Dies behebt das Problem:

def receive = { 
    case Request(_) => 
    logger.info("Received request for next firehose element") 
    while (totalDemand > 0) { 
     onNext(RawTweet(queue.take())) 
    } 

Ich erwarte ein Request für jedes Element in dem Strom zu empfangen, aber anscheinend jeder Fluss ein Request senden.

Verwandte Themen