ich etwas ähnliches mit SQS erreicht haben Javas mit ExecutorService, Future und die ConcurrentLinkedQueue.
Der ExecutorService erstellt einen Threadpool, der Klassen ausführen kann, die die Callable-Schnittstelle implementieren und einen Future zurückgibt. Wenn der ExecutorService die Futures erstellt, schiebe ich sie auf eine ConcurrentLinkedQueue, die in einem Thread ausgeführt wird, und verarbeitet die Ergebnisse, sobald die Futures abgeschlossen sind.
Implementieren SQS und Starten der Arbeit asynchron Überprüfung:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class SqsProcessor {
private static final int THREAD_COUNT = 100;
private ExecutorService _executor = null;
private FutureResultProcessor futureResultProcessor = null;
public SqsProcessor() {
_executor = Executors.newFixedThreadPool(THREAD_COUNT);
_futureResultProcessor = new FutureResultProcessor();
}
public void waitReceive() {
// Receive a SQS message
// Start the work related to the SQS message
Callable<MyWorkderResult> sqsWorker = new MyWorker(sqsMessage);
Future<MyWorkerResult> sqsFuture = _executor.submit(sqsWorker);
// Send to the queue so the result can be processed when it completes
_futureResultProcessor.add(sqsFuture);
}
}
Klasse, die die Arbeit macht:
import java.util.concurrent.Callable;
public class MyWorker implements Callable<MyWorkerResult> {
private String _sqsMessage = null;
public MyWorker(String sqsMessage) {
_sqsMessage = sqsMessage;
}
@Override
public MyWorkerResult call() throws Exception {
// Do work relating to the SQS message
}
}
Hält die Ergebnisse der Arbeit:
public class MyWorkerResult {
// Results set in MyWorker call()
}
ConcurrentLinkedQueue zu erhalten und verarbeiten die zukünftigen Ergebnisse:
import java.util.concurrent.Future;
import java.util.concurrent.ConcurrentLinkedQueue;
public class FutureResultProcessor extends Thread {
private final ConcurrentLinkedQueue<Future<MyWorkerResult>> resultQueue = new ConcurrentLinkedQueue<Future<MyWorkerResult>>();
private final Integer CHECK_SLEEP = 300;
public FutureResultProcessor() {
}
public void run() {
while(true) {
Future<MyWorkerResult> myFuture = resultQueue.poll();
if(myFuture == null) {
// There's nothing to process
try { Thread.sleep(CHECK_SLEEP); } catch (InterruptedException e) {}
continue;
}
// Process result
if(myFuture != null) {
MyFutureResult myFutureResult = myFuture.get();
// Process result
}
}
}
public void add(Future<MyWorkerResult> sqsFuture) {
resultQueue.offer(sqsFuture);
}
}
Alternativ könnten Sie eine Gruppe von Futures sammeln und darauf warten, dass alle fertig sind, bevor Sie die Ergebnisse verarbeiten.
Akka könnte eine gute Passform sein. Ich habe es nicht direkt verwendet, aber es bietet einen Rahmen für das Ausführen von asynchronen Aufgaben, bietet eine Fehlerbehandlung und könnte die Aufgaben sogar an entfernte Instanzen verteilen.