Ich habe ein agentenbasiertes Modell, bei dem mehrere Agenten durch einen zentralen Prozess gestartet werden und über einen anderen zentralen Prozess kommunizieren. Jeder Agent und der Kommunikationsprozess kommunizieren über zmq. Allerdings, wenn ich anfangen, mehr als 100 Agenten standard_out sendet:zeromq und python multiprocessing, zu viele offene Dateien
Invalid argument (src/stream_engine.cpp: 143) Zu viele offene Dateien (src/ipc_listener.cpp: 292)
und Mac Os gibt einen Problembericht aus:
Python wurde unerwartet beendet, während das libzmq.5.dylib-Plug-in verwendet wurde.
Das Problem scheint mir, dass zu viele Kontexte geöffnet sind. Aber wie kann ich das mit Multiprocessing vermeiden?
Ich lege Teil des Codes unter:
class Agent(Database, Logger, Trade, Messaging, multiprocessing.Process):
def __init__(self, idn, group, _addresses, trade_logging):
multiprocessing.Process.__init__(self)
....
def run(self):
self.context = zmq.Context()
self.commands = self.context.socket(zmq.SUB)
self.commands.connect(self._addresses['command_addresse'])
self.commands.setsockopt(zmq.SUBSCRIBE, "all")
self.commands.setsockopt(zmq.SUBSCRIBE, self.name)
self.commands.setsockopt(zmq.SUBSCRIBE, group_address(self.group))
self.out = self.context.socket(zmq.PUSH)
self.out.connect(self._addresses['frontend'])
time.sleep(0.1)
self.database_connection = self.context.socket(zmq.PUSH)
self.database_connection.connect(self._addresses['database'])
time.sleep(0.1)
self.logger_connection = self.context.socket(zmq.PUSH)
self.logger_connection.connect(self._addresses['logger'])
self.messages_in = self.context.socket(zmq.DEALER)
self.messages_in.setsockopt(zmq.IDENTITY, self.name)
self.messages_in.connect(self._addresses['backend'])
self.shout = self.context.socket(zmq.SUB)
self.shout.connect(self._addresses['group_backend'])
self.shout.setsockopt(zmq.SUBSCRIBE, "all")
self.shout.setsockopt(zmq.SUBSCRIBE, self.name)
self.shout.setsockopt(zmq.SUBSCRIBE, group_address(self.group))
self.out.send_multipart(['!', '!', 'register_agent', self.name])
while True:
try:
self.commands.recv() # catches the group adress.
except KeyboardInterrupt:
print('KeyboardInterrupt: %s,self.commands.recv() to catch own adress ~1888' % (self.name))
break
command = self.commands.recv()
if command == "!":
subcommand = self.commands.recv()
if subcommand == 'die':
self.__signal_finished()
break
try:
self._methods[command]()
except KeyError:
if command not in self._methods:
raise SystemExit('The method - ' + command + ' - called in the agent_list is not declared (' + self.name)
else:
raise
except KeyboardInterrupt:
print('KeyboardInterrupt: %s, Current command: %s ~1984' % (self.name, command))
break
if command[0] != '_':
self.__reject_polled_but_not_accepted_offers()
self.__signal_finished()
#self.context.destroy()
der gesamte Code ist unter http://www.github.com/DavoudTaghawiNejad/abce
[Hier finden Sie Informationen zum Überprüfen und Ändern Ihres FD-Limits] (http://www.cyberciti.biz/faq/linux-increase-the-maximum-number-of-open-files/). – Jason