2014-01-31 6 views
5

Ich habe Schwierigkeiten, herauszufinden, wie Nachrichten von Amazon SQS verarbeitet werden.Verarbeiten Sie Amazon SQS-Nachrichten asynchron aus der Warteschlange in Java

ich umzusetzen versuche folgendes:

  1. Zuhörer auf SQS
  2. Prozess Nachricht aus der Warteschlange, und fügen Sie
  3. löschen verarbeitete Nachricht aus der Warteschlange DB

Was mich stört ein Menge ist, wie Schritt 2 zu implementieren. Ich habe Klasse SQSConnector und ProfileDao. Im Augenblick möchte ich eine einfache Implementierung, indem ich SQSConnector in ProfileDao initialisiere und Nachrichten von der Warteschlange empfange. Meine Idee ist, einen neuen Thread zu starten, Nachrichten abzurufen und wenn die Warteschlange leer ist, unterbrechen Sie den Thread von ProfileDao.

Was ist der beste Weg zur Rücksendung/Verarbeitung von Nachrichten (Callback-Funktion?), Und wenn es eine andere Möglichkeit gibt, bin ich offen für Optionen.

Danke

Antwort

3

ich etwas ähnliches mit SQS erreicht haben Javas mit ExecutorService, Future und die ConcurrentLinkedQueue.

Der ExecutorService erstellt einen Threadpool, der Klassen ausführen kann, die die Callable-Schnittstelle implementieren und einen Future zurückgibt. Wenn der ExecutorService die Futures erstellt, schiebe ich sie auf eine ConcurrentLinkedQueue, die in einem Thread ausgeführt wird, und verarbeitet die Ergebnisse, sobald die Futures abgeschlossen sind.

Implementieren SQS und Starten der Arbeit asynchron Überprüfung:

import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 

public class SqsProcessor { 

    private static final int THREAD_COUNT = 100; 
    private ExecutorService _executor = null; 
    private FutureResultProcessor futureResultProcessor = null; 

    public SqsProcessor() { 
     _executor = Executors.newFixedThreadPool(THREAD_COUNT); 
     _futureResultProcessor = new FutureResultProcessor(); 
    } 

    public void waitReceive() { 

     // Receive a SQS message 

     // Start the work related to the SQS message 
     Callable<MyWorkderResult> sqsWorker = new MyWorker(sqsMessage); 
     Future<MyWorkerResult> sqsFuture = _executor.submit(sqsWorker); 

     // Send to the queue so the result can be processed when it completes 
     _futureResultProcessor.add(sqsFuture); 
    } 
} 

Klasse, die die Arbeit macht:

import java.util.concurrent.Callable; 

public class MyWorker implements Callable<MyWorkerResult> { 

    private String _sqsMessage = null; 

    public MyWorker(String sqsMessage) { 
     _sqsMessage = sqsMessage; 
    } 

    @Override 
    public MyWorkerResult call() throws Exception { 
     // Do work relating to the SQS message 
    } 
} 

Hält die Ergebnisse der Arbeit:

public class MyWorkerResult { 
    // Results set in MyWorker call() 
} 

ConcurrentLinkedQueue zu erhalten und verarbeiten die zukünftigen Ergebnisse:

import java.util.concurrent.Future; 
import java.util.concurrent.ConcurrentLinkedQueue; 

public class FutureResultProcessor extends Thread { 

    private final ConcurrentLinkedQueue<Future<MyWorkerResult>> resultQueue = new ConcurrentLinkedQueue<Future<MyWorkerResult>>(); 
    private final Integer CHECK_SLEEP = 300; 

    public FutureResultProcessor() { 
    } 

    public void run() { 
     while(true) { 
      Future<MyWorkerResult> myFuture = resultQueue.poll(); 

      if(myFuture == null) { 
       // There's nothing to process 
       try { Thread.sleep(CHECK_SLEEP); } catch (InterruptedException e) {} 
       continue; 
      } 

      // Process result 
      if(myFuture != null) { 

       MyFutureResult myFutureResult = myFuture.get(); 

       // Process result 
      } 
     } 
    } 

    public void add(Future<MyWorkerResult> sqsFuture) { 
     resultQueue.offer(sqsFuture); 
    } 
} 

Alternativ könnten Sie eine Gruppe von Futures sammeln und darauf warten, dass alle fertig sind, bevor Sie die Ergebnisse verarbeiten.

Akka könnte eine gute Passform sein. Ich habe es nicht direkt verwendet, aber es bietet einen Rahmen für das Ausführen von asynchronen Aufgaben, bietet eine Fehlerbehandlung und könnte die Aufgaben sogar an entfernte Instanzen verteilen.

Verwandte Themen