Ich versuche mehrere Warteschlangen gleichzeitig mit Python, asyncio und asynqp zu konsumieren.Asyncio und Rabbitmq (asynqp): wie aus mehreren Warteschlangen gleichzeitig konsumieren
Ich verstehe nicht, warum mein Funktionsaufruf asyncio.sleep()
keinen Effekt hat. Der Code pausiert dort nicht. Um ehrlich zu sein, ich verstehe eigentlich nicht, in welchem Kontext der Callback ausgeführt wird, und ob ich überhaupt Kontrolle über die Ereignisschleife abgeben kann (so dass der asyncio.sleep()
Aufruf sinnvoll wäre).
Was wäre, wenn ich einen aiohttp.ClientSession.get()
Funktionsaufruf in meiner process_msg
Callback-Funktion verwenden müsste? Ich kann das nicht, weil es keine Coroutine ist. Es muss einen Weg geben, der jenseits meines gegenwärtigen Verständnisses von Asynchron liegt.
#!/usr/bin/env python3
import asyncio
import asynqp
USERS = {'betty', 'bob', 'luis', 'tony'}
def process_msg(msg):
asyncio.sleep(10)
print('>> {}'.format(msg.body))
msg.ack()
async def connect():
connection = await asynqp.connect(host='dev_queue', virtual_host='asynqp_test')
channel = await connection.open_channel()
exchange = await channel.declare_exchange('inboxes', 'direct')
# we have 10 users. Set up a queue for each of them
# use different channels to avoid any interference
# during message consumption, just in case.
for username in USERS:
user_channel = await connection.open_channel()
queue = await user_channel.declare_queue('Inbox_{}'.format(username))
await queue.bind(exchange, routing_key=username)
await queue.consume(process_msg)
# deliver 10 messages to each user
for username in USERS:
for msg_idx in range(10):
msg = asynqp.Message('Msg #{} for {}'.format(msg_idx, username))
exchange.publish(msg, routing_key=username)
loop = asyncio.get_event_loop()
loop.run_until_complete(connect())
loop.run_forever()