2016-04-14 10 views
0

Ich habe eine Tornado-Webanwendung, diese App kann GET- und POST-Anfragen vom Client erhalten.So erhalten Sie mehrere Anfragen in einer Tornado-Anwendung

Die POSTs Anfrage legen eine Information in einer Tornado Queue, dann poppe ich diese Informationen aus der Warteschlange und damit eine Operation auf der Datenbank, diese Operation kann sehr langsam sein, es kann einige Sekunden dauern, um abzuschließen!

In der Zwischenzeit, dass diese Datenbankoperation weitergeht, möchte ich in der Lage sein, andere POSTs (die andere Informationen in die Warteschlange stellen) und GET zu empfangen. Die GET sind stattdessen sehr schnell und müssen ihr Ergebnis sofort an den Client zurückgeben.

Das Problem ist, dass der Server keine anderen Anforderungen vom Client akzeptiert, wenn ich aus der Warteschlange und den langsamen Vorgang starten. Wie kann ich das beheben?

Dies ist der semplified Code, den ich bisher geschrieben habe (Import ist für vermeiden Wand des Textes weggelassen):

# URLs are defined in a config file 
application = tornado.web.Application([ 
    (BASE_URL, Variazioni), 
    (ARTICLE_URL, Variazioni), 
    (PROMO_URL, Variazioni), 
    (GET_FEEDBACK_URL, Feedback) 
]) 

class Server: 

    def __init__(self): 
     http_server = tornado.httpserver.HTTPServer(application, decompress_request=True) 
     http_server.bind(8889) 
     http_server.start(0) 

     transactions = TransactionsQueue() #contains the queue and the function with interact with it 
     IOLoop.instance().add_callback(transactions.process) 

    def start(self): 
     try: 
      IOLoop.instance().start() 
     except KeyboardInterrupt: 
      IOLoop.instance().stop() 

if __name__ == "__main__": 
    server = Server() 
    server.start() 



class Variazioni(tornado.web.RequestHandler): 
    ''' Handle the POST request. Put an the data received in the queue ''' 

    @gen.coroutine 
    def post(self): 
     TransactionsQueue.put(self.request.body) 
     self.set_header("Location", FEEDBACK_URL) 


class TransactionsQueue: 
    ''' Handle the queue that contains the data 
     When a new request arrive, the generated uuid is putted in the queue 
     When the data is popped out, it begin the operation on the database 
    ''' 

    queue = Queue(maxsize=3) 

    @staticmethod 
    def put(request_uuid): 
     ''' Insert in the queue the uuid in postgres format ''' 
     TransactionsQueue.queue.put(request_uuid) 


    @gen.coroutine 
    def process(self): 
     ''' Loop over the queue and load the data in the database ''' 
     while True: 
      # request_uuid is in postgres format 
      transaction = yield TransactionsQueue.queue.get() 
      try: 
       # this is the slow operation on the database 
       yield self._load_json_in_db(transaction) 
      finally: 
       TransactionsQueue.queue.task_done() 

Außerdem verstehe ich nicht, warum, wenn ich in einer Reihe 5 POST tun, es ausdrückte alle fünf Daten in der Warteschlange obwohl die maximale Größe ist 3.

Antwort

1

Ich werde vermuten, dass Sie einen synchronen Datenbanktreiber verwenden, also _load_json_in_db, obwohl es eine Coroutine ist, ist nicht wirklich async. Daher blockiert es die gesamte Ereignisschleife, bis die lange Operation abgeschlossen ist. Aus diesem Grund akzeptiert der Server bis zum Ende der Operation keine weiteren Anfragen.

Da _load_json_in_db die Ereignisschleife blockiert, kann Tornado während der Ausführung keine weiteren Anforderungen akzeptieren, sodass Ihre Warteschlange niemals auf die maximale Größe anwächst.

Sie benötigen zwei Fixes.

Verwenden Sie zuerst einen asynchronen Datenbanktreiber, der speziell für Tornado geschrieben wurde, oder führen Sie Datenbankoperationen für Threads mit Tornados ThreadPoolExecutor aus.

Sobald die Anwendung fertig ist in der Lage, um die Warteschlange zu füllen, so zweiten, TransactionsQueue.put tun müssen:

TransactionsQueue.queue.put_nowait(request_uuid) 

Dies löst eine Ausnahme aus, wenn bereits drei Elemente in der Warteschlange sind, die ich denke, ist das, was du hast vor.

+0

Ja, ich benutze psycopg2 für den Zugriff auf die Datenbank. Ich werde versuchen, 'ThreadPoolExecutor' zu verwenden, es scheint jetzt das einfachste zu sein. Frage: Wenn ich einen db-Treiber für tornado (momoko oder Abfragen) verwende '_load_json_in_db' muss immer eine Coroutine sein? – k4ppa

+0

Korrigieren; Jede Funktion in einer Tornado-Anwendung, die E/A ausführt, muss asynchron geschrieben sein: das heißt, sie muss eine Coroutine sein, oder sie muss einen Rückruf ausführen, der später mit dem Ergebnis der E/A ausgeführt wird. –

Verwandte Themen