2016-08-11 6 views
0

Ich versuche mehrere Warteschlangen gleichzeitig mit Python, asyncio und asynqp zu konsumieren.Asyncio und Rabbitmq (asynqp): wie aus mehreren Warteschlangen gleichzeitig konsumieren

Ich verstehe nicht, warum mein Funktionsaufruf asyncio.sleep() keinen Effekt hat. Der Code pausiert dort nicht. Um ehrlich zu sein, ich verstehe eigentlich nicht, in welchem ​​Kontext der Callback ausgeführt wird, und ob ich überhaupt Kontrolle über die Ereignisschleife abgeben kann (so dass der asyncio.sleep() Aufruf sinnvoll wäre).

Was wäre, wenn ich einen aiohttp.ClientSession.get() Funktionsaufruf in meiner process_msg Callback-Funktion verwenden müsste? Ich kann das nicht, weil es keine Coroutine ist. Es muss einen Weg geben, der jenseits meines gegenwärtigen Verständnisses von Asynchron liegt.

#!/usr/bin/env python3 

import asyncio 
import asynqp 


USERS = {'betty', 'bob', 'luis', 'tony'} 


def process_msg(msg): 
    asyncio.sleep(10) 
    print('>> {}'.format(msg.body)) 
    msg.ack() 

async def connect(): 
    connection = await asynqp.connect(host='dev_queue', virtual_host='asynqp_test') 
    channel = await connection.open_channel() 
    exchange = await channel.declare_exchange('inboxes', 'direct') 

    # we have 10 users. Set up a queue for each of them 
    # use different channels to avoid any interference 
    # during message consumption, just in case. 
    for username in USERS: 
     user_channel = await connection.open_channel() 
     queue = await user_channel.declare_queue('Inbox_{}'.format(username)) 
     await queue.bind(exchange, routing_key=username) 
     await queue.consume(process_msg) 

    # deliver 10 messages to each user 
    for username in USERS: 
     for msg_idx in range(10): 
      msg = asynqp.Message('Msg #{} for {}'.format(msg_idx, username)) 
      exchange.publish(msg, routing_key=username) 


loop = asyncio.get_event_loop() 
loop.run_until_complete(connect()) 
loop.run_forever() 

Antwort

1

Ich verstehe nicht, warum mein asyncio.sleep() - Funktionsaufruf keinen Effekt hat.

Weil asyncio.sleep() ein zukünftiges Objekt zurückgibt, die in Kombination mit einem Ereignisschleife (oder async/await Semantik) verwendet werden muss.

Sie können nicht await in einfache def Deklaration verwenden, da der Rückruf außerhalb async/await Kontext aufgerufen wird, die an eine Ereignisschleife unter der Haube angeschlossen ist. Mit anderen Worten ist das Mischen von Callback-Stil mit async/await Stil ziemlich schwierig. obwohl

die einfache Lösung ist es, die Arbeit zurück zu der Ereignisschleife planen:

async def process_msg(msg): 
    await asyncio.sleep(10) 
    print('>> {}'.format(msg.body)) 
    msg.ack() 

def _process_msg(msg): 
    loop = asyncio.get_event_loop() 
    loop.create_task(process_msg(msg)) 
    # or if loop is always the same one single line is enough 
    # asyncio.ensure_future(process_msg(msg)) 

# some code 
await queue.consume(_process_msg) 

Hinweis, dass es keine Rekursion in _process_msg Funktion, das heißt der Körper von process_msg wird nicht ausgeführt, während in _process_msg. Die interne process_msg-Funktion wird aufgerufen, sobald das Steuerelement zur Ereignisschleife zurückkehrt.

def async_to_callback(coro): 
    def callback(*args, **kwargs): 
     asyncio.ensure_future(coro(*args, **kwargs)) 
    return callback 

async def process_msg(msg): 
    # the body 

# some code 
await queue.consume(async_to_callback(process_msg)) 
:

Dies kann mit dem folgenden Code verallgemeinern

Verwandte Themen