Ich verbinde mich über HTTP mit einem lokalen Server (OSRM), um Routen zu senden und Fahrzeiten zurück zu bekommen. Ich bemerke, dass I/O langsamer ist als Threading, weil die Wartezeit für die Berechnung kleiner ist als die Zeit, die benötigt wird, um die Anfrage zu senden und die JSON-Ausgabe zu verarbeiten (I/O ist besser, wenn der Server etwas Zeit benötigt) bearbeite deine Anfrage -> du willst nicht, dass sie blockiert, weil du warten musst, das ist nicht mein Fall. Das Threading leidet unter der globalen Interpreter-Sperre, und so scheint es (und der Beweis unten), dass meine schnellste Option Multiprozessing ist.Python-Anfragen - Threads/Prozesse vs. IO
Das Problem mit Multiprocessing ist, dass es so schnell ist, dass es meine Sockets erschöpft und ich bekomme einen Fehler (Anfragen stellt jedes Mal eine neue Verbindung aus). Ich kann (seriell) das Objekt requests.Sessions() verwenden, um eine Verbindung am Leben zu erhalten, aber ich kann das nicht parallel laufen lassen (jeder Prozess hat seine eigene Sitzung).
Der nächste Code, den ich im Moment arbeiten muß, ist dieser Multiprocessing Code:
conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=cpu_count())
def ReqOsrm(url_input):
ul, qid = url_input
try:
response = conn_pool.request('GET', ul)
json_geocode = json.loads(response.data.decode('utf-8'))
status = int(json_geocode['status'])
if status == 200:
tot_time_s = json_geocode['route_summary']['total_time']
tot_dist_m = json_geocode['route_summary']['total_distance']
used_from, used_to = json_geocode['via_points']
out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
return out
else:
print("Done but no route: %d %s" % (qid, req_url))
return [qid, 999, 0, 0, 0, 0, 0, 0]
except Exception as err:
print("%s: %d %s" % (err, qid, req_url))
return [qid, 999, 0, 0, 0, 0, 0, 0]
# run:
pool = Pool(cpu_count())
calc_routes = pool.map(ReqOsrm, url_routes)
pool.close()
pool.join()
Allerdings habe ich nicht die HTTPConnectionPool bekommen kann richtig funktioniert und es schafft neue Steckdosen jedes Mal (glaube ich) und dann gibt mir den Fehler:
HTTPConnectionPool(host='127.0.0.1', port=5005): Max retries exceeded with url: /viaroute?loc=44.779708,4.2609877&loc=44.648439,4.2811959&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))
Mein Ziel ist es Abstandsberechnungen von einem OSRM-routing server ich laufe vor Ort (so schnell wie möglich) zu erhalten.
Ich habe eine Frage in zwei Teilen - im Grunde versuche ich, etwas Code mit multiprocessing.Pool() zu konvertieren, um besser Code (ordnungsgemäße asynchrone Funktionen - so dass die Ausführung bricht nie und es läuft so schnell wie möglich).
Das Problem, das ich habe, ist, dass alles, was ich versuche, scheint langsamer als Multiprozessing (ich präsentiere mehrere Beispiele unten, was ich versucht habe).
einige potenzielle Methoden sind: gevents, grequests, Tornados, Anfragen-Futures, asyncio usw.
A - Multiprocessing.Pool()
ich zunächst mit etwas wie folgt begonnen:
def ReqOsrm(url_input):
req_url, query_id = url_input
try_c = 0
#print(req_url)
while try_c < 5:
try:
response = requests.get(req_url)
json_geocode = response.json()
status = int(json_geocode['status'])
# Found route between points
if status == 200:
....
pool = Pool(cpu_count()-1)
calc_routes = pool.map(ReqOsrm, url_routes)
Wo war ich zu einem lokalen Server verbunden werden (localhost, Port: 5005), die auf 8 Threads und supports parallel execution ins Leben gerufen wurde.
Nach ein wenig Suche erkannte ich den Fehler, den ich bekam, war, weil Anforderungen opening a new connection/socket for each-request war. Das war nach einer Weile eigentlich zu schnell und anstrengend. Es scheint, der Weg, dies zu adressieren, ist eine Anfragen.Session() - , aber ich habe nicht in der Lage, dies mit Multiprocessing arbeiten (wo jeder Prozess hat seine eigene Sitzung).
Frage 1.
Auf einigen Computern läuft dieser in Ordnung, z.B.:
gegen später Zum Vergleich: Servernutzung und 1700 Anfragen 45% pro Sekunde
, auf einigen aber es funktioniert nicht und ich verstehe nicht ganz, warum:
HTTPConnectionPool(host='127.0.0.1', port=5000): Max retries exceeded with url: /viaroute?loc=49.34343,3.30199&loc=49.56655,3.25837&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))
Meine Vermutung wäre, dass Anfragen den Socket sperren, wenn er verwendet wird - manchmal ist der Server zu langsam, um auf die alte Anfrage zu antworten und eine neue wird generiert. Der Server unterstützt Warteschlangen, aber Anfragen nicht so, statt der Warteschlange hinzufügen, bekomme ich den Fehler?
Frage 2.
ich gefunden:
Blocking Or Non-Blocking?
With the default Transport Adapter in place, Requests does not provide any kind of non-blocking IO. The Response.content property will block until the entire response has been downloaded. If you require more granularity, the streaming features of the library (see Streaming Requests) allow you to retrieve smaller quantities of the response at a time. However, these calls will still block.
If you are concerned about the use of blocking IO, there are lots of projects out there that combine Requests with one of Python’s asynchronicity frameworks.
Two excellent examples are grequests and requests-futures.
B - Anfragen-Futures
diese Adresse ich meinen Code neu zu schreiben, benötigen asynchrone Anfragen zu verwenden, so habe ich versucht, die folgende Verwendung:
from requests_futures.sessions import FuturesSession
from concurrent.futures import ThreadPoolExecutor, as_completed
(Übrigens beginne ich meinen Server mit der Option, alle Threads verwenden)
und der Hauptcode:
calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=1000)) as session:
# Submit requests and process in background
for i in range(len(url_routes)):
url_in, qid = url_routes[i] # url |query-id
future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
futures[future] = qid
# Process the futures as they become complete
for future in as_completed(futures):
r = future.result()
try:
row = [futures[future]] + r.data
except Exception as err:
print('No route')
row = [futures[future], 999, 0, 0, 0, 0, 0, 0]
calc_routes.append(row)
Wo meine Funktion (ReqOsrm) jetzt neu geschrieben als:
def ReqOsrm(sess, resp):
json_geocode = resp.json()
status = int(json_geocode['status'])
# Found route between points
if status == 200:
tot_time_s = json_geocode['route_summary']['total_time']
tot_dist_m = json_geocode['route_summary']['total_distance']
used_from = json_geocode['via_points'][0]
used_to = json_geocode['via_points'][1]
out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
# Cannot find route between points (code errors as 999)
else:
out = [999, 0, 0, 0, 0, 0, 0]
resp.data = out
Allerdings ist dieser Code langsamer als der Multiprozessing ein! Bevor ich 1700 Anfragen pro Sekunde bekam, bekomme ich jetzt 600 Sekunden. Ich schätze, das liegt daran, dass ich keine volle CPU-Auslastung habe, aber ich bin mir nicht sicher, wie ich es erhöhen kann.
C - Gewinde
Ich habe versucht eine andere Methode (creating threads) - war aber wieder nicht sicher, wie diese erhalten CPU-Auslastung zu maximieren (im Idealfall möchte ich meine Server sehen, 50 mit %, Nein?):
def doWork():
while True:
url,qid = q.get()
status, resp = getReq(url)
processReq(status, resp, qid)
q.task_done()
def getReq(url):
try:
resp = requests.get(url)
return resp.status_code, resp
except:
return 999, None
def processReq(status, resp, qid):
try:
json_geocode = resp.json()
# Found route between points
if status == 200:
tot_time_s = json_geocode['route_summary']['total_time']
tot_dist_m = json_geocode['route_summary']['total_distance']
used_from = json_geocode['via_points'][0]
used_to = json_geocode['via_points'][1]
out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
else:
print("Done but no route")
out = [qid, 999, 0, 0, 0, 0, 0, 0]
except Exception as err:
print("Error: %s" % err)
out = [qid, 999, 0, 0, 0, 0, 0, 0]
qres.put(out)
return
#Run:
concurrent = 1000
qres = Queue()
q = Queue(concurrent)
for i in range(concurrent):
t = Thread(target=doWork)
t.daemon = True
t.start()
try:
for url in url_routes:
q.put(url)
q.join()
except Exception:
pass
# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]
Diese Methode ist schneller als requests_futures Ich denke, aber ich weiß nicht, wie viele Threads diese zu maximieren zu setzen -
D - Tornado (nicht funktioniert)
Ich versuche jetzt Tornado - aber kann es nicht ganz bekommen funktioniert es bricht mit existieren Code -1073741819 wenn ich curl benutze - wenn ich simple_httpclient es funktioniert, aber dann bekomme ich Timeout-Fehler:
ERROR:tornado.application:Multiple exceptions in yield list Traceback (most recent call last): File "C:\Anaconda3\lib\site-packages\tornado\gen.py", line 789, in callback result_list.append(f.result()) File "C:\Anaconda3\lib\site-packages\tornado\concurrent.py", line 232, in result raise_exc_info(self._exc_info) File "", line 3, in raise_exc_info tornado.httpclient.HTTPError: HTTP 599: Timeout
def handle_req(r):
try:
json_geocode = json_decode(r)
status = int(json_geocode['status'])
tot_time_s = json_geocode['route_summary']['total_time']
tot_dist_m = json_geocode['route_summary']['total_distance']
used_from = json_geocode['via_points'][0]
used_to = json_geocode['via_points'][1]
out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
print(out)
except Exception as err:
print(err)
out = [999, 0, 0, 0, 0, 0, 0]
return out
# Configure
# For some reason curl_httpclient crashes my computer
AsyncHTTPClient.configure("tornado.simple_httpclient.SimpleAsyncHTTPClient", max_clients=10)
@gen.coroutine
def run_experiment(urls):
http_client = AsyncHTTPClient()
responses = yield [http_client.fetch(url) for url, qid in urls]
responses_out = [handle_req(r.body) for r in responses]
raise gen.Return(value=responses_out)
# Initialise
_ioloop = ioloop.IOLoop.instance()
run_func = partial(run_experiment, url_routes)
calc_routes = _ioloop.run_sync(run_func)
E - asyncio/aiohttp
Beschlossen einen anderen Ansatz, um zu versuchen (wenn auch groß sein würde Tornado arbeiten zu bekommen) mit asyncio und aiohttp.
Das funktioniert OK, aber immer noch langsamer als Multiprozessing!
ein weiterer Ansatz, andere als zu versuchen, um mit optimaler Threadpoolgröße zu täuschen ist eine verwenden, Ereignisschleife. Sie könnten Anfragen mit einem Rückruf registrieren und darauf warten, dass die Ereignisschleife bei jeder Antwort beantwortet wird. – dm03514
@ dm03514 Danke dafür! Aber ist das nicht das, was ich habe, wenn ich meine Anfrage-Futures-Beispiel mache? 'future = session.get (url_in, background_callback = lambda sess, resp: ReqOsrm (sess, resp))' – mptevsion
Ich habe RequestFuture noch nie benutzt, aber ich glaube, dass es sich immer noch in einem Threadpool befindet, die Ereignisschleife sollte neu sein Request-Modell alle zusammen, und wird nur einen einzigen Thread entlarven, so müssen Sie sich keine Gedanken über die Anzahl der Threads zu konfigurieren, um Arbeit zu tun :) python hat eine in stdlibrary https://pypi.python.org/pypi/aiohttp , was ich noch nie benutzt habe, aber relativ einfach aussieht, Tornado ist ein Framework, das auf OS-Ereignisbibliotheken aufgebaut ist und eine einfache API hat. http://tornadokevinlee.readthedocs.org/en/latest/httpclient.html – dm03514