2017-12-16 10 views
0

Wie kann ich einen Rückruf durchführen, nachdem jede Nachricht erfolgreich an JMS gesendet wurde oder fehlschlägt?Alpakka - Rückruf, wenn Nachricht verarbeitet wird

val jmsSink = JmsSink.textSink(
    JmsSinkSettings(connectionFactory).withQueue("My_Queue") 
) 
Source(Stream.from(1)) 
    .map(_.toString) 
    .runWith(jmsSink) 

spezifischeres Beispiel

// creating a sourceQueue which is bound to jmsSink 
val sourceQueue: SourceQueueWithComplete[String] = 
    Source.queue[String](bufferSize, OverflowStrategy.backpressure) 
     .to(jmsSink) 
     .run() 

Der Client sendet Artikel zu sourceQueue:

val result: Future[QueueOfferResult] = sourceQueue offer "my-item" 

val result ist das Ergebnis das Element in sourceQueue Einfügen, bedeutet es nicht, dass es noch an JMS gesendet wird. Ich muss ein Ereignis auslösen, wenn das Element den Senkenprozess durchlaufen hat und in die JMS-Warteschlange eingefügt wurde.

Antwort

1

Ein Ansatz einen Rückruf aufgerufen werden (falls von „Rückruf“ eine Funktion bedeuten, dass Unit zurückgibt) für jede erfolgreiche Nachricht ist ein entsprechendes Source zu erstellen, die auf die gleiche JMS-Warteschlange abonniert, und verwenden Sie runForeach:

val jmsSink = JmsSink.textSink(
    JmsSinkSettings(connectionFactory).withQueue("My_Queue") 
) 

Source(...) 
    .map(_.toString) 
    .runWith(jmsSink) 

val jmsSource = JmsSource(
    JmsSourceSettings(connectionFactory).withQueue("My_Queue") 
) 

jmsSource.runForeach(println) 

Das obige Beispiel druckt jede Nachricht, die über die Senke in die Warteschlange veröffentlicht wird.

Bei Fehlern wird derzeit der Stream heruntergefahren, wenn eine Ausnahme ausgelöst wird. Wenn zum Beispiel im Fall einer geworfene Ausnahme mögen Sie die Ausnahme drucken und den Strom wieder aufnehmen, anstatt es zu enden, könnten Sie ein supervision strategy zu Ihrem ursprünglichen Source befestigen:

val decider: Supervision.Decider = { 
    case e: Exception => 
    println(s"Exception thrown: ${e.getMessage}") 
    Supervision.Resume 
} 

val flow = Flow[String] 
    .withAttributes(ActorAttributes.supervisionStrategy(decider)) 

Source(...) 
    .map(_.toString) 
    .via(flow) 
    .runWith(jmsSink) 

val jmsSource = ... 
+0

Dank für die Beantwortung @chunjef. 'jmsSource.runForeach (println)' würde das Objekt aus der Warteschlange entfernen, oder? Ich fügte meiner Frage einige Details hinzu, um zu klären, was genau ich brauche. – Feyyaz

Verwandte Themen