2015-03-30 16 views
7

Ich verwende einen Tornado-Webserver, um Elemente in Warteschlange zu stellen, die außerhalb des Anfrage/Antwort-Zyklus verarbeitet werden müssen.Erstellen einer Verarbeitungswarteschlange in Tornado

In meinem untenstehenden vereinfachten Beispiel füge ich jedes Mal, wenn eine Anfrage eingeht, eine neue Zeichenfolge zu einer Liste namens queued_items hinzu. Ich möchte etwas erstellen, das diese Liste beobachten und die darin enthaltenen Elemente verarbeiten wird.

(In meinem echten Code werden die Elemente verarbeitet und über einen TCP-Socket gesendet, der verbunden sein kann oder nicht verbunden sein kann, wenn die Webanforderung eintrifft. Ich möchte, dass der Webserver die Elemente trotz der Socketverbindung in der Warteschlange hält)

Ich versuche, diesen Code einfach zu halten und keine externen Warteschlangen/Programme wie Redis oder Beanstalk zu verwenden. Es wird keine sehr hohe Lautstärke haben.

Was ist ein guter Weg mit Tornado Idiome, um die client.queued_items Liste für neue Elemente zu sehen und zu verarbeiten, wie sie ankommen?

import time 

import tornado.ioloop 
import tornado.gen 
import tornado.web 

class Client(): 

    def __init__(self): 
     self.queued_items = [] 

    @tornado.gen.coroutine 
    def watch_queue(self): 
     # I have no idea what I'm doing 
     items = yield client.queued_items 
     # go_do_some_thing_with_items(items) 

class IndexHandler(tornado.web.RequestHandler): 

    def get(self): 
     client.queued_items.append("%f" % time.time()) 
     self.write("Queued a new item") 

if __name__ == "__main__": 

    client = Client() 

    # Watch the queue for when new items show up 
    client.watch_queue() 

    # Create the web server 
    application = tornado.web.Application([ 
     (r'/', IndexHandler), 
    ], debug=True) 

    application.listen(8888) 
    tornado.ioloop.IOLoop.instance().start() 

Antwort

11

Es gibt eine Bibliothek toro genannt, die Synchronisierungsgrund für tornado zur Verfügung stellt. [Update: Ab Tornado 4.2, toro wurde in tornado verschmolzen.]

Klingt wie Sie gerade eine toro.Queue (oder tornado.queues.Queue in tornado 4.2+) nutzen könnten diese zu handhaben:

import time 

import toro 
import tornado.ioloop 
import tornado.gen 
import tornado.web 

class Client(): 

    def __init__(self): 
     self.queued_items = toro.Queue() 

    @tornado.gen.coroutine 
    def watch_queue(self): 
     while True: 
      items = yield self.queued_items.get() 
      # go_do_something_with_items(items) 

class IndexHandler(tornado.web.RequestHandler): 

    @tornado.gen.coroutine 
    def get(self): 
     yield client.queued_items.put("%f" % time.time()) 
     self.write("Queued a new item") 

if __name__ == "__main__": 

    client = Client() 

    # Watch the queue for when new items show up 
    tornado.ioloop.IOLoop.instance().add_callback(client.watch_queue) 

    # Create the web server 
    application = tornado.web.Application([ 
     (r'/', IndexHandler), 
    ], debug=True) 

    application.listen(8888) 
    tornado.ioloop.IOLoop.instance().start() 

Es sind ein paar Veränderungen erforderlich, abgesehen von der Datenstruktur aus einer Liste zu einem toro.Queue Schalt:

  1. Wir brauchenplanen 10, um innerhalb des IOLoop mit add_callback zu laufen, anstatt es direkt außerhalb eines IOLoop-Kontexts aufzurufen.
  2. IndexHandler.get muss in eine Coroutine konvertiert werden, da toro.Queue.put eine Coroutine ist.

Ich habe auch eine while True Schleife watch_queue, so dass es für immer laufen wird, und nicht nur ein Element der Verarbeitung und Aussteigen dann.

+0

Das ist genau das, was ich brauchte. Danke, dass Sie mir gezeigt haben, wie Sie es umsetzen können. – Scott

+0

Dano - Wie kann ich aufhören, die Warteschlange zu beobachten? Wenn meine Verbindung schlecht wird, muss ich die Objekte in der Warteschlange vorübergehend nicht mehr verarbeiten, sie aber nicht verlieren. – Scott

+1

toro wurde in Tornado zusammengeführt und ist nun veraltet. Für Tornado> = 4.2 können Sie 'tornado.queues.Queue' verwenden –