12

Ich verbinde mich über HTTP mit einem lokalen Server (OSRM), um Routen zu senden und Fahrzeiten zurück zu bekommen. Ich bemerke, dass I/O langsamer ist als Threading, weil die Wartezeit für die Berechnung kleiner ist als die Zeit, die benötigt wird, um die Anfrage zu senden und die JSON-Ausgabe zu verarbeiten (I/O ist besser, wenn der Server etwas Zeit benötigt) bearbeite deine Anfrage -> du willst nicht, dass sie blockiert, weil du warten musst, das ist nicht mein Fall. Das Threading leidet unter der globalen Interpreter-Sperre, und so scheint es (und der Beweis unten), dass meine schnellste Option Multiprozessing ist.Python-Anfragen - Threads/Prozesse vs. IO

Das Problem mit Multiprocessing ist, dass es so schnell ist, dass es meine Sockets erschöpft und ich bekomme einen Fehler (Anfragen stellt jedes Mal eine neue Verbindung aus). Ich kann (seriell) das Objekt requests.Sessions() verwenden, um eine Verbindung am Leben zu erhalten, aber ich kann das nicht parallel laufen lassen (jeder Prozess hat seine eigene Sitzung).

Der nächste Code, den ich im Moment arbeiten muß, ist dieser Multiprocessing Code:

conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=cpu_count()) 

def ReqOsrm(url_input): 
    ul, qid = url_input  
    try: 
     response = conn_pool.request('GET', ul) 
     json_geocode = json.loads(response.data.decode('utf-8')) 
     status = int(json_geocode['status']) 
     if status == 200: 
      tot_time_s = json_geocode['route_summary']['total_time'] 
      tot_dist_m = json_geocode['route_summary']['total_distance'] 
      used_from, used_to = json_geocode['via_points'] 
      out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]] 
      return out 
     else: 
      print("Done but no route: %d %s" % (qid, req_url)) 
      return [qid, 999, 0, 0, 0, 0, 0, 0] 
    except Exception as err: 
     print("%s: %d %s" % (err, qid, req_url)) 
     return [qid, 999, 0, 0, 0, 0, 0, 0] 

# run: 
pool = Pool(cpu_count()) 
calc_routes = pool.map(ReqOsrm, url_routes) 
pool.close() 
pool.join() 

Allerdings habe ich nicht die HTTPConnectionPool bekommen kann richtig funktioniert und es schafft neue Steckdosen jedes Mal (glaube ich) und dann gibt mir den Fehler:

HTTPConnectionPool(host='127.0.0.1', port=5005): Max retries exceeded with url: /viaroute?loc=44.779708,4.2609877&loc=44.648439,4.2811959&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))


Mein Ziel ist es Abstandsberechnungen von einem OSRM-routing server ich laufe vor Ort (so schnell wie möglich) zu erhalten.

Ich habe eine Frage in zwei Teilen - im Grunde versuche ich, etwas Code mit multiprocessing.Pool() zu konvertieren, um besser Code (ordnungsgemäße asynchrone Funktionen - so dass die Ausführung bricht nie und es läuft so schnell wie möglich).

Das Problem, das ich habe, ist, dass alles, was ich versuche, scheint langsamer als Multiprozessing (ich präsentiere mehrere Beispiele unten, was ich versucht habe).

einige potenzielle Methoden sind: gevents, grequests, Tornados, Anfragen-Futures, asyncio usw.

A - Multiprocessing.Pool()

ich zunächst mit etwas wie folgt begonnen:

def ReqOsrm(url_input): 
    req_url, query_id = url_input 
    try_c = 0 
    #print(req_url) 
    while try_c < 5: 
     try: 
      response = requests.get(req_url) 
      json_geocode = response.json() 
      status = int(json_geocode['status']) 
      # Found route between points 
      if status == 200: 
      .... 

pool = Pool(cpu_count()-1) 
calc_routes = pool.map(ReqOsrm, url_routes) 

Wo war ich zu einem lokalen Server verbunden werden (localhost, Port: 5005), die auf 8 Threads und supports parallel execution ins Leben gerufen wurde.

Nach ein wenig Suche erkannte ich den Fehler, den ich bekam, war, weil Anforderungen opening a new connection/socket for each-request war. Das war nach einer Weile eigentlich zu schnell und anstrengend. Es scheint, der Weg, dies zu adressieren, ist eine Anfragen.Session() - , aber ich habe nicht in der Lage, dies mit Multiprocessing arbeiten (wo jeder Prozess hat seine eigene Sitzung).

Frage 1.

Auf einigen Computern läuft dieser in Ordnung, z.B.:

enter image description here

gegen später Zum Vergleich: Servernutzung und 1700 Anfragen 45% pro Sekunde

, auf einigen aber es funktioniert nicht und ich verstehe nicht ganz, warum:

HTTPConnectionPool(host='127.0.0.1', port=5000): Max retries exceeded with url: /viaroute?loc=49.34343,3.30199&loc=49.56655,3.25837&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))

Meine Vermutung wäre, dass Anfragen den Socket sperren, wenn er verwendet wird - manchmal ist der Server zu langsam, um auf die alte Anfrage zu antworten und eine neue wird generiert. Der Server unterstützt Warteschlangen, aber Anfragen nicht so, statt der Warteschlange hinzufügen, bekomme ich den Fehler?

Frage 2.

ich gefunden:

Blocking Or Non-Blocking?

With the default Transport Adapter in place, Requests does not provide any kind of non-blocking IO. The Response.content property will block until the entire response has been downloaded. If you require more granularity, the streaming features of the library (see Streaming Requests) allow you to retrieve smaller quantities of the response at a time. However, these calls will still block.

If you are concerned about the use of blocking IO, there are lots of projects out there that combine Requests with one of Python’s asynchronicity frameworks.

Two excellent examples are grequests and requests-futures.

B - Anfragen-Futures

diese Adresse ich meinen Code neu zu schreiben, benötigen asynchrone Anfragen zu verwenden, so habe ich versucht, die folgende Verwendung:

from requests_futures.sessions import FuturesSession 
from concurrent.futures import ThreadPoolExecutor, as_completed 

(Übrigens beginne ich meinen Server mit der Option, alle Threads verwenden)

und der Hauptcode:

calc_routes = [] 
futures = {} 
with FuturesSession(executor=ThreadPoolExecutor(max_workers=1000)) as session: 
    # Submit requests and process in background 
    for i in range(len(url_routes)): 
     url_in, qid = url_routes[i] # url |query-id 
     future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp)) 
     futures[future] = qid 
    # Process the futures as they become complete 
    for future in as_completed(futures): 
     r = future.result() 
     try: 
      row = [futures[future]] + r.data 
     except Exception as err: 
      print('No route') 
      row = [futures[future], 999, 0, 0, 0, 0, 0, 0] 
     calc_routes.append(row) 

Wo meine Funktion (ReqOsrm) jetzt neu geschrieben als:

def ReqOsrm(sess, resp): 
    json_geocode = resp.json() 
    status = int(json_geocode['status']) 
    # Found route between points 
    if status == 200: 
     tot_time_s = json_geocode['route_summary']['total_time'] 
     tot_dist_m = json_geocode['route_summary']['total_distance'] 
     used_from = json_geocode['via_points'][0] 
     used_to = json_geocode['via_points'][1] 
     out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]] 
    # Cannot find route between points (code errors as 999) 
    else: 
     out = [999, 0, 0, 0, 0, 0, 0] 
    resp.data = out 

Allerdings ist dieser Code langsamer als der Multiprozessing ein! Bevor ich 1700 Anfragen pro Sekunde bekam, bekomme ich jetzt 600 Sekunden. Ich schätze, das liegt daran, dass ich keine volle CPU-Auslastung habe, aber ich bin mir nicht sicher, wie ich es erhöhen kann.

enter image description here

C - Gewinde

Ich habe versucht eine andere Methode (creating threads) - war aber wieder nicht sicher, wie diese erhalten CPU-Auslastung zu maximieren (im Idealfall möchte ich meine Server sehen, 50 mit %, Nein?):

def doWork(): 
    while True: 
     url,qid = q.get() 
     status, resp = getReq(url) 
     processReq(status, resp, qid) 
     q.task_done() 

def getReq(url): 
    try: 
     resp = requests.get(url) 
     return resp.status_code, resp 
    except: 
     return 999, None 

def processReq(status, resp, qid): 
    try: 
     json_geocode = resp.json() 
     # Found route between points 
     if status == 200: 
      tot_time_s = json_geocode['route_summary']['total_time'] 
      tot_dist_m = json_geocode['route_summary']['total_distance'] 
      used_from = json_geocode['via_points'][0] 
      used_to = json_geocode['via_points'][1] 
      out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]] 
     else: 
      print("Done but no route") 
      out = [qid, 999, 0, 0, 0, 0, 0, 0] 
    except Exception as err: 
     print("Error: %s" % err) 
     out = [qid, 999, 0, 0, 0, 0, 0, 0] 
    qres.put(out) 
    return 

#Run: 
concurrent = 1000 
qres = Queue() 
q = Queue(concurrent) 

for i in range(concurrent): 
    t = Thread(target=doWork) 
    t.daemon = True 
    t.start() 
try: 
    for url in url_routes: 
     q.put(url) 
     q.join() 
    except Exception: 
     pass 

# Get results 
calc_routes = [qres.get() for _ in range(len(url_routes))] 

Diese Methode ist schneller als requests_futures Ich denke, aber ich weiß nicht, wie viele Threads diese zu maximieren zu setzen -

enter image description here

D - Tornado (nicht funktioniert)

Ich versuche jetzt Tornado - aber kann es nicht ganz bekommen funktioniert es bricht mit existieren Code -1073741819 wenn ich curl benutze - wenn ich simple_httpclient es funktioniert, aber dann bekomme ich Timeout-Fehler:

ERROR:tornado.application:Multiple exceptions in yield list Traceback (most recent call last): File "C:\Anaconda3\lib\site-packages\tornado\gen.py", line 789, in callback result_list.append(f.result()) File "C:\Anaconda3\lib\site-packages\tornado\concurrent.py", line 232, in result raise_exc_info(self._exc_info) File "", line 3, in raise_exc_info tornado.httpclient.HTTPError: HTTP 599: Timeout

def handle_req(r): 
    try: 
     json_geocode = json_decode(r) 
     status = int(json_geocode['status']) 
     tot_time_s = json_geocode['route_summary']['total_time'] 
     tot_dist_m = json_geocode['route_summary']['total_distance'] 
     used_from = json_geocode['via_points'][0] 
     used_to = json_geocode['via_points'][1] 
     out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]] 
     print(out) 
    except Exception as err: 
     print(err) 
     out = [999, 0, 0, 0, 0, 0, 0] 
    return out 

# Configure 
# For some reason curl_httpclient crashes my computer 
AsyncHTTPClient.configure("tornado.simple_httpclient.SimpleAsyncHTTPClient", max_clients=10) 

@gen.coroutine 
def run_experiment(urls): 
    http_client = AsyncHTTPClient() 
    responses = yield [http_client.fetch(url) for url, qid in urls] 
    responses_out = [handle_req(r.body) for r in responses] 
    raise gen.Return(value=responses_out) 

# Initialise 
_ioloop = ioloop.IOLoop.instance() 
run_func = partial(run_experiment, url_routes) 
calc_routes = _ioloop.run_sync(run_func) 

E - asyncio/aiohttp

Beschlossen einen anderen Ansatz, um zu versuchen (wenn auch groß sein würde Tornado arbeiten zu bekommen) mit asyncio und aiohttp.

Das funktioniert OK, aber immer noch langsamer als Multiprozessing!

enter image description here

+1

ein weiterer Ansatz, andere als zu versuchen, um mit optimaler Threadpoolgröße zu täuschen ist eine verwenden, Ereignisschleife. Sie könnten Anfragen mit einem Rückruf registrieren und darauf warten, dass die Ereignisschleife bei jeder Antwort beantwortet wird. – dm03514

+0

@ dm03514 Danke dafür! Aber ist das nicht das, was ich habe, wenn ich meine Anfrage-Futures-Beispiel mache? 'future = session.get (url_in, background_callback = lambda sess, resp: ReqOsrm (sess, resp))' – mptevsion

+1

Ich habe RequestFuture noch nie benutzt, aber ich glaube, dass es sich immer noch in einem Threadpool befindet, die Ereignisschleife sollte neu sein Request-Modell alle zusammen, und wird nur einen einzigen Thread entlarven, so müssen Sie sich keine Gedanken über die Anzahl der Threads zu konfigurieren, um Arbeit zu tun :) python hat eine in stdlibrary https://pypi.python.org/pypi/aiohttp , was ich noch nie benutzt habe, aber relativ einfach aussieht, Tornado ist ein Framework, das auf OS-Ereignisbibliotheken aufgebaut ist und eine einfache API hat. http://tornadokevinlee.readthedocs.org/en/latest/httpclient.html – dm03514

Antwort

1

Blick auf Ihren Multiprocessing-Code am Anfang der Frage. Es scheint, dass ein HttpConnectionPool() jedes Mal aufgerufen wird, wenn ReqOsrm aufgerufen wird. Daher wird für jede URL ein neuer Pool erstellt.Verwenden Sie stattdessen die Parameter initializer und args, um einen einzelnen Pool für jeden Prozess zu erstellen.

conn_pool = None 

def makePool(host, port): 
    global conn_pool 
    pool = conn_pool = HTTPConnectionPool(host=host, port=port, maxsize=1) 

def ReqOsrm(url_input): 
    ul, qid = url_input 

    try: 
     response = conn_pool.request('GET', ul) 
     json_geocode = json.loads(response.data.decode('utf-8')) 
     status = int(json_geocode['status']) 
     if status == 200: 
      tot_time_s = json_geocode['route_summary']['total_time'] 
      tot_dist_m = json_geocode['route_summary']['total_distance'] 
      used_from, used_to = json_geocode['via_points'] 
      out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]] 
      return out 

     else: 
      print("Done but no route: %d %s" % (qid, req_url)) 
      return [qid, 999, 0, 0, 0, 0, 0, 0] 

    except Exception as err: 
     print("%s: %d %s" % (err, qid, req_url)) 
     return [qid, 999, 0, 0, 0, 0, 0, 0] 

if __name__ == "__main__": 
    # run: 
    pool = Pool(initializer=makePool, initargs=('127.0.0.1', 5005)) 
    calc_routes = pool.map(ReqOsrm, url_routes) 
    pool.close() 
    pool.join() 

Die Anfrage-Futures-Version scheint einen Einrückungsfehler zu haben. Die Schleife for future in as_completed(futures): ist unter der äußeren Schleife for i in range(len(url_routes)): eingerückt. Also wird eine Anfrage in der äußeren Schleife gemacht und dann wartet die innere Schleife darauf, dass diese Zukunft vor der nächsten Iteration der äußeren Schleife zurückkehrt. Dies führt dazu, dass die Anforderungen seriell und nicht parallel ausgeführt werden.

Ich denke, der Code sollte wie folgt lauten:

calc_routes = [] 
futures = {} 
with FuturesSession(executor=ThreadPoolExecutor(max_workers=1000)) as session: 
    # Submit all the requests and process in background 
    for i in range(len(url_routes)): 
     url_in, qid = url_routes[i] # url |query-id 
     future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp)) 
     futures[future] = qid 

    # this was indented under the code in section B of the question 
    # process the futures as they become copmlete 
    for future in as_completed(futures): 
     r = future.result() 
     try: 
      row = [futures[future]] + r.data 

     except Exception as err: 
      print('No route') 
      row = [futures[future], 999, 0, 0, 0, 0, 0, 0] 
     print(row) 
     calc_routes.append(row) 
2

Frage 1

Sie den Fehler, denn dieser Ansatz:

def ReqOsrm(url_input): 
    req_url, query_id = url_input 
    try_c = 0 
    #print(req_url) 
    while try_c < 5: 
     try: 
      response = requests.get(req_url) 
      json_geocode = response.json() 
      status = int(json_geocode['status']) 
      # Found route between points 
      if status == 200: 
      .... 

pool = Pool(cpu_count()-1) 
calc_routes = pool.map(ReqOsrm, url_routes) 

eine neue TCP-Verbindung für jede angeforderte URL erstellt und auf einem gewissen Punkt ist es nicht nur, weil Das System verfügt nicht über freie lokale Ports. Um zu bestätigen, dass Sie netstat ausführen können, während Code ausgeführt wird:

netstat -a -n | find /c "localhost:5005" 

Dies wird Ihnen eine Reihe von Verbindungen zum Server.

Auch das Erreichen von 1700 RPS sieht für diesen Ansatz ziemlich unrealistisch aus, da requests.get eine ziemlich teure Operation ist und es unwahrscheinlich ist, dass Sie sogar 50 RPS auf diese Weise erhalten können. Also müssen Sie wahrscheinlich Ihre RPS-Berechnungen überprüfen.

, welchen Fehler Sie Sitzungen müssen Um zu vermeiden, verwenden anstelle von Verbindungen aus dem Grund auf neu erstellen:

import multiprocessing 
import requests 
import time 


class Worker(multiprocessing.Process): 
    def __init__(self, qin, qout, *args, **kwargs): 
     super(Worker, self).__init__(*args, **kwargs) 
     self.qin = qin 
     self.qout = qout 

    def run(self): 
     s = requests.session() 
     while not self.qin.empty(): 
      result = s.get(self.qin.get()) 
      self.qout.put(result) 
      self.qin.task_done() 

if __name__ == '__main__': 
    start = time.time() 

    qin = multiprocessing.JoinableQueue() 
    [qin.put('http://localhost:8080/') for _ in range(10000)] 

    qout = multiprocessing.Queue() 

    [Worker(qin, qout).start() for _ in range(multiprocessing.cpu_count())] 

    qin.join() 

    result = [] 
    while not qout.empty(): 
     result.append(qout.get()) 

    print time.time() - start 
    print result 

Frage 2

Sie werden nicht höher erhalten RPS mit Gewinde oder Asynchron es sei denn, ich Ansätze/O benötigt mehr Zeit als Berechnungen (z. B. hohe Netzwerklatenz, große Antworten usw.), da Threads von GIL betroffen sind, da sie im selben Python-Prozess ausgeführt werden und asynchrone Bibliotheken durch lange laufende Berechnungen blockiert werden können.

Obwohl Threads oder asynchrone Bibliotheken die Leistung verbessern können, führt die Ausführung desselben Thread- oder asynchronen Codes in mehreren Prozessen trotzdem zu mehr Leistung.

5

Danke an alle für die Hilfe. Ich wollte meine Schlussfolgerungen posten:

Da meine HTTP-Anfragen an einen lokalen Server sind, der die Anfrage sofort verarbeitet, macht es wenig Sinn für mich, asynchrone Ansätze zu verwenden (im Vergleich zu den meisten Fällen, wenn Anfragen über das Internet gesendet werden). Der kostspielige Faktor für mich ist, die Anfrage zu senden und das Feedback zu verarbeiten, was bedeutet, dass ich mit mehreren Prozessen viel bessere Geschwindigkeiten erziele (Threads leiden unter GIL). Ich sollte auch Sitzungen verwenden, um die Geschwindigkeit zu erhöhen (keine Notwendigkeit, eine Verbindung zum SAME-Server zu schließen und wieder zu öffnen), und dabei helfen, Port-Erschöpfung zu verhindern.

Hier sind alle Methoden versucht (in Betrieb) mit Beispiel RPS:

Serien

S1. Serielle GET-Anforderung (keine Sitzung) -> 215 RPS

def ReqOsrm(data): 
    url, qid = data 
    try: 
     response = requests.get(url) 
     json_geocode = json.loads(response.content.decode('utf-8')) 
     tot_time_s = json_geocode['paths'][0]['time'] 
     tot_dist_m = json_geocode['paths'][0]['distance'] 
     return [qid, 200, tot_time_s, tot_dist_m] 
    except Exception as err: 
     return [qid, 999, 0, 0] 
# Run:  
calc_routes = [ReqOsrm(x) for x in url_routes] 

S2. Serielle GET-Anforderung (requests.Session()) -> 335 RPS

S3. Seriell-GET-Anfrage (urllib3.HTTPConnectionPool) -> 545 RPS

conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=1) 
def ReqOsrm(data): 
    url, qid = data 
    try: 
     response = conn_pool.request('GET', url) 
     json_geocode = json.loads(response.data.decode('utf-8')) 
     tot_time_s = json_geocode['paths'][0]['time'] 
     tot_dist_m = json_geocode['paths'][0]['distance'] 
     return [qid, 200, tot_time_s, tot_dist_m] 
    except Exception as err: 
     return [qid, 999, 0, 0] 
# Run:  
calc_routes = [ReqOsrm(x) for x in url_routes] 

Async IO

A4. AsyncIO mit aiohttp -> 450 RPS

import asyncio 
import aiohttp 
concurrent = 100 
def handle_req(data, qid): 
    json_geocode = json.loads(data.decode('utf-8')) 
    tot_time_s = json_geocode['paths'][0]['time'] 
    tot_dist_m = json_geocode['paths'][0]['distance'] 
    return [qid, 200, tot_time_s, tot_dist_m] 
def chunked_http_client(num_chunks): 
    # Use semaphore to limit number of requests 
    semaphore = asyncio.Semaphore(num_chunks) 
    @asyncio.coroutine 
    # Return co-routine that will download files asynchronously and respect 
    # locking fo semaphore 
    def http_get(url, qid): 
     nonlocal semaphore 
     with (yield from semaphore): 
      with aiohttp.ClientSession() as session: 
       response = yield from session.get(url) 
       body = yield from response.content.read() 
       yield from response.wait_for_close() 
     return body, qid 
    return http_get 
def run_experiment(urls): 
    http_client = chunked_http_client(num_chunks=concurrent) 
    # http_client returns futures, save all the futures to a list 
    tasks = [http_client(url, qid) for url, qid in urls] 
    response = [] 
    # wait for futures to be ready then iterate over them 
    for future in asyncio.as_completed(tasks): 
     data, qid = yield from future 
     try: 
      out = handle_req(data, qid) 
     except Exception as err: 
      print("Error for {0} - {1}".format(qid,err)) 
      out = [qid, 999, 0, 0] 
     response.append(out) 
    return response 
# Run: 
loop = asyncio.get_event_loop() 
calc_routes = loop.run_until_complete(run_experiment(url_routes)) 

A5. Gewindeschneiden ohne Sitzungen -> 330 RPS

from threading import Thread 
from queue import Queue 
concurrent = 100 
def doWork(): 
    while True: 
     url,qid = q.get() 
     status, resp = getReq(url) 
     processReq(status, resp, qid) 
     q.task_done() 
def getReq(url): 
    try: 
     resp = requests.get(url) 
     return resp.status_code, resp 
    except: 
     return 999, None 
def processReq(status, resp, qid): 
    try: 
     json_geocode = json.loads(resp.content.decode('utf-8')) 
     tot_time_s = json_geocode['paths'][0]['time'] 
     tot_dist_m = json_geocode['paths'][0]['distance'] 
     out = [qid, 200, tot_time_s, tot_dist_m] 
    except Exception as err: 
     print("Error: ", err, qid, url) 
     out = [qid, 999, 0, 0] 
    qres.put(out) 
    return 
#Run: 
qres = Queue() 
q = Queue(concurrent) 
for i in range(concurrent): 
    t = Thread(target=doWork) 
    t.daemon = True 
    t.start() 
for url in url_routes: 
    q.put(url) 
q.join() 
# Get results 
calc_routes = [qres.get() for _ in range(len(url_routes))] 

A6. Threading mit HTTPConnectionPool -> 1550 RPS

from threading import Thread 
from queue import Queue 
from urllib3 import HTTPConnectionPool 
concurrent = 100 
conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=concurrent) 
def doWork(): 
    while True: 
     url,qid = q.get() 
     status, resp = getReq(url) 
     processReq(status, resp, qid) 
     q.task_done() 
def getReq(url): 
    try: 
     resp = conn_pool.request('GET', url) 
     return resp.status, resp 
    except: 
     return 999, None 
def processReq(status, resp, qid): 
    try: 
     json_geocode = json.loads(resp.data.decode('utf-8')) 
     tot_time_s = json_geocode['paths'][0]['time'] 
     tot_dist_m = json_geocode['paths'][0]['distance'] 
     out = [qid, 200, tot_time_s, tot_dist_m] 
    except Exception as err: 
     print("Error: ", err, qid, url) 
     out = [qid, 999, 0, 0] 
    qres.put(out) 
    return 
#Run: 
qres = Queue() 
q = Queue(concurrent) 
for i in range(concurrent): 
    t = Thread(target=doWork) 
    t.daemon = True 
    t.start() 
for url in url_routes: 
    q.put(url) 
q.join() 
# Get results 
calc_routes = [qres.get() for _ in range(len(url_routes))] 

A7. Anfragen-Futures -> 520 RPS

from requests_futures.sessions import FuturesSession 
from concurrent.futures import ThreadPoolExecutor, as_completed 
concurrent = 100 
def ReqOsrm(sess, resp): 
    try: 
     json_geocode = resp.json() 
     tot_time_s = json_geocode['paths'][0]['time'] 
     tot_dist_m = json_geocode['paths'][0]['distance'] 
     out = [200, tot_time_s, tot_dist_m] 
    except Exception as err: 
     print("Error: ", err) 
     out = [999, 0, 0] 
    resp.data = out 
#Run: 
calc_routes = [] 
futures = {} 
with FuturesSession(executor=ThreadPoolExecutor(max_workers=concurrent)) as session: 
    # Submit requests and process in background 
    for i in range(len(url_routes)): 
     url_in, qid = url_routes[i] # url |query-id 
     future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp)) 
     futures[future] = qid 
    # Process the futures as they become complete 
    for future in as_completed(futures): 
     r = future.result() 
     try: 
      row = [futures[future]] + r.data 
     except Exception as err: 
      print('No route') 
      row = [futures[future], 999, 0, 0] 
     calc_routes.append(row) 

mehrere Prozesse

P8. multiprocessing.worker + Queue + requests.session() -> 1058 RPS

from multiprocessing import * 
class Worker(Process): 
    def __init__(self, qin, qout, *args, **kwargs): 
     super(Worker, self).__init__(*args, **kwargs) 
     self.qin = qin 
     self.qout = qout 
    def run(self): 
     s = requests.session() 
     while not self.qin.empty(): 
      url, qid = self.qin.get() 
      data = s.get(url) 
      self.qout.put(ReqOsrm(data, qid)) 
      self.qin.task_done() 
def ReqOsrm(resp, qid): 
    try: 
     json_geocode = json.loads(resp.content.decode('utf-8')) 
     tot_time_s = json_geocode['paths'][0]['time'] 
     tot_dist_m = json_geocode['paths'][0]['distance'] 
     return [qid, 200, tot_time_s, tot_dist_m] 
    except Exception as err: 
     print("Error: ", err, qid) 
     return [qid, 999, 0, 0] 
# Run: 
qout = Queue() 
qin = JoinableQueue() 
[qin.put(url_q) for url_q in url_routes] 
[Worker(qin, qout).start() for _ in range(cpu_count())] 
qin.join() 
calc_routes = [] 
while not qout.empty(): 
    calc_routes.append(qout.get()) 

P9. Mehrfachverarbeitung.Arbeiter + Warteschlange + HTTPConnectionPool() -> 1230 RPS

P10. v2 Multiprozessing (nicht wirklich sicher, wie dies anders ist) -> 1350 RPS

conn_pool = None 
def makePool(host, port): 
    global conn_pool 
    pool = conn_pool = HTTPConnectionPool(host=host, port=port, maxsize=1) 
def ReqOsrm(data): 
    url, qid = data 
    try: 
     response = conn_pool.request('GET', url) 
     json_geocode = json.loads(response.data.decode('utf-8')) 
     tot_time_s = json_geocode['paths'][0]['time'] 
     tot_dist_m = json_geocode['paths'][0]['distance'] 
     return [qid, 200, tot_time_s, tot_dist_m] 
    except Exception as err: 
     print("Error: ", err, qid, url) 
     return [qid, 999, 0, 0] 
# Run: 
pool = Pool(initializer=makePool, initargs=(ghost, gport)) 
calc_routes = pool.map(ReqOsrm, url_routes) 

Also abschließend scheint es, dass die besten Methoden sind für mich # 10 (und überraschend 6 #)

+1

Ein anderer Ansatz, den Sie versuchen könnten, ist Multiprozessing mit asyncio (oder gevent) zu verwenden. Ich habe nur gevent verwendet, aber es kann nur einen einzigen Kern nutzen, da es sich um Singlethread-Coroutinen handelt. Coroutine-Switches sollten schneller als Threads sein, sodass Multiprocessing + Coroutines am schnellsten sein könnten. –

+0

Werden Sie eine Antwort auswählen? – RootTwo

+0

Ich erhalte den Fehler: ChunkedEncodingError (ProtocolError ('Verbindung unterbrochen: IncompleteRead (162 Bytes gelesen)', IncompleteRead (162 Bytes gelesen)), wenn P8 ausgeführt wird – Phillip

1

Hier ein Muster, das ich mit gevent verwendet habe, das auf Koroutine basiert und möglicherweise nicht unter GIL leidet. Dies kann schneller sein als die Verwendung von Threads und vielleicht am schnellsten, wenn in Kombination mit Multiprocessing verwendet (zur Zeit würde es nur 1 Kern verwenden):

from gevent import monkey 
monkey.patch_all() 

import logging 
import random 
import time 
from threading import Thread 

from gevent.queue import JoinableQueue 
from logger import initialize_logger 

initialize_logger() 
log = logging.getLogger(__name__) 


class Worker(Thread): 

    def __init__(self, worker_idx, queue): 
     # initialize the base class 
     super(Worker, self).__init__() 
     self.worker_idx = worker_idx 
     self.queue = queue 

    def log(self, msg): 
     log.info("WORKER %s - %s" % (self.worker_idx, msg)) 

    def do_work(self, line): 
     #self.log(line) 
     time.sleep(random.random()/10) 

    def run(self): 
     while True: 
      line = self.queue.get() 
      self.do_work(line) 
      self.queue.task_done() 


def main(number_of_workers=20): 
    start_time = time.time() 

    queue = JoinableQueue() 
    for idx in range(number_of_workers): 
     worker = Worker(idx, queue) 
     # "daemonize" a thread to ensure that the threads will 
     # close when the main program finishes 
     worker.daemon = True 
     worker.start() 

    for idx in xrange(100): 
     queue.put("%s" % idx) 

    queue.join() 
    time_taken = time.time() - start_time 
    log.info("Parallel work took %s seconds." % time_taken) 

    start_time = time.time() 
    for idx in xrange(100): 
     #log.info(idx) 
     time.sleep(random.random()/10) 
    time_taken = time.time() - start_time 
    log.info("Sync work took %s seconds." % time_taken) 


if __name__ == "__main__": 
    main()