2012-04-16 6 views
9

Ich arbeite an einer Anwendung, deren Workflow verwaltet wird, indem Nachrichten in SQS mit Boto übergeben werden.Wie erhalten Sie alle Nachrichten in der Amazon SQS-Warteschlange mithilfe der Boto-Bibliothek in Python?

Meine SQS-Warteschlange wächst allmählich, und ich habe keine Möglichkeit zu überprüfen, wie viele Elemente es enthalten soll.

Jetzt habe ich einen Daemon, der regelmäßig die Warteschlange abfragt und überprüft, ob ich eine feste Größe von Elementen habe. Betrachten wir zum Beispiel die folgende „Warteschlange“:

q = ["msg1_comp1", "msg2_comp1", "msg1_comp2", "msg3_comp1", "msg2_comp2"] 

Jetzt will ich überprüfen, ob ich „msg1_comp1“ haben „msg2_comp1“ und „msg3_comp1“ in der Warteschlange zusammen an einem gewissen Punkt in der Zeit, aber ich don‘ Ich kenne die Größe der Warteschlange.

Nach über die API suchen, es scheint, dass Sie entweder nur ein Element bekommen, oder eine festgelegte Anzahl von Elementen in der Warteschlange, aber nicht alle:

>>> rs = q.get_messages() 
>>> len(rs) 
1 
>>> rs = q.get_messages(10) 
>>> len(rs) 
10 

Ein Vorschlag in den Antworten vorgeschlagen werden würde bekomme zum Beispiel 10 Nachrichten in einer Schleife, bis ich nichts zurückbekomme, aber Nachrichten in SQS haben ein Sichtbarkeits-Timeout, was bedeutet, dass wenn ich Elemente aus der Warteschlange abrufe, sie nicht wirklich entfernt werden, sie werden nur für kurze Zeit unsichtbar sein von Zeit.

Gibt es eine einfache Möglichkeit, alle Nachrichten in der Warteschlange zu erhalten, ohne zu wissen, wie viele es sind?

Antwort

13

Setzen Sie Ihren Anruf q.get_messages(n) innerhalb while-Schleife:

all_messages=[] 
rs=q.get_messages(10) 
while len(rs)>0: 
    all_messages.extend(rs) 
    rs=q.get_messages(10) 

Zusätzlich dump won't support more than 10 messages entweder:

def dump(self, file_name, page_size=10, vtimeout=10, sep='\n'): 
    """Utility function to dump the messages in a queue to a file 
    NOTE: Page size must be < 10 else SQS errors""" 
+0

kann ich nicht wirklich tun, da die Nachrichten in SQS einen Sichtbarkeits Timeout haben, so dass, wenn ich zum ersten Mal erhalten 10 Nachrichten, dann einige Male eine Schleife, beim nächsten Mal bekomme ich die gleichen 10 Nachrichten seit dem Timeout. Ich denke darüber nach, 'dump()' zu verwenden, aber ich muss die Datei danach lesen, das scheint albern zu sein, fehlt mir etwas? (Ich könnte das Sichtbarkeitszeitlimit auf eine sehr lange Zeit setzen, aber das scheint hässlich). –

+0

@linker - Sie haben gesagt, Sie müssen nach 'n' spezifischen Nachrichten suchen. bedeutet das, dass es einige Übereinstimmungskriterien gibt, mit denen Sie jede Nachricht vergleichen? –

+0

Sorry, wenn das verwirrend war, habe ich meinen Beitrag aktualisiert. –

5

Mein Verständnis ist, dass die verteilte Natur des SQS Service ziemlich Ihr Design nicht praktikabel macht. Jedes Mal, wenn Sie get_messages aufrufen, sprechen Sie mit einer anderen Gruppe von Servern, die einige, aber nicht alle Ihre Nachrichten enthalten. Daher ist es nicht möglich, von Zeit zu Zeit einzuchecken, um festzulegen, ob eine bestimmte Gruppe von Nachrichten bereit ist, und diese dann einfach zu akzeptieren.

Was Sie tun müssen, ist kontinuierlich abzufragen, nehmen Sie alle Nachrichten, wie sie ankommen, und speichern Sie sie lokal in Ihren eigenen Datenstrukturen. Nach jedem erfolgreichen Abruf können Sie Ihre Datenstrukturen überprüfen, um festzustellen, ob ein vollständiger Nachrichtensatz gesammelt wurde.

Beachten Sie, dass Nachrichten wird aus Reihenfolge ankommen, und einige Nachrichten wird zweimal geliefert werden, als Löschungen für alle SQS-Server propagieren, aber nachfolgende manchmal bekommen Anfragen schlagen die Nachrichten löschen.

0

Etwas wie der Code unten sollte den Trick tun. Sorry, es ist in C#, aber es sollte nicht schwierig sein, es in Python zu konvertieren. Das Wörterbuch wird verwendet, um die Duplikate auszusortieren.

public Dictionary<string, Message> GetAllMessages(int pollSeconds) 
    { 
     var msgs = new Dictionary<string, Message>(); 
     var end = DateTime.Now.AddSeconds(pollSeconds); 

     while (DateTime.Now <= end) 
     { 
      var request = new ReceiveMessageRequest(Url); 
      request.MaxNumberOfMessages = 10; 

      var response = GetClient().ReceiveMessage(request); 

      foreach (var msg in response.Messages) 
      { 
       if (!msgs.ContainsKey(msg.MessageId)) 
       { 
        msgs.Add(msg.MessageId, msg); 
       } 
      } 
     } 

     return msgs; 
    } 
9

Ich habe mit AWS SQS-Warteschlangen arbeiten sofortige Benachrichtigungen zu liefern, so brauche ich alle Nachrichten in Echtzeit zu verarbeiten. Der folgende Code hilft Ihnen, alle Nachrichten effizient zu entfernen und alle Fehler beim Entfernen zu beheben.

Hinweis: Um Nachrichten aus der Warteschlange zu entfernen, müssen Sie sie löschen.Ich bin mit dem aktualisierten boto3 AWS Python SDK, json Bibliothek und die folgenden Standardwerte:

import boto3 
import json 

region_name = 'us-east-1' 
queue_name = 'example-queue-12345' 
max_queue_messages = 10 
message_bodies = [] 
aws_access_key_id = '<YOUR AWS ACCESS KEY ID>' 
aws_secret_access_key = '<YOUR AWS SECRET ACCESS KEY>' 
sqs = boto3.resource('sqs', region_name=region_name, 
     aws_access_key_id=aws_access_key_id, 
     aws_secret_access_key=aws_secret_access_key) 
queue = sqs.get_queue_by_name(QueueName=queue_name) 
while True: 
    messages_to_delete = [] 
    for message in queue.receive_messages(
      MaxNumberOfMessages=max_queue_messages) 
     # process message body 
     body = json.loads(message.body) 
     message_bodies.append(body) 
     # add message to delete 
     messages_to_delete.append({ 
      'Id': message.message_id, 
      'ReceiptHandle': message.receipt_handle 
     }) 

    # if you don't receive any notifications the 
    # messages_to_delete list will be empty 
    if len(messages_to_delete) == 0: 
     break 
    # delete messages to remove them from SQS queue 
    # handle any errors 
    else: 
     delete_response = queue.delete_messages(
       Entries=messages_to_delete) 
+0

Eine Anpassung für die v2 'Boto' Pakete, um die' delete_messages' Funktion von 'Boto3' zu" rückportieren "ist [hier] (http://stackoverflow.com/a/40638174/4228193). Das eingebaute 'Boto' (2)' delete_message_batch' hat eine Begrenzung von 10 Nachrichten UND erfordert vollständige 'Nachricht'-Klassenobjekte, anstatt nur die' ID' und 'EmpfangsHandel' in einem Objekt. – mpag

0

Hinweis: Dies ist nicht als direkte Antwort auf die Frage gedacht. Vielmehr ist es eine Ergänzung zu @TimothyLiu's answer, vorausgesetzt, der Endbenutzer verwendet das Boto-Paket (aka Boto2) nicht Boto3. Dieser Code ist eine "Boto-2-isierung" des Anrufs delete_messages in his answer bezeichnet


A Boto (2) -Aufruf für delete_message_batch(messages_to_delete) wo messages_to_delete ist ein dict Objekt mit Schlüsseln: Wert entsprechend id: receipt_handle Paare liefern

AttributeError: 'dict' object has no attribute 'id'.

Es scheint delete_message_batch erwartet ein Message Klassenobjekt; das Kopieren der Boto source for delete_message_batch und die Verwendung eines nicht Message Objekts (ala boto3) schlägt ebenfalls fehl, wenn Sie mehr als 10 "Nachrichten" gleichzeitig löschen. Also musste ich die folgende Umgehung verwenden.

eprint Code von here

from __future__ import print_function 
import sys 
from itertools import islice 

def eprint(*args, **kwargs): 
    print(*args, file=sys.stderr, **kwargs) 

@static_vars(counter=0) 
def take(n, iterable, reset=False): 
    "Return next n items of the iterable as same type" 
    if reset: take.counter = 0 
    take.counter += n 
    bob = islice(iterable, take.counter-n, take.counter) 
    if isinstance(iterable, dict): return dict(bob) 
    elif isinstance(iterable, list): return list(bob) 
    elif isinstance(iterable, tuple): return tuple(bob) 
    elif isinstance(iterable, set): return set(bob) 
    elif isinstance(iterable, file): return file(bob) 
    else: return bob 

def delete_message_batch2(cx, queue, messages): #returns a string reflecting level of success rather than throwing an exception or True/False 
    """ 
    Deletes a list of messages from a queue in a single request. 
    :param cx: A boto connection object. 
    :param queue: The :class:`boto.sqs.queue.Queue` from which the messages will be deleted 
    :param messages: List of any object or structure with id and receipt_handle attributes such as :class:`boto.sqs.message.Message` objects. 
    """ 
    listof10s = [] 
    asSuc, asErr, acS, acE = "","",0,0 
    res = [] 
    it = tuple(enumerate(messages)) 
    params = {} 
    tenmsg = take(10,it,True) 
    while len(tenmsg)>0: 
    listof10s.append(tenmsg) 
    tenmsg = take(10,it) 
    while len(listof10s)>0: 
    tenmsg = listof10s.pop() 
    params.clear() 
    for i, msg in tenmsg: #enumerate(tenmsg): 
     prefix = 'DeleteMessageBatchRequestEntry' 
     numb = (i%10)+1 
     p_name = '%s.%i.Id' % (prefix, numb) 
     params[p_name] = msg.get('id') 
     p_name = '%s.%i.ReceiptHandle' % (prefix, numb) 
     params[p_name] = msg.get('receipt_handle') 
    try: 
     go = cx.get_object('DeleteMessageBatch', params, BatchResults, queue.id, verb='POST') 
     (sSuc,cS),(sErr,cE) = tup_result_messages(go) 
     if cS: 
     asSuc += ","+sSuc 
     acS += cS 
     if cE: 
     asErr += ","+sErr 
     acE += cE 
    except cx.ResponseError: 
     eprint("Error in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params)) 
    except: 
     eprint("Error of unknown type in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params)) 
    return stringify_final_tup(asSuc, asErr, acS, acE, expect=len(messages)) #mdel #res 

def stringify_final_tup(sSuc="", sErr="", cS=0, cE=0, expect=0): 
    if sSuc == "": sSuc="None" 
    if sErr == "": sErr="None" 
    if cS == expect: sSuc="All" 
    if cE == expect: sErr="All" 
    return "Up to {} messages removed [{}]\t\tMessages remaining ({}) [{}]".format(cS,sSuc,cE,sErr) 
1

führe ich dies in einem Cronjob

from django.core.mail import EmailMessage 
from django.conf import settings 
import boto3 
import json 

sqs = boto3.resource('sqs', aws_access_key_id=settings.AWS_ACCESS_KEY_ID, 
     aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY, 
     region_name=settings.AWS_REGION) 

queue = sqs.get_queue_by_name(QueueName='email') 
messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=1) 

while len(messages) > 0: 
    for message in messages: 
     mail_body = json.loads(message.body) 
     print("E-mail sent to: %s" % mail_body['to']) 
     email = EmailMessage(mail_body['subject'], mail_body['message'], to=[mail_body['to']]) 
     email.send() 
     message.delete() 

    messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=1) 
Verwandte Themen