2017-12-24 20 views
1

Ich arbeite an einem Big-Data-Problem und bin mit einigen Nebenläufigkeit und async io Probleme fest. Das Problem ist wie folgt:Async HTTP API-Aufruf für jede Zeile der Datei - Python

1) Haben mehrere große Dateien (~ 4gb jedes x bis 15), die ich bin mit der Verarbeitung ProcessPoolExecutor von concurrent.futures Modul auf diese Weise:

def process(source): 
    files = os.list(source) 
    with ProcessPoolExecutor() as executor: 
     future_to_url = {executor.submit(process_individual_file, source, input_file):input_file for input_file in files} 
     for future in as_completed(future_to_url): 
      data = future.result() 

2) Jetzt in jeder Datei, möchte ich Zeile für Zeile, Prozesslinie gehen, um eine bestimmte JSON zu erstellen, solche 2K jsons zusammen gruppieren und eine API mit dieser Anfrage treffen, um eine Antwort zu erhalten. Hier ist der Code:

def process_individual_file(source, input_file): 
    limit = 2000 
    with open(source+input_file) as sf: 
     for line in sf: 
      json_array.append(form_json(line)) 
      limit -= 1 

      if limit == 0: 
       response = requests.post(API_URL, json=json_array) 
       #check response status here 
       limit = 2000 

3) Nun ist das Problem, die Anzahl der Zeilen in jeder Datei wirklich groß ist und dass API-Aufruf blockiert und etwas langsam zu reagieren, das Programm nimmt sehr viel Zeit in Anspruch.

4) Was ich erreichen möchte ist, dass diese API Anruf async, so dass ich nächsten Batch von 2000 weitermachen, wenn dieser API-Aufruf passiert.

5) Dinge, die ich bis jetzt versucht habe: Ich habe versucht, dies unter Verwendung asyncio zu implementieren, aber dort müssen wir den Satz von zukünftigen Aufgaben sammeln und warten auf den Abschluss mit Ereignisschleife. Etwas wie folgt aus:

async def process_individual_file(source, input_file): 
    tasks = [] 
    limit = 2000 
    with open(source+input_file) as sf: 
     for line in sf: 
      json_array.append(form_json(line)) 
      limit -= 1 

      if limit == 0: 
       tasks.append(asyncio.ensure_future(call_api(json_array))) 
       limit = 2000 

    await asyncio.wait(tasks) 

ioloop = asyncio.get_event_loop() 
ioloop.run_until_complete(process_individual_file(source, input_file)) 
ioloop.close() 

6) Ich bin wirklich nicht das Verständnis, da diese indirekt die gleiche wie vorherige, da er alle Aufgaben sammeln wartet, bevor sie starten. Kann mir jemand bei der richtigen Architektur dieses Problems helfen? Wie kann ich die API async aufrufen, ohne alle Aufgaben zu sammeln und parallel den nächsten Stapel verarbeiten zu können?

Antwort

1

Ich bin nicht verstehen, das ist wirklich, weil dies indirekt die gleiche wie vorherige, da er alle Aufgaben sammeln wartet, bevor sie starten.

Nein, Sie irren sich hier. Wenn Sie asyncio.Task mit asyncio.ensure_future erstellen, wird sofort die Coroutine call_api ausgeführt. Dies ist, wie Aufgaben in asyncio Arbeit:

import asyncio 


async def test(i): 
    print(f'{i} started') 
    await asyncio.sleep(i) 


async def main(): 
    tasks = [ 
     asyncio.ensure_future(test(i)) 
     for i 
     in range(3) 
    ] 

    await asyncio.sleep(0) 
    print('At this moment tasks are already started') 

    await asyncio.wait(tasks) 


if __name__ == '__main__': 
    loop = asyncio.get_event_loop() 
    loop.run_until_complete(main()) 

Ausgang:

0 started 
1 started 
2 started 
At this moment tasks are already started 

Problem mit Ihrem Ansatz ist, dass process_individual_file nicht tatsächlich asynchron ist: es hat große Menge an CPU-bezogene Job ohne zu Steuerrückkehr- Ihres asyncio Ereignisschleife. Es ist ein Problem - die Ereignisschleife der Funktion blockiert die Ausführung unmöglicher Aufgaben.

Sehr einfach, aber effektive Lösung, die ich denke, die Sie verwenden können - ist die Kontrolle zu Ereignisse zurückzukehren Schleife manuell asyncio.sleep(0) nach wenige Menge der Ausführung process_individual_file verwenden, zum Beispiel auf jede Zeile zu lesen:

async def process_individual_file(source, input_file): 
    tasks = [] 
    limit = 2000 
    with open(source+input_file) as sf: 
     for line in sf: 
      await asyncio.sleep(0) # Return control to event loop to allow it execute tasks 

      json_array.append(form_json(line)) 
      limit -= 1 

      if limit == 0: 
       tasks.append(asyncio.ensure_future(call_api(json_array))) 
       limit = 2000 

    await asyncio.wait(tasks) 

Upd:

es wird mehr als Millionen von Anfragen gemacht werden und daher bin ich unbequemes Gefühl für sie alle zukünftige Objekte zu speichern in einem 0.123.Liste

Es macht viel Sinn. Nichts Gutes wird passieren, wenn Sie Millionen von parallelen Netzwerkanforderungen ausführen. Üblicherweise wird in diesem Fall die Verwendung von Synchronisationsprimitiven wie asyncio.Semaphore als Grenzwert festgelegt.

Ich rate Ihnen, Generator zu machen json_array aus Datei, und erwerben Semaphore vor dem Hinzufügen neuer Aufgabe und Freigabe auf Task bereit. Sie erhalten sauberen Code, der vor vielen parallel laufenden Aufgaben geschützt ist.

Dies sieht wie etwas wie folgt aus:

def get_json_array(input_file): 
    json_array = [] 
    limit = 2000 

    with open(input_file) as sf: 
     for line in sf: 
      json_array.append(form_json(line)) 

      limit -= 1 
      if limit == 0: 
       yield json_array # generator will allow split file-reading logic from adding tasks 

       json_array = [] 
       limit = 2000 


sem = asyncio.Semaphore(50) # don't allow more than 50 parallel requests 

async def process_individual_file(input_file): 
    for json_array in get_json_array(input_file): 
     await sem.acquire() # file reading wouldn't resume until there's some place for newer tasks 
     task = asyncio.ensure_future(call_api(json_array)) 
     task.add_done_callback(lambda t: sem.release()) # on task done - free place for next tasks 
     task.add_done_callback(lambda t: print(t.result())) # print result on some call_api done 
+0

Dank @Mikhail Gerasimov für mich zu korrigieren. In der Tat habe ich es nicht richtig verstanden und war daher festgefahren, da die erwarteten Ergebnisse nicht kamen. Was du sagst, macht Sinn, aber noch ein weiterer Zweifel, den ich hier bekomme, ist, dass wegen der Größe der Datei mehr als Millionen von Anfragen zu erledigen sind und ich mich daher unwohl fühle, zukünftige Objekte für alle in einem zu speichern Liste. –

+0

Hier ist ein weiterer Artikel, den ich in der Zwischenzeit durchging [link] (https://hackernoon.com/controlling-python-async-creep-ec0a0f4b79ba). Scheint so, als könnte ich eine Event-Schleife in einem anderen Thread auslösen und meine Futures an diesen Thread delegieren und den Callback bei der API-Antwort auslösen. Momentan arbeite ich an demselben Projekt. Lassen Sie mich auch Ihre Vorschläge ausprobieren und mit Ergebnissen zurückkommen. Vielen Dank :) –

+0

@ShubhamPatil Ich aktualisierte die Antwort, die zeigt, wie man viele parallele Anträge vermeidet. –

Verwandte Themen