Ich schrieb ein Programm, das Ereignisse mit asyncio
und aiohttp
schreiben würde. Dieses Programm funktioniert, wenn ich es lokal ausführe. Ich kann 10k Events kein Problem posten. Aber ich SCPed die gesamte Code-Basis zu einer entfernten Maschine und innerhalb dieser Maschine kann ich nicht mehr als 15 Ereignisse veröffentlichen, ohne diesen Fehler:Konnte keine hohe Anzahl von Posts auf Remote-Computer vs lokale mit Asynio und Aiohttp
RuntimeError: Event loop is closed
Exception ignored in: <coroutine object Poster.async_post_event at 0x7f4a53989410>
Traceback (most recent call last):
File "/home/bli1/qe-trinity/tracer/utils/poster.py", line 63, in async_post_event
File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/client.py", line 565, in __aenter__
File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/client.py", line 198, in _request
File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/connector.py", line 316, in connect
File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/connector.py", line 349, in _release_waiter
File "/home/bli1/py/python3.5/lib/python3.5/asyncio/futures.py", line 332, in set_result
File "/home/bli1/py/python3.5/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks
File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 447, in call_soon
File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 456, in _call_soon
File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 284, in _check_closed
RuntimeError: Event loop is closed
Exception ignored in: <coroutine object Poster.async_post_event at 0x7f4a5397ffc0>
Traceback (most recent call last):
File "/home/bli1/qe-trinity/tracer/utils/poster.py", line 63, in async_post_event
File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/client.py", line 565, in __aenter__
File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/client.py", line 198, in _request
File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/connector.py", line 316, in connect
File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/connector.py", line 349, in _release_waiter
File "/home/bli1/py/python3.5/lib/python3.5/asyncio/futures.py", line 332, in set_result
File "/home/bli1/py/python3.5/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks
File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 447, in call_soon
File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 456, in _call_soon
File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 284, in _check_closed
RuntimeError: Event loop is closed
Wie kann ich debuggen diese oder die Quelle des Problems herausfinden? Hier
ist die Klasse, die ich erstellt und ich verwende die Methode post()
auszuführen:
import uuid
import os
import asyncio
import time
import random
import json
import aiohttp
from tracer.utils.phase import Phase
class Poster(Phase):
def __init__(self, log, endpoint, num_post, topic, datafile, timeout, oracles, secure=False, thru_proxy=True):
Phase.__init__(self, log, "post", oracles, secure, thru_proxy)
self.log = log
self.num_post = int(num_post)
self.datafile = datafile.readlines()
self.topic = topic
self.endpoint = self.set_endpoint(endpoint, self.topic)
self.response = None
self.timeout = timeout
def random_line(self):
""" Returns random line from file and converts it to JSON """
return json.loads(random.choice(self.datafile))
@staticmethod
def change_uuid(event):
""" Creates new UUID for event_id """
new_uuid = str(uuid.uuid4())
event["event_header"]["event_id"] = new_uuid
return event
@staticmethod
def wrapevent(event):
""" Wrap event with metadata for analysis later on """
return {
"tracer": {
"post": {
"statusCode": None,
"timestamp": None,
},
"awsKafkaTimestamp": None,
"qdcKakfaTimestamp": None,
"hdfsTimestamp": None
},
"event": event
}
def gen_random_event(self):
random_event = self.random_line()
event = self.change_uuid(random_event)
dataspec = self.wrapevent(event)
return dataspec
async def async_post_event(self, event, session):
async with session.post(self.endpoint, data=event, proxy=self.proxy) as resp:
event["tracer"]["post"]["timestamp"] = time.time() * 1000.0
event["tracer"]["post"]["statusCode"] = resp.status
unique_id = event["event"]["event_header"]["event_id"]
oracle_endpoint = os.path.join(self.oracle, unique_id)
async with session.put(oracle_endpoint, data=json.dumps(event), proxy=self.proxy) as resp:
if resp.status != 200:
self.log.debug("Post to ElasticSearch not 200")
self.log.debug(event["event"]["event_header"]["event_id"])
self.log.debug("Status code: " + str(resp.status))
return event["event"]["event_header"]["event_id"], resp.status
async def async_post_events(self, events):
coros = []
conn = aiohttp.TCPConnector(verify_ssl=self.secure)
async with aiohttp.ClientSession(connector=conn) as session:
for event in events:
coros.append(self.async_post_event(event, session))
return await asyncio.gather(*coros)
def post(self):
event_loop = asyncio.get_event_loop()
try:
events = [self.gen_random_event() for i in range(self.num_post)]
start_time = time.time()
results = event_loop.run_until_complete(self.async_post_events(events))
print("Time taken: " + str(time.time() - start_time))
finally:
event_loop.close()