2017-05-31 4 views
1

Ich verfolge this answer einen SQS Verbraucher mit Akka Streams zu erstellen:Akka Streams: in Quelle letztes Element fehlt

def queryForMessages = { 
    val messages = Sqs.receive(queueUrl, 3, 10) 
    println(s"Received from sqs: ${messages.map(_.getBody)}") 
    messages 
    } 

    def messageListStream : immutable.Stream[Iterable[SqsMessage]] = { 
    queryForMessages #:: messageListStream 
    } 

    def messageIterator() : Iterator[SqsMessage] = messageListStream.flatten.toIterator 

    Source.fromIterator(messageIterator) 
    .map(_.getBody) 
    .runForeach(m => println(s"Stream output: $m"))(materializer) 

Alles, abgesehen von der Tatsache zu funktionieren scheint, dass das letzte Element, das von empfangen wurde Die Warteschlange wird vom Stream nicht aufgenommen. Dh wenn ich vier Elemente auf sqs poste, werden nur 3 davon vom Stream ausgedruckt (Element "2" fehlt). Der Ausgang ich erhalte, ist:

Received from sqs: List(1) 
Received from sqs: List(3, 4, 2) 
Stream output: 1 
Stream output: 3 
Stream output: 4 
Received from sqs: List() 
Received from sqs: List() 

Das fehlende Element (2) scheint tatsächlich, obwohl, wenn ich noch ein paar Elemente schreiben:

Received from sqs: List(5) 
Stream output: 2 
Received from sqs: List(6) 
Stream output: 5 

Irgendwelche Ideen?

+0

Ich gab die Antwort, auf die Sie in Ihrer Frage verwiesen haben. Ich vermute, dass das Problem von der faulen Natur der Bewertung eines Stream Tails herrührt. Versuchen Sie dies stattdessen: 'def messageIterator() = Iterator.kontinuierlich (messageListStream) .flatMap (Identität)' –

+0

'Iterator.continually (queryForMessages) .flatMap (identity)' –

Antwort

0

Wie ich im Kommentarabschnitt geschrieben habe, glaube ich, dass die Quelle des Problems die Tatsache ist, dass Streams, und daher Lazy Tail Evaluation, als Vermittler verwendet werden.

Die Stream-Komponente ist nicht erforderlich, da ein allein Iterator das Problem lösen kann:

val messageListIterator :() => Iterator[Iterable[SqsMessage]] = 
() => Iterator continually queryForMessages 

val messageIterator :() => Iterator[SqsMessage] = 
() => messageListIterator() flatMap identity 

Dies kann nun in Ihrem akka Strom Source verwendet werden:

Source 
    .fromIterator(messageIterator) 
    .map(_.getBody) 
    .runForeach(m => println(s"Stream output: $m"))(materializer) 

I the linked question entsprechend aktualisiert haben.

Verwandte Themen