2016-11-03 3 views
1

Ich schrieb ein Programm, das Ereignisse mit asyncio und aiohttp schreiben würde. Dieses Programm funktioniert, wenn ich es lokal ausführe. Ich kann 10k Events kein Problem posten. Aber ich SCPed die gesamte Code-Basis zu einer entfernten Maschine und innerhalb dieser Maschine kann ich nicht mehr als 15 Ereignisse veröffentlichen, ohne diesen Fehler:Konnte keine hohe Anzahl von Posts auf Remote-Computer vs lokale mit Asynio und Aiohttp

RuntimeError: Event loop is closed 
Exception ignored in: <coroutine object Poster.async_post_event at 0x7f4a53989410> 
Traceback (most recent call last): 
    File "/home/bli1/qe-trinity/tracer/utils/poster.py", line 63, in async_post_event 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/client.py", line 565, in __aenter__ 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/client.py", line 198, in _request 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/connector.py", line 316, in connect 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/connector.py", line 349, in _release_waiter 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/futures.py", line 332, in set_result 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 447, in call_soon 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 456, in _call_soon 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 284, in _check_closed 
RuntimeError: Event loop is closed 
Exception ignored in: <coroutine object Poster.async_post_event at 0x7f4a5397ffc0> 
Traceback (most recent call last): 
    File "/home/bli1/qe-trinity/tracer/utils/poster.py", line 63, in async_post_event 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/client.py", line 565, in __aenter__ 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/client.py", line 198, in _request 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/connector.py", line 316, in connect 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/connector.py", line 349, in _release_waiter 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/futures.py", line 332, in set_result 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 447, in call_soon 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 456, in _call_soon 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 284, in _check_closed 
RuntimeError: Event loop is closed 

Wie kann ich debuggen diese oder die Quelle des Problems herausfinden? Hier

ist die Klasse, die ich erstellt und ich verwende die Methode post() auszuführen:

import uuid 
import os 
import asyncio 
import time 
import random 
import json 
import aiohttp 
from tracer.utils.phase import Phase 

class Poster(Phase): 
    def __init__(self, log, endpoint, num_post, topic, datafile, timeout, oracles, secure=False, thru_proxy=True): 
     Phase.__init__(self, log, "post", oracles, secure, thru_proxy) 
     self.log = log 
     self.num_post = int(num_post) 
     self.datafile = datafile.readlines() 
     self.topic = topic 
     self.endpoint = self.set_endpoint(endpoint, self.topic) 
     self.response = None 
     self.timeout = timeout 

    def random_line(self): 
     """ Returns random line from file and converts it to JSON """ 
     return json.loads(random.choice(self.datafile)) 

    @staticmethod 
    def change_uuid(event): 
     """ Creates new UUID for event_id """ 
     new_uuid = str(uuid.uuid4()) 
     event["event_header"]["event_id"] = new_uuid 
     return event 

    @staticmethod 
    def wrapevent(event): 
     """ Wrap event with metadata for analysis later on """ 
     return { 
      "tracer": { 
       "post": { 
        "statusCode": None, 
        "timestamp": None, 
       }, 
       "awsKafkaTimestamp": None, 
       "qdcKakfaTimestamp": None, 
       "hdfsTimestamp": None 
      }, 
      "event": event 
     } 

    def gen_random_event(self): 
     random_event = self.random_line() 
     event = self.change_uuid(random_event) 
     dataspec = self.wrapevent(event) 
     return dataspec 

    async def async_post_event(self, event, session): 
     async with session.post(self.endpoint, data=event, proxy=self.proxy) as resp: 
      event["tracer"]["post"]["timestamp"] = time.time() * 1000.0 
      event["tracer"]["post"]["statusCode"] = resp.status 
      unique_id = event["event"]["event_header"]["event_id"] 
      oracle_endpoint = os.path.join(self.oracle, unique_id) 
     async with session.put(oracle_endpoint, data=json.dumps(event), proxy=self.proxy) as resp: 
      if resp.status != 200: 
       self.log.debug("Post to ElasticSearch not 200") 
       self.log.debug(event["event"]["event_header"]["event_id"]) 
       self.log.debug("Status code: " + str(resp.status)) 
      return event["event"]["event_header"]["event_id"], resp.status 

    async def async_post_events(self, events): 
     coros = [] 
     conn = aiohttp.TCPConnector(verify_ssl=self.secure) 
     async with aiohttp.ClientSession(connector=conn) as session: 
      for event in events: 
       coros.append(self.async_post_event(event, session)) 
      return await asyncio.gather(*coros) 

    def post(self): 
     event_loop = asyncio.get_event_loop() 
     try: 
      events = [self.gen_random_event() for i in range(self.num_post)] 
      start_time = time.time() 
      results = event_loop.run_until_complete(self.async_post_events(events)) 
      print("Time taken: " + str(time.time() - start_time)) 
     finally: 
      event_loop.close() 

Antwort

2

Sie können eine Schleife wieder benutzen, wenn es geschlossen ist. Von AbstractEventLoop.close Dokumentation:

This is idempotent and irreversible. No other methods should be called after this one.

Entweder entfernen Sie den loop.close Anruf oder eine neue Schleife für jeden Post erstellen.

Mein Rat wäre, diese Probleme zu vermeiden, indem Sie alles innerhalb der Schleife laufen lassen und async_post_events bei Bedarf erwarten.

Verwandte Themen