2017-09-10 2 views
0

enter image description here Ich versuche zu sqs mit akka Streams zu hören und ich bekomme Nachrichten von ihm q mit diesem Code-Schnipsel ist:Kann dieser Code in Stateful Akka Streams übersetzt werden?

natürlich diese Code-Schnipsel Nachrichten one-by-one bekommen (es dann ack):

meine Frage ist: kann ich mehrere Nachrichten erhalten, und speichern Sie sie zum Beispiel in einer Stateful Karte für letztere Verwendung?

zum Beispiel, dass 5-Nachrichten nach dem Empfang (alle von ihnen (pro Zustand gespeichert werden))

dann, wenn bestimmte Bedingung geschieht i sie alle, und wenn nicht ack wird sie in der Warteschlange zurückkehrt (passiert, trotzdem, weil Sichtbarkeits-Timeout)?

danke.

Antwort

0

Konnte sein, dass Sie nach grouped (oder groupedWithin) Kombinator suchen. Mit ihnen können Sie Nachrichten bündeln und in Gruppen verarbeiten. groupedWithin ermöglicht es Ihnen, eine Charge nach einer bestimmten Zeit freizugeben, falls sie Ihre festgelegte Größe noch nicht erreicht hat. Dokumentennummer here.

In einem nachfolgenden Prüfablauf können Sie die von Ihnen benötigte Logik ausführen und die Sequenz ausgeben, falls die Nachrichten bearbeitet werden sollen oder nicht.

Beispiel:

val yourCheck: Flow[Seq[MessageActionPair], Seq[MessageActionPair], NotUsed] = ??? 

    val future = SqsSource(sqsEndpoint)(awsSqsClient) 
    .takeWhile(_ => true) 
    .mapAsync(parallelism = 2){ ... } 
    .grouped(5) 
    .via(yourCheck) 
    .mapConcat(identity) 
    .to(SqsAckSink(sqsEndpoint)(awsSqsClient)) 
    .run() 
+0

i erhalten Fehler: (115, 18) Typübereinstimmung; gefunden: Seq [(com.amazonaws.services.sqs.model.Message, akka.stream.alpakka.sqs.MessageAction)] => Seq [(com.amazonaws.services.sqs.model.Message, akka.stream. alpakka.sqs.MessageAction)] erforderlich: Seq [(com.amazonaws.services.sqs.model.Message, akka.stream.alpakka.sqs.MessageAction)] => scala.collection.immutable.Iterable [?] . mapConcat (identity) – kevinjoeiy

+0

Stellen Sie sicher, dass Sie 'immutable.Seq' importieren. typesystem wird mit dem nicht-unveränderlichen 'Seq' verwechselt –

+0

Ich habe val yourCheck gesetzt: Flow [scala.collection.immutable.Seq [MessageActionPair], scala.collection.immutable.Seq [MessageActionPair], NotUsed] = ??? aber ich bekomme: Fehler: (115, 12) Vorwärtsreferenz erstreckt sich über die Definition von Wert yourCheck .via (yourCheck) – kevinjoeiy

Verwandte Themen