2013-04-03 5 views
5

Wir haben eine Reihe von Jobs und Mitarbeitern, die diese Jobs nacheinander bearbeiten. Für jeden Job müssen wir einige Daten formatieren und eine HTTP-POST-Anforderung mit den Daten als Anforderungsnutzdaten ausgeben.Wie sende ich asynchrone HTTP-Anfragen einzeln in Python?

Wie kann jeder Mitarbeiter diese HTTP-POST-Anfragen asynchron in einer single-threaded, non-blocking Weise ausgeben? Die Antwort aus der Anfrage ist uns egal - wir wollen nur, dass die Anfrage so schnell wie möglich ausgeführt wird und der Mitarbeiter sofort zum nächsten Job wechselt.

Wir haben mit gevent und die grequests Bibliothek (siehe Why does gevent.spawn not execute the parameterized function until a call to Greenlet.join?) erkundet. Unsere Arbeiter Code sieht etwa so aus:

def execute_task(worker, job): 

    print "About to spawn request" 
    greenlet = gevent.spawn(requests.post, url, params=params) 

    print "Request spawned, about to call sleep" 
    gevent.sleep() 

    print "Greenlet status: ", greenlet.ready() 

Die erste Print-Anweisung ausgeführt wird, aber die zweite und dritte Druck Aussagen nie gedruckt werden und die URL wird nie getroffen.

Wie können wir diese asynchronen Anforderungen ausführen?

+0

Es gibt eine Standard-Bibliothek mit dem Namen [asyncore] (http://docs.python.org/2/library/asyncore.html), die für Ihren Anwendungsfall jedoch möglicherweise zu niedrig ist. – lucasg

+0

Ich würde @georgesl auf dieser Seite zustimmen müssen, asyncore wäre ein großartiger Ort, um zu migrieren, weil es Ihnen bessere Flexibilität über Ihre Anwendung für die spätere Entwicklung gibt. Auch "http: // stackoverflow.com/questions/15753901/python-asyncore-client-socket-kann-nicht-determaine-connection-status/15754244 # 15754244" hier ist ein guter Anfang und ein Beispiel, wie es verwendet werden kann (siehe die Antwort auf meine Frage). Wenn nicht, müssten Sie es tatsächlich in mehreren Prozessen tun, selbst die "Unter" -Bibliotheken von Python werden es höchstwahrscheinlich für Sie fädeln, wenn Anfragen parallel gesendet werden können, das ist die Sache über Multi-Prozess – Torxed

+0

Ihr gevent code sieht okay aus (und ein schneller Test sagt mir, es funktioniert ganz gut, ich benutze gevent 1.0b3). Ich denke, es hängt vom Kontext ab, in dem 'execute_task' aufgerufen wird. – robertklep

Antwort

1

1) stellen ein Objekt Queue.Queue

2) so viele „Arbeiter“ Fäden machen, wie Sie wollen, welche Schleife und gelesen von der Queue.Queue

3) speisen die Aufträge in die Warteschlange. Queue

die Arbeitsthreads die Queue.Queue in der Reihenfolge, wie sie angeordnet sind, auf sie

Beispiel die Zeilen aus einer Datei liest und stellt sie in einer Queue.Queue

abgelesen werden
import sys 
import urllib2 
import urllib 
from Queue import Queue 
import threading 
import re 

THEEND = "TERMINATION-NOW-THE-END" 


#read from file into Queue.Queue asynchronously 
class QueueFile(threading.Thread): 
    def run(self): 
     if not(isinstance(self.myq, Queue)): 
      print "Queue not set to a Queue" 
      sys.exit(1) 
     h = open(self.f, 'r') 
     for l in h: 
      self.myq.put(l.strip()) # this will block if the queue is full 
     self.myq.put(THEEND) 

    def set_queue(self, q): 
     self.myq = q 

    def set_file(self, f): 
     self.f = f 

Eine Vorstellung davon, was ein Arbeiter-Thread wie (nur als Beispiel) sein könnte

class myWorker(threading.Thread): 
    def run(self): 
     while(running):   
      try: 
       data = self.q.get() # read from fifo 

       req = urllib2.Request("http://192.168.1.10/url/path") 
       req.add_data(urllib.urlencode(data)) 
       h1 = urllib2.urlopen(req, timeout=10) 
       res = h1.read() 
       assert(len(res) > 80) 

      except urllib2.HTTPError, e: 
       print e 

      except urllib2.URLError, e: 
       print "done %d reqs " % n 
       print e 
       sys.exit() 

die Objekte auf threading.Thread gehen, erstellen Sie das Objekt dann rufen Sie „Start“ auf der Instanz

basiert machen
1

Sie müssten es in verschiedenen Threads ausführen oder die integrierte Asyncore-Bibliothek verwenden. Die meisten Bibliotheken werden Threading utelisieren, ohne dass Sie es überhaupt wissen, oder es wird sich auf asyncore stützen, was ein Standardteil von Python ist.

Hier ist eine Kombination aus Threading und asyncore:

#!/usr/bin/python 
# -*- coding: iso-8859-15 -*- 
import asyncore, socket 
from threading import * 
from time import sleep 
from os import _exit 
from logger import * # <- Non-standard library containing a log function 
from config import * # <- Non-standard library containing settings such as "server" 

class logDispatcher(Thread, asyncore.dispatcher): 
    def __init__(self, config=None): 
     self.inbuffer = '' 
     self.buffer = '' 
     self.lockedbuffer = False 
     self.is_writable = False 

     self.is_connected = False 

     self.exit = False 
     self.initated = False 

     asyncore.dispatcher.__init__(self) 
     Thread.__init__(self) 

     self.create_socket(socket.AF_INET, socket.SOCK_STREAM) 
     try: 
      self.connect((server, server_port)) 
     except: 
      log('Could not connect to ' + server, 'LOG_SOCK') 
      return None 

     self.start() 

    def handle_connect_event(self): 
     self.is_connected = True 

    def handle_connect(self): 
     self.is_connected = True 
     log('Connected to ' + str(server), 'LOG_SOCK') 

    def handle_close(self): 
     self.is_connected = False 
     self.close() 

    def handle_read(self): 
     data = self.recv(8192) 
     while self.lockedbuffer: 
      sleep(0.01) 

     self.inbuffer += data 


    def handle_write(self): 
     while self.is_writable: 
      sent = self.send(self.buffer) 
      sleep(1) 

      self.buffer = self.buffer[sent:] 
      if len(self.buffer) <= 0: 
       self.is_writable = False 
      sleep(0.01) 

    def _send(self, what): 
     self.buffer += what + '\r\n' 
     self.is_writable = True 

    def run(self): 
     self._send('GET/HTTP/1.1\r\n') 

while 1: 
    logDispatcher() # <- Initate one for each request. 
    asyncore.loop(0.1) 
    log('All threads are done, next loop in 10', 'CORE') 
    sleep(10) 

Oder Sie könnten einfach einen Thread tun, dass die Arbeit erledigt und dann stirbt.

from threading import * 
class worker(Thread): 
    def __init__(self, host, postdata) 
     Thread.__init__(self) 
     self.host = host 
     self.postdata = postdata 
     self.start() 
    def run(self): 
     sock.send(self.postdata) #Pseudo, create the socket! 

for data in postDataObjects: 
    worker('example.com', data) 

Wenn Sie die Anzahl der Threads beschränken müssen (wenn Sie 5k Beiträge sind das Senden über oder so könnte es auf dem System erhalten Besteuerung) tun nur while len(enumerate()) > 1000: sleep(0.1) und lassen Sie den Looper Objekt für ein paar Threads warten Aussterben.

0

wickeln Sie Ihre URL und Parameter in eine Liste, dann ein Paar einmal zum Aufgabenpool (der Aufgabenpool hat hier entweder eine Aufgabe oder ist leer), erstellt Threads, lesen Sie die Aufgabe aus dem Aufgabenpool, wann ein Thread die Aufgabe und senden Sie die Anfrage, dann Pop-out eine andere aus Ihrer Liste (dh das ist eigentlich eine Warteschlange)

1

Sie können die join Methode anstelle von sleep verwenden und dann überprüfen Sie den Status.Wenn Sie eines nach dem anderen ausführen möchten, wird das Problem gelöst. Ändern Sie Ihren Code leicht, um es zu testen, scheint es gut zu funktionieren.

import gevent 
import requests 

def execute_task(worker, job): 

    print "About to spawn request" 
    greenlet = gevent.spawn(requests.get, 'http://example.com', params={}) 

    print "Request spawned, about to call sleep" 
    gevent.sleep() 

    print "Greenlet status: ", greenlet.ready() 
    print greenlet.get() 

execute_task(None, None) 

gibt die Ergebnisse:

About to spawn request 
Request spawned, about to call sleep 
Greenlet status: True 
<Response [200]> 

Gibt es mehr in diesem Python-Prozess geht die GEVENT von der Ausführung dieses greenlet blockiert werden könnte?

Verwandte Themen