0

Ich bin auf der Suche Klein und aufgeschoben. Im folgenden Beispiel versuche ich, eine Zahl mit einem Kindprozess zu erhöhen und über Future zurückzugeben. Ich kann den Rückruf der Zukunft erhalten.Klein App mit aufgeschobenen

Das Problem ist, dass verzögerte Objekt ruft nie die Funktion cb() und die Anforderung an den Endpunkt nie zurückgibt. Bitte helfen Sie mir, das Problem zu identifizieren.

Nach meinem server.py Code ist

from klein import Klein 
from twisted.internet.defer import inlineCallbacks, returnValue 
import Process4 

if __name__ == '__main__': 
    app = Klein() 

    @app.route('/visit') 
    @inlineCallbacks 
    def get_num_visit(request):   
     try: 
      resp = yield Process4.get_visitor_num() 
      req.setResponseCode(200) 
      returnValue('Visited = {}'.format(resp)) 
     except Exception as e: 
      req.setResponseCode(500) 
      returnValue('error {}'.format(e)) 

    print('starting server') 
    app.run('0.0.0.0', 5005) 

Es folgt Process4.py Code

from multiprocessing import Process 
from concurrent.futures import Future 
from time import sleep 
from twisted.internet.defer import Deferred 

def foo(x): 
    result = x+1 
    sleep(3) 
    return result 


class MyProcess(Process): 

    def __init__(self, target, args): 
     super().__init__() 
     self.target = target 
     self.args = args 
     self.f = Future() 
     self.visit = 0 

    def run(self): 
     r = foo(self.visit) 
     self.f.set_result(result=r) 

def cb(result): 
    print('visitor number {}'.format(result)) 
    return result 

def eb(err): 
    print('error occurred {}'.format(err)) 
    return err 


def future_to_deferred(future): 
    d = Deferred() 

    def callback(f): 
     e = f.exception() 
     if e: 
      d.errback(e) 
     else: 
      d.callback(f.result()) 

    future.add_done_callback(callback) 
    return d 

def get_visitor_num(): 
    p1 = MyProcess(target=foo, args=None) 
    d = future_to_deferred(p1.f) 
    p1.start() 
    d.addCallback(cb) 
    d.addErrback(eb) 
    sleep(1) 
    return d 

Edit 1

Rückrufe hinzufügen, bevor der Prozess p1 Start löst das Problem der Aufruf der Funktion cb(). Die HTTP-Anforderung an den Endpunkt wird jedoch nicht zurückgegeben.

+0

abgedreht und der stdlib Multiprocessing-Modul eine schlechte Passform sind. Betrachten Sie stattdessen Ampulle. Siehe https://stackoverflow.com/questions/5715217/mix-python-twisted-with-multiprocessing und https://stackoverflow.com/questions/1470850/twisted-network-client-with-multiprocessing-workers und andere ähnliche Fragen auf SO. –

+0

Ich bin sicher, dass 'reactor.callFromThread' aufgerufen werden muss, damit die Ergebnisse im Haupt-Thread gesetzt werden. Werfen Sie einen Blick auf [diese Antwort gab ich eine Weile zurück] (https://stackoverflow.com/questions/45930518/how-to-make-twisted-defer-get-function-result/45969032#45969032) und sehen Sie, ob es macht Sinn. Sie sollten etwas Ähnliches anwenden können. –

+0

Danke für die Antwort. Bitte werfen Sie einen Blick auf meine Antwort unten. @ notorious.no, Jean-Paul Calderone –

Antwort

0

Es stellt sich heraus, dass zukünftiges Ergebnis Einstellung self.f.set_result (result = r) im run() Methode triggert die Rückruf() Methode im Kindprozess, wo kein Thread Warten auf die Rückgabe des Ergebnisses!

Um die Callback() -Funktion im MainProcess ausgelöst zu bekommen, musste ich das Ergebnis vom Child-Prozess mit einer Multiprocess-Queue mit einem Worker-Thread im MainProcess holen und dann das zukünftige Ergebnis setzen.

@ notorious.no Danke für die Antwort. Eine Sache, die mir aufgefallen ist, ist, dass reactor.callFromThread in meinem modifizierten Code von Worker-Thread zu MainThread wechselt, aber d.callback (f.result()) funktioniert gut, gibt aber das Ergebnis vom Worker-Thread zurück.

Folgende ist die modifizierte Arbeits Code

server.py

from klein import Klein 
from twisted.internet.defer import inlineCallbacks, returnValue 


import Process4 

if __name__ == '__main__': 
    app = Klein() 
    visit_count = 0 

    @app.route('/visit') 
    @inlineCallbacks 
    def get_num_visit(req): 
     global visit_count 
     try: 
      resp = yield Process4.get_visitor_num(visit_count) 
      req.setResponseCode(200) 
      visit_count = resp 
      returnValue('Visited = {}'.format(resp)) 
     except Exception as e: 
      req.setResponseCode(500) 
      returnValue('error {}'.format(e)) 

    print('starting server') 
    app.run('0.0.0.0', 5005) 

Process4.py

from multiprocessing import Process, Queue 
from concurrent.futures import Future 
from time import sleep 
from twisted.internet.defer import Deferred 
import threading 
from twisted.internet import reactor 


def foo(x, q): 
    result = x+1 
    sleep(3) 
    print('setting result, {}'.format(result)) 
    q.put(result) 


class MyProcess(Process): 

    def __init__(self, target, args): 
     super().__init__() 
     self.target = target 
     self.args = args 
     self.visit = 0 

    def run(self): 
     self.target(*self.args) 


def future_to_deferred(future): 
    d = Deferred() 

    def callback(f): 
     e = f.exception() 
     print('inside callback {}'.format(threading.current_thread().name)) 
     if e: 
      print('calling errback') 
      d.errback(e) 
      # reactor.callFromThread(d.errback, e) 
     else: 
      print('calling callback with result {}'.format(f.result())) 
      # d.callback(f.result()) 
      reactor.callFromThread(d.callback, f.result()) 
    future.add_done_callback(callback) 
    return d 


def wait(q,f): 
    r = q.get(block=True) 
    f.set_result(r) 


def get_visitor_num(x): 

    def cb(result): 
     print('inside cb visitor number {} {}'.format(result, threading.current_thread().name)) 
     return result 

    def eb(err): 
     print('inside eb error occurred {}'.format(err)) 
     return err 

    f = Future() 
    q = Queue() 
    p1 = MyProcess(target=foo, args=(x,q,)) 

    wait_thread = threading.Thread(target=wait, args=(q,f,)) 
    wait_thread.start() 

    defr = future_to_deferred(f) 
    defr.addCallback(cb) 
    defr.addErrback(eb) 
    p1.start() 
    print('returning deferred') 
    return defr