Ich arbeite an einem Big-Data-Problem und bin mit einigen Nebenläufigkeit und async io Probleme fest. Das Problem ist wie folgt:Async HTTP API-Aufruf für jede Zeile der Datei - Python
1) Haben mehrere große Dateien (~ 4gb jedes x bis 15), die ich bin mit der Verarbeitung ProcessPoolExecutor von concurrent.futures Modul auf diese Weise:
def process(source):
files = os.list(source)
with ProcessPoolExecutor() as executor:
future_to_url = {executor.submit(process_individual_file, source, input_file):input_file for input_file in files}
for future in as_completed(future_to_url):
data = future.result()
2) Jetzt in jeder Datei, möchte ich Zeile für Zeile, Prozesslinie gehen, um eine bestimmte JSON zu erstellen, solche 2K jsons zusammen gruppieren und eine API mit dieser Anfrage treffen, um eine Antwort zu erhalten. Hier ist der Code:
def process_individual_file(source, input_file):
limit = 2000
with open(source+input_file) as sf:
for line in sf:
json_array.append(form_json(line))
limit -= 1
if limit == 0:
response = requests.post(API_URL, json=json_array)
#check response status here
limit = 2000
3) Nun ist das Problem, die Anzahl der Zeilen in jeder Datei wirklich groß ist und dass API-Aufruf blockiert und etwas langsam zu reagieren, das Programm nimmt sehr viel Zeit in Anspruch.
4) Was ich erreichen möchte ist, dass diese API Anruf async, so dass ich nächsten Batch von 2000 weitermachen, wenn dieser API-Aufruf passiert.
5) Dinge, die ich bis jetzt versucht habe: Ich habe versucht, dies unter Verwendung asyncio zu implementieren, aber dort müssen wir den Satz von zukünftigen Aufgaben sammeln und warten auf den Abschluss mit Ereignisschleife. Etwas wie folgt aus:
async def process_individual_file(source, input_file):
tasks = []
limit = 2000
with open(source+input_file) as sf:
for line in sf:
json_array.append(form_json(line))
limit -= 1
if limit == 0:
tasks.append(asyncio.ensure_future(call_api(json_array)))
limit = 2000
await asyncio.wait(tasks)
ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(process_individual_file(source, input_file))
ioloop.close()
6) Ich bin wirklich nicht das Verständnis, da diese indirekt die gleiche wie vorherige, da er alle Aufgaben sammeln wartet, bevor sie starten. Kann mir jemand bei der richtigen Architektur dieses Problems helfen? Wie kann ich die API async aufrufen, ohne alle Aufgaben zu sammeln und parallel den nächsten Stapel verarbeiten zu können?
Dank @Mikhail Gerasimov für mich zu korrigieren. In der Tat habe ich es nicht richtig verstanden und war daher festgefahren, da die erwarteten Ergebnisse nicht kamen. Was du sagst, macht Sinn, aber noch ein weiterer Zweifel, den ich hier bekomme, ist, dass wegen der Größe der Datei mehr als Millionen von Anfragen zu erledigen sind und ich mich daher unwohl fühle, zukünftige Objekte für alle in einem zu speichern Liste. –
Hier ist ein weiterer Artikel, den ich in der Zwischenzeit durchging [link] (https://hackernoon.com/controlling-python-async-creep-ec0a0f4b79ba). Scheint so, als könnte ich eine Event-Schleife in einem anderen Thread auslösen und meine Futures an diesen Thread delegieren und den Callback bei der API-Antwort auslösen. Momentan arbeite ich an demselben Projekt. Lassen Sie mich auch Ihre Vorschläge ausprobieren und mit Ergebnissen zurückkommen. Vielen Dank :) –
@ShubhamPatil Ich aktualisierte die Antwort, die zeigt, wie man viele parallele Anträge vermeidet. –