2016-08-03 3 views
5

Hinweis: Dies ist ein Repost eines Threads von here.Lesen aus einer SQS-Warteschlange mit mehreren Threads

Hallo alle, Ich habe einen Prozess, der Nachrichten in einer einzigen SQS-Warteschlange verarbeitet. Die Warteschlange kann viele Nachrichten enthalten, und jede Nachricht führt zu einem Datenbanktreffer. Daher wollte ich die Leser dieser Warteschlange fädeln.

Der Basiscode für jeden Thread ist:

public void run() { 
    while(true) { 
     ReceiveMessageRequest rmr = new ReceiveMessageRequest(queueUrl) 
       .withMaxNumberOfMessages(10) 
       .withWaitTimeSeconds(3); 
     List<Message> messages = sqsClient.receiveMessage(rmr).getMessages(); 
     // process messages 
     // delete messages 
    } 
} 

Was da ich bin ist, dass es Tonnen von duplizierten Nachrichten zwischen den Fäden. Ich weiß, dass ich ein paar Duplikate hier und dort erwarten sollte, aber es scheint, dass jeder Thread die gleiche Menge von Nachrichten bekommt und, realistisch gesehen, nur ein Thread jemals viel Arbeit macht.

Bin ich falsch verstanden, wie man die API benutzt oder mache ich etwas anderes falsch? Die Javadocs geben an, dass die AmazonSQS-Klasse threadsafe ist und tatsächlich hat sogar das Erstellen einer neuen AmazonSQS-Klasse für jeden Thread nichts geändert.

Alle Zeiger würden am meisten geschätzt werden. Mein momentaner Gedanke an eine Lösung ist, dass ein einzelner Thread aus der SQS-Warteschlange gelesen wird, jede Nachricht in eine Art LinkedBlockingDeque eingefügt wird und dann die Worker das lesen lassen. Aber ich glaube, dass diese Implementierung die Warteschlange nicht so schnell aus dem Ruder laufen lässt, wie ich es möchte.

+1

Was in die Warteschlange in einem einzigen Thread zu hören, und dann neue Themen hochgefahren jede Nachricht zu verarbeiten, die Sie erhalten? –

+0

@Mark B - Das ist eine Variation von dem, was ich in meinem letzten Absatz vorgeschlagen habe - ich hatte gehofft, das zu vermeiden, aber es könnte die beste Wette sein. – stdunbar

+2

Wie lange dauert Ihr Prozess? Möglicherweise müssen Sie setVisibilityTimeout für Ihre Nachrichtenanforderung verwenden, um Ihrem Prozess Zeit zum Verarbeiten und Löschen der Nachrichten zu geben. – Larry

Antwort

1

Da Sie für jede Nachricht einen Datenbanktreffer haben, scheint die Verarbeitung jeder Nachricht Zeit zu beanspruchen. Sie sollten das Sichtbarkeits-Timeout der Warteschlange erhöhen.

Von AWS SQS Dokumentation:

Unmittelbar nachdem die Nachricht empfangen wird, bleibt es in der Warteschlange. Um andere Verbraucher von der Verarbeitung der Nachricht erneut zu verhindern, setzt Amazon SQS ein Sichtbarkeits-Timeout, einen Zeitraum, in dem Amazon SQS verhindert, dass andere konsumierende Komponenten die Nachricht empfangen und verarbeiten.

(http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html)

+0

Das Zeitlimit für geringe Sichtbarkeit konnte bei langen Nachrichtenverarbeitungszeiten zu Problemen mit doppelten Nachrichten führen, war aber in meinem Fall eindeutig nicht die Hauptursache, da es auf drei Minuten eingestellt war und ich doppelte Nachrichten sah, bevor SQS sie wieder verfügbar machte . Am Ende habe ich den gepinnten Dispatcher in Akka verwendet, um zu garantieren, dass ein einziger Thread alle Lesevorgänge aus der Warteschlange ausführt. Die Nachrichtenverarbeitung wird jedoch von verschiedenen Threads ausgeführt, um zu verhindern, dass sie das Lesen neuer Nachrichten aus der Warteschlange stört –

Verwandte Themen