2016-07-31 6 views
0

Ich arbeite mit Python2.7 mit Futures-Modul installiert.Wie implementiert man Multithreading mit Tornado?

Ich versuche Multithreading in Tornado mit ThreadPoolExecutor zu implementieren.

Hier ist der Code, den ich implementiert habe.

from __future__ import absolute_import 
from base_handler import BaseHandler 
from tornado import gen 
from pyrestful import mediatypes 
from pyrestful.rest import get, post, put, delete 
from bson.objectid import ObjectId 
from spark_map import Map 
from concurrent import futures 
import tornado 
class MapService(BaseHandler): 

    MapDB = dict() 
    executor = futures.ProcessPoolExecutor(max_workers=3) 

    @tornado.web.asynchronous 
    @gen.coroutine 
    @post(_path='/map', _type=[str, str]) 
    def postMap(self, inp, out): 
     db = self.settings['db'] 
     function = lambda (x,y): (x,y[0]*2) 
     future = yield db.MapInfo.insert({'input': inp, 'output': out, 'input_function': str(function)}) 
     response = {"inserted ID": str(future)} 
     self.write(response) 

     m = Map(inp, out, function, appName=str(future)) 
     futuree = self.executor.submit(m.operation()) 
     self.MapDB[str(future)] = {'map_object': m, 'running_process_future_object': futuree} 
     self.finish() 

    @tornado.web.asynchronous 
    @gen.coroutine 
    @delete(_path='/map/{_id}', _types=[str]) 
    def deleteMap(self, _id): 
     db = self.settings['db'] 
     future = yield db.MapInfo.find_one({'_id': ObjectId(_id)}) 
     if future is None: 
      raise AttributeError('No entry exists in the database with the provided ID') 
     chk = yield db.MapInfo.remove(future) 
     response = { "Succes": "OK" } 
     self.write(response) 

     self.MapDB[_id]['map_object'].stop() 
     del self.MapDB[_id] 
     self.finish() 

Im obigen Code erhalte ich zwei Eingaben mit der Post-Anfrage in Inp und Out. Dann führe ich eine Operation mit ihnen durch. Diese Operation sollte dauern, bis eine Löschanforderung empfangen wird, um den Prozess zu stoppen und zu entfernen.

Das Problem, mit dem ich konfrontiert bin, ist mit den mehreren Anfragen. Es führt nur die erste Anforderung aus, während andere Anforderungen auf die erste Anforderung warten, wodurch der Haupt-IOLoop blockiert wird.

Also, ich möchte jeden Prozess in einem separaten Thread ausführen. Wie soll ich es umsetzen?

+0

Was macht m.operation()? –

+0

m.operation() ist eine Funktion der benutzerdefinierten Klasse Map, die ich implementiert habe. Es führt im Grunde einige Berechnungen für einige Daten durch. Diese Operation sollte ausgeführt werden, bis eine Löschanforderung für diese bestimmte Operation empfangen wird. – vidhan

+0

Klingt, als ob Sie in m.operation() blockieren und Sie müssen es asynchron umschreiben. Wir müssten jedoch etwas Code für operation() sehen, um sicher zu sein. Oder Sie können einfach einen "Ausdruck" vor und nach dem Aufruf von operation() einfügen. –

Antwort

1

Es scheint, dass m.operation() blockiert, so dass Sie es auf einem Thread ausführen müssen. Die Art und Weise blockiert es den Haupt-Thread tun, während m.operation() Aufruf, und startet einen Thread nach:

self.executor.submit(m.operation()) 

Sie wollen stattdessen die Funktion auf einen Thread zu übergeben, die es ausführt:

self.executor.submit(m.operation) 

Kein Parens.

Verwandte Themen