Hinweis: Dies ist ein Repost eines Threads von here.Lesen aus einer SQS-Warteschlange mit mehreren Threads
Hallo alle, Ich habe einen Prozess, der Nachrichten in einer einzigen SQS-Warteschlange verarbeitet. Die Warteschlange kann viele Nachrichten enthalten, und jede Nachricht führt zu einem Datenbanktreffer. Daher wollte ich die Leser dieser Warteschlange fädeln.
Der Basiscode für jeden Thread ist:
public void run() {
while(true) {
ReceiveMessageRequest rmr = new ReceiveMessageRequest(queueUrl)
.withMaxNumberOfMessages(10)
.withWaitTimeSeconds(3);
List<Message> messages = sqsClient.receiveMessage(rmr).getMessages();
// process messages
// delete messages
}
}
Was da ich bin ist, dass es Tonnen von duplizierten Nachrichten zwischen den Fäden. Ich weiß, dass ich ein paar Duplikate hier und dort erwarten sollte, aber es scheint, dass jeder Thread die gleiche Menge von Nachrichten bekommt und, realistisch gesehen, nur ein Thread jemals viel Arbeit macht.
Bin ich falsch verstanden, wie man die API benutzt oder mache ich etwas anderes falsch? Die Javadocs geben an, dass die AmazonSQS-Klasse threadsafe ist und tatsächlich hat sogar das Erstellen einer neuen AmazonSQS-Klasse für jeden Thread nichts geändert.
Alle Zeiger würden am meisten geschätzt werden. Mein momentaner Gedanke an eine Lösung ist, dass ein einzelner Thread aus der SQS-Warteschlange gelesen wird, jede Nachricht in eine Art LinkedBlockingDeque eingefügt wird und dann die Worker das lesen lassen. Aber ich glaube, dass diese Implementierung die Warteschlange nicht so schnell aus dem Ruder laufen lässt, wie ich es möchte.
Was in die Warteschlange in einem einzigen Thread zu hören, und dann neue Themen hochgefahren jede Nachricht zu verarbeiten, die Sie erhalten? –
@Mark B - Das ist eine Variation von dem, was ich in meinem letzten Absatz vorgeschlagen habe - ich hatte gehofft, das zu vermeiden, aber es könnte die beste Wette sein. – stdunbar
Wie lange dauert Ihr Prozess? Möglicherweise müssen Sie setVisibilityTimeout für Ihre Nachrichtenanforderung verwenden, um Ihrem Prozess Zeit zum Verarbeiten und Löschen der Nachrichten zu geben. – Larry