Ich versuche, pathos.multiprocessing.Pool in meinem Projekt zu verwenden. Allerdings wird das folgende Problem beim Beenden des Pools auftreten. Ich benutze CentOS 6.5, ich bin mir nicht sicher, ob es durch pathos.multiprocessing.Pool oder andere Sache verursacht wird, kann mir jemand dabei helfen?Irgendwann pathos.multiprocessing.Pool kann nicht korrekt beendet werden
Traceback (most recent call last):
File "/usr/local/lib/python2.7/threading.py", line 801, in __bootstrap_inner
self.run()
File "/usr/local/lib/python2.7/threading.py", line 1073, in run
self.function(*self.args, **self.kwargs)
File "receiver.py", line 132, in kill_clients
pool.terminate()
File "/usr/local/lib/python2.7/site-packages/multiprocess/pool.py", line 465, in terminate
self._terminate()
File "/usr/local/lib/python2.7/site-packages/multiprocess/util.py", line 207, in __call__
res = self._callback(*self._args, **self._kwargs)
File "/usr/local/lib/python2.7/site-packages/multiprocess/pool.py", line 513, in _terminate_pool
p.terminate()
File "/usr/local/lib/python2.7/site-packages/multiprocess/process.py", line 137, in terminate
self._popen.terminate()
File "/usr/local/lib/python2.7/site-packages/multiprocess/forking.py", line 174, in terminate
os.kill(self.pid, signal.SIGTERM)
OSError: [Errno 3] Ein solcher Prozess
Die verdrahtete Sache ist am Anfang, dass es funktioniert gut. Aber wenn der vierte Job empfangen wird, wird es ein solches Problem geben.
class Receiver:
def __init__(self):
....
self.results={}
def kill_clients(self, client_list, pool):
for client in client_list:
client.kill()
pool.terminate()
def process_result(self, result):
if result is None:
self.results = {}
return
res = result.split(':')
if len(res) != 4:
raise Exception("result with wrong format: %s" % result)
self.results['%s_%s' % (res[0], res[1])] = {"code": res[3], "msg": res[4]}
...
def handler(self, job):
self.lg.debug("Receive job in rtmp_start_handler.")
self.lg.debug("<%s>" % str(job))
# each client corresponding one process
cli_counts = job['count']
pool = Pool(processes=cli_counts)
clients = []
try:
for i in xrange(cli_counts):
rtmp_cli = RtmpClient(job['case'], i)
clients.append(rtmp_cli)
[pool.apply_async(client.run, callback=self.process_result)
for client in clients]
pool.close()
sleep(1)
self.lg.debug("All clients are started.")
t = Timer(
job['timeout'],
self.kill_clients,
args=(clients, pool)
)
t.start()
self.lg.debug("Timer is started. timeout %s s" % job['timeout'])
pool.join()
except Exception, e:
self.lg.warning("Exception occurred: %s" % e)
self.lg.warning(format_exc())
return "0"
# here the self.results shall be ready
return self.parse_results()