2017-12-18 1 views
1

Ich spielte um Multithreading zu verstehen, so schrieb ich die folgende Client/Server-Anwendung, wo der Server einen Befehl an den Client sendet, überprüft der Client diesen Befehl, wenn es gleich ist zu "a" sendet es eine Antwort an den Server.Python Multithread-Server kann eine Client-Nachricht auf einmal behandeln

Im Servercode habe ich zwei Sockets und einen Thread erstellt; Der erste Socket sendet (veröffentlicht) den Befehl an alle verbundenen (abonnierten) Clients. Im Thread wartet der zweite Socket auf eine Antwort von den Clients, aber da der Thread einige blockierende Operationen ausführt (z. B. Speichern der vom Client gesendeten Informationen in einer Datenbank), kann er einen Client gleichzeitig behandeln, obwohl der Socket (req-rep socket) kann mehrere Nachrichten gleichzeitig empfangen.

server.py

import zmq 
import logging 
import threading 
import time 

logging.basicConfig(level=logging.DEBUG) 


class Server(object): 
    def __init__(self): 
     self.context = zmq.Context() 
     self.pub_port = 7777 
     self.rep_port = 7778 

     self.pub_socket = None 
     self.rep_socket = None 
     self.interface = "*" 

    def bind_ports(self): 
     logging.debug("[bind_ports] binding the ports....") 
     self.pub_socket = self.context.socket(zmq.PUB) 
     pub_bind_str = "tcp://{}:{}".format(self.interface, self.pub_port) 
     self.pub_socket.bind(pub_bind_str) 

     self.rep_socket = self.context.socket(zmq.REP) 
     rep_bind_str = "tcp://{}:{}".format(self.interface, self.rep_port) 
     self.rep_socket.bind(rep_bind_str) 

    def received_info(self): 
     while True: 
      # logging.debug("[received_flow] ") 
      cl_data = self.rep_socket.recv_json() 
      logging.info("[received_data] data <{}>".format(flow)) 
      self.rep_socket.send(b"\x00") 
      self.blocking_op(cl_data) 

    def blocking_op(self, data): 
     time.sleep(1) # simulating some blocking operations e.g. storing info in a database 

    def push_instruction(self, cmd): 
     logging.debug("[push_inst] Sending the instruction <%s> to the clients...", 
     # logging.debug("[push_inst] Sending the instruction <%s> to the agents ...", 
     cmd) 
     instruction = {"cmd": cmd} 
     self.pub_socket.send_json(instruction) 

    def create_thread(self): 
     thread = threading.Thread(target=self.received_info) 
     thread.daemon = True 
     thread.start() 
     logging.debug("[create_thread] Thread created <{}>".format(
                 thread.is_alive())) 

    def start_main_loop(self): 
     logging.debug("[start_main_loop] Loop started....") 
     self.bind_ports() 
     self.create_thread() 

     while True: 
      cmd = input("Enter your command: ") 
      self.push_instruction(cmd) 

if __name__ == "__main__": 
    Server().start_main_loop() 

client.py

import zmq 
import logging 
import random 
import time 

logging.basicConfig(level=logging.DEBUG) 

class Client(object): 
    def __init__(self): 
     self.context = zmq.Context() 
     self.sub_socket = None 
     self.req_socket = None 

     self.pub_port = 7777 
     self.req_port = 7778 
     self.server_ip = 'localhost' 

     self.client_id = "" 

    def connect_to_server(self): 
     logging.debug("[conn_to_serv] Connecting to the server ....") 
     self.sub_socket = self.context.socket(zmq.SUB) 
     self.sub_socket.setsockopt_string(zmq.SUBSCRIBE, "") 
     conn_str = "tcp://{}:{}".format(self.server_ip, self.pub_port) 
     self.sub_socket.connect(conn_str) 

     self.req_socket = self.context.socket(zmq.REQ) 
     req_conn_str = "tcp://{}:{}".format(self.server_ip, self.req_port) 
     self.req_socket.connect(req_conn_str) 

    def get_instruction(self): 
     inst = self.sub_socket.recv_json() 
     logging.debug("[get_inst] Server sent inst") 
     cmd = inst["cmd"] 
     return cmd 
    def send_flow(self, x, y): 
     flow = { 
      "client_id": self.client_id, 
      "x": x, 
      "y": y 
     } 
     self.req_socket.send_json(flow) 

    def start_main_loop(self): 
     logging.debug("starting the main loop ....") 
     self.client_id = input("What is your id: ") 
     self.connect_to_server() 

     while True: 
      inst = self.get_instruction() 
      logging.info("[Main_loop] inst<{}>".format(inst)) 
      if inst == "a": 
       # time.sleep(random.uniform(.6, 1.5)) 
       self.send_flow("xxx", "yyy") 
       self.req_socket.recv() 
       logging.debug("[main_loop] server received the flow") 

if __name__ == "__main__": 
    Client().start_main_loop() 

Ich würde es begrüßen, wenn jemand kann mir den Server zu verbessern, so dass es mehrere Kunden Nachricht an die dienen kann gleiche Zeit.

+0

Wenn Ihre Antwort blockiert oder eine lange Zeit dauert, dann wäre ein guter Weg, die Antwort in Ihrem 'receive_info()' einzulesen und dann einen Thread zu starten, der die eigentliche Verarbeitung durchführt. Die Ausführung dieses Threads würde so lange dauern, wie es dauert, aber es würde Ihre Hauptschleife nicht blockieren. – Hannu

Antwort

1

Ich konnte Ihren Code nicht ausführen und testen, aber wenn Ihr Problem receive_info() blockiert ist, würden Sie das umgehen, indem Sie einen Thread starten, um die tatsächliche Antwort zu behandeln. So etwas wie diese (vielleicht Fehler, ich war nicht in der Lage zu testen, mit Ihrem Code enthalten -. Zum Beispiel keine Ahnung, was flow ist)

def handle_response(self, data): 
    logging.info("[received_data] data <{}>".format(flow)) 
    self.rep_socket.send(b"\x00") 
    self.blocking_op(data) 

def received_info(self): 
     while True: 
      # logging.debug("[received_flow] ") 
      cl_data = self.rep_socket.recv_json() 
      _t = threading.Thread(target=self.handle_response, args=(cl_data,)) 
      _t.start() 

Dies hat Ihre received_info() Schleife, wie es ist, aber anstatt das zu tun, die Verarbeitung Dort wird ein neuer Thread gestartet, um die Antwort zu verarbeiten. Es nimmt, was es braucht, um abzuschließen und dann stirbt der Thread, aber Ihr received_info() wird sofort bereit sein, auf neue Antworten zu warten.

+0

Vielen Dank Hannu, es hat funktioniert. übrigens in args = (cl_data,) warum gibt es nach cl_data ein koma? eine zusätzliche Frage: Denken Sie, wenn ich 1000 Clients behandeln will, ist es besser, Threads zu verwenden oder gevent (oder asyncio) zu verwenden? – Corey

+0

Das Komma ist da, da Sie nur ein Argument übergeben und 'args' muss ein Tupel sein. Wenn Sie mehrere Argumente übergeben, können Sie args = (a, b, c) ohne das nachgestellte Komma angeben, aber es ist die einfachste Möglichkeit, ein Tupel aus einem Element zu erstellen. – Hannu

+0

Ich bin kein Experte für asyncio, kann also die Leistung nicht kommentieren. Python ist sowieso nicht die effizienteste Parallelverarbeitungssprache wegen GIL. Probiere Threads aus und wenn es ein Problem gibt, untersuche es. Mit Threads könntest du absolut in Ordnung sein. – Hannu

Verwandte Themen