Erstellen von asynchronen Post HTTP-Anfragen, die für /v2/apps
benötigt wird, kann mit aiohttp
Modul erfolgen:
import asyncio
import aiohttp
async def post(url, json):
async with aiohttp.ClientSession() as session:
async with session.post(url, json=json) as resp:
return await resp.json()
async def main():
res = await post('http://httpbin.org/post', {'test': 'object'})
print(res['json']) # {'test': 'object'}
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
loop.close()
Wenn Sie /v2/events
zu verfolgen Deployment Erfolg verwenden möchten Sie für Strom verlangen sollte (siehe api doc). Es kann in aiohttp
erreicht werden damit asynchronous iteration ist: Sie gerade asynchron Inhalt line-by-line warten Ereignis zum Beispiel Sie benötigen, lesen:
import asyncio
import aiohttp
async def stream(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
async for line in resp.content:
yield line
async def main():
async for line in stream('http://httpbin.org/stream/10'):
print(line) # check if line contains event you need
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
loop.close()
Wenn Sie /v2/deployments
verwenden möchten, sollten Sie verlangen, sie in regelmäßigen Abständen eine gewisse Verzögerung warten mit asyncio.sleep
. In diesem Fall blockiert Ihre Funktion nicht:
import asyncio
import aiohttp
async def get(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.json()
async def main():
while True:
# Make request to see if deplayment finished:
res = await get('http://httpbin.org/get')
print(res['origin']) # break if ok here
# Async wait before next try:
await asyncio.sleep(3)
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
loop.close()
Vielen Dank für Ihre Hilfe. Ich fürchte, ich habe meine Frage nicht klar genug formuliert. Ich würde gerne wissen, ob es eine andere Art und Weise als 'Asynchron-def bereitstellen (app): deployment_id = erwarten Pfosten (app) erwarten deployment_event (deployment_id)' , die sowieso eine Race-Bedingung hat. – Karsten