1

Das ist eine ziemlich komplizierte Frage, also werde ich mein Bestes tun, um klar in meiner Erklärung zu sein und nicht zu viele unnötige Details zu geben.Py 2.7 arch: Wie persistente HTTP/S mit mehreren Servern und sammeln nicht Daten zum Senden mehrmals?

Ich habe letztes Jahr ein Python-Skript für die Arbeit entwickelt. Es packt grundlegende Systemdaten und sendet sie an einen HTTP/S-Server, der Befehle zurücksenden kann, wenn ein Benutzer dies wünscht. Es war ein großes Experiment für das letzte Jahr, zu sehen, was funktioniert und was nicht. Testen verschiedener Bedürfnisse innerhalb des Unternehmens usw. Aber jetzt habe ich ein ziemlich solides Verständnis von dem, was wir brauchen. So beginne ich meine Reise auf Version 2.

Das Ziel dieser neuen Version ist es, die Funktionalität beizubehalten und gleichzeitig System-/CPU-Last und Bandbreite zu reduzieren. Nach dem Entwickeln dieses Python-Skripts wird der Rest der Arbeit auf dem HTTP/S-Server ausgeführt. Meine Frage bezieht sich speziell auf die Client-Seite, das Python-Skript. Ich verwende Python 2.7.x, meistens auf Debian-basierten Systemen.

Das Skript v1 greift auf Systemdaten zu, liest eine Konfigurationsdatei mit Servern, an die die Daten gesendet werden, und verwendet Threads zum Senden an die einzelnen Server. (Immer noch in diesen Threads) kann jeder Server 1 oder mehrere Befehle zurückgeben, die dann auch über ihre eigenen Threads verarbeitet werden. Das Skript wird einmal pro Minute über crontab ausgeführt. Sie können 5 oder mehr Server haben, die jeweils 10 Befehle senden, und das Skript führt immer noch alles reibungslos, effektiv und ohne viel Zeit aus, um die von den Servern ausgegebenen Befehle zu beenden.

Im v2-Skript, ich suche die folgenden erforderlichen Änderungen vorzunehmen:

  • Wird als Systemdienst ausgeführt werden. Anstatt dass der Code jede Minute von cron ausgeführt wird, wird das Skript alle paar Sekunden wiederholt.

  • Die Schleife muss Daten einmal jedes Mal durch die Schleife sammeln, sie dann zu jedem Web-Server senden (wie in der Konfigurationsdatei definiert)

  • Ich mag persistente HTTP/S-Verbindungen für Performance und Bandbreitenoptimierung .

  • Ich möchte nicht jedes Mal Daten durch die Schleife für jeden HTTP/S-Server sammeln. Ich möchte Daten nur einmal pro Iteration durch die Hauptschleife sammeln, die den Dienst steuert, und dann diese Daten an die Threads senden, die die eingerichteten persistenten HTTP/S-Verbindungen verwalten.

Hier liegt mein Problem. Wie bekomme ich persistente Verbindungen in ihren jeweiligen Threads und bekomme Daten zu diesen Threads, während ich die Daten nur einmal sammle?

Von does httplib reuse TCP connections? Ich sehe, dass persistente Verbindungen können in einer solchen Art und Weise durchgeführt werden (Danke Corey Goldberg):

con = httplib.HTTPConnection("myweb.com") 
while True: 
    con.request("GET", "/x.css", headers={"Connection":" keep-alive"}) 
    result = con.getresponse() 
    result.read() 
    print result.reason, result.getheaders() 

Datenerfassung muss innerhalb dieser Schleife passieren. Aber ich brauche das in mehreren Threads, die gleichzeitig mit verschiedenen Servern sprechen, und möchte nicht die Ressourcen verschwenden, um die Daten mehr als einmal zu holen. Ich sehe einfach nicht, wie es möglich ist, angesichts meiner relativ begrenzten Kenntnisse von Python.

Grundsätzlich, wie ich es jetzt sehe, muss es eine Schleife geben, die das HTTP/S innerhalb ihrer Threads antreibt. Dann brauche ich eine Art Schleife, um meine Daten zu sammeln und sie auf die HTTP/S-Verbindungen vorzubereiten. Aber wie bekomme ich die ersten Loops innerhalb der zweiten Loops?Es ist so, als müsste ich die persistente HTTP/S-Verbindungsschleife in der Datensammelschleife verwenden, aber ich brauche auch die Datensammelschleife in der HTTP/S-Schleife.

Ich möchte jede reine 2.7.x Pythonic Möglichkeiten, die dies erreicht werden könnte erkunden. Abhängig von externen Dienstprogrammen kann aus verschiedenen Gründen problematisch sein. Dieses Skript wird, wenn es fertig ist, auf mehr als 150 Linux-Systemen bereitgestellt werden und je weniger es schief gehen kann, desto besser.

Vielen Dank für Ihre Hilfe und Rücksichtnahme!

Antwort

1

Ich werde das hier für andere, die wie ich, suchen, um ihre Python Verständnis zu erweitern. Ich brauchte eine Weile, um herauszufinden, wie ich dieses Problem angehen sollte, aber die Lösung wurde klargestellt, nachdem ich mit einem Kollegen gesprochen hatte, der diese Art von Problem verstand.

Kurz gesagt, die Antwort, die für mich funktionierte, verwendete native Threading- und Queue-Module von Python 2.7.x.

Ich habe dies mein Hauptprogramm, das die verschiedenen Threads und Warteschlangen verwaltet, die ich eingerichtet habe. Die NetworkWorker-Klasse, die das Threading-Modul erweitert, spinnt bei der Initialisierung auch neue Queues für jede Instanz. Die Warteschlangenreferenz/der Handler wird in einer globalen Listenvariablen gespeichert. Ich durchlaufe einfach die Queue-Liste und sende Daten an jede Thread-Queue in meinem Haupt-Thread (main.py). Dann bekommt jeder Thread seine Daten und tut was er soll. Daten, die von jeder HTTP-Verbindung zurück empfangen werden, werden in eine andere Warteschlange geladen, die von einem einzelnen Befehlsausführungsthread in main.py verarbeitet wird.

Der folgende Code wurde aus dem ursprünglichen Kontext geändert/extrahiert. Ich habe es getestet und es funktioniert einwandfrei, solange Sie die Server in der Datei "self.conf" korrekt konfigurieren, die sich in main.py> mein_service>init befindet, und die Serverantwort mit gültigem JSON. Ehrlich gesagt könnte es etwas sauber machen. Um sicherzustellen, dass der Code öffentlich und zugänglich bleibt, habe ich eine Creative Commons-Lizenz hinzugefügt. Jeder, der glaubt, dass dieser Code dem eigenen Code ähnelt, kann sich mit mir in Verbindung setzen, um eine korrekte Zuordnung zu erhalten.

Mit Ausnahme von main.py sind die Namen der anderen 2 Dateien wichtig. shared_globals.py und workerThread.py Dateinamen Groß- und Kleinschreibung und müssen im selben Ordner wie main.py

Haupt ausführbar sein: main.py

#!/usr/bin/python 
# encoding=utf8 

from time import sleep, time 
import subprocess, sys, os # used to get IP, system calls, etc 
import json 

# For web support 
import httplib 
import urllib 
import zlib 
import base64 

# wokerThread Dependancy 
import shared_globals 
from workerThread import NetworkWorker 

import Queue 
import threading 

''' 
This work, Python NetworkWorker Queue/Threading, is licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License. 
Written by John Minton @ http://pythonjohn.com/ 
To view a copy of this license, visit http://creativecommons.org/licenses/by-nc-sa/4.0/. 
''' 

class my_service: 

    # * * * * 
    def __init__(self): 

     # Manually list off the servers I want to talk to 

     self.conf = {} 
     self.conf['servers'] = {} 

     self.conf['servers']['ServerName1'] = {} 
     self.conf['servers']['ServerName1']['protocol'] = "http" 
     self.conf['servers']['ServerName1']['url'] = "server.com" 
     self.conf['servers']['ServerName1']['port'] = "80" 
     self.conf['servers']['ServerName1']['path'] = "/somefile.php" 
     self.conf['servers']['ServerName1']['timeout'] = "10" # Seconds. Make sure this is long enough for your largest OR mission critical HTTP/S transactions to finish + time it takes to wait for your data to come into your persistant HTTP/S thread. Data comes in every 2 seconds, so 5-10 seconds should be fine. Anything that takes too long will cause the queue to back up too much. 

     self.conf['servers']['ServerName2'] = {} 
     self.conf['servers']['ServerName2']['protocol'] = "http" 
     self.conf['servers']['ServerName2']['url'] = "otherserver.net" 
     self.conf['servers']['ServerName2']['port'] = "80" 
     self.conf['servers']['ServerName2']['path'] = "/dataio.php" 
     self.conf['servers']['ServerName2']['timeout'] = "5" 

     # Start the Threading Manager, which will manage the various threads and their components 
     # All cross thread communication needs to be managed with Queues 
     self.threadManager() 


    def threadManager(self): 

     # A place to reference all threads 
     self.threads = [] 

     print "Loading Shared Globals" 
     # This is the 3rd file in this project. I would not need this if 
     # the NetworkWorker Thread was inside of this same file. But since it 
     # is in another file, we use this shared_globals file to make the Queue's 
     # list and other shared resources available between the main thread and the NetworkWorker Threads 
     shared_globals.init() 

     # Keep track of all the threads/classes we are initializing 
     self.workers = {} # Keep track of all the worker threads 

     print "Initalizing Network Worker Threads from Config" 
     # For each server we want to talk to, we start a worker thread 
     # Read servers from self.conf and init threads/workers 
     for t in self.conf['servers']: # Loop through servers in config 
      # T = server name 
      #print "T: ", self.conf['servers'][t] 
      self.workers[t] = NetworkWorker()  # Save worker handlers to workers dict 

      # Set the server data for each NetworkWorker Thread 
      self.workers[t].set_server(self.conf['servers'][t]['url'], self.conf['servers'][t]['port'], self.conf['servers'][t]['timeout'], self.conf['servers'][t]['path']) 

     print "Initalizing Command Processing Queue" 
     cmd_q = Queue.Queue() 
     cmd_q.daemon = True 
     shared_globals.cmd_active_queue = cmd_q 

     print "Starting Command Processing thread" 
     # Start the data gathering thread 
     t_cmd = threading.Thread(target=self.command_que_thread_manager) 
     t_cmd.daemon = True 
     self.threads.append(t_cmd) 
     t_cmd.start() 

     print "Start Data Gathering thread" 
     # Start the data gathering thread 
     t = threading.Thread(target=self.data_collector_thread) 
     t.daemon = True 
     self.threads.append(t) 
     t.start() 

     print "Starting Worker threads" 
     for w in self.workers:  # Loop through all worker handlers 
      self.workers[w].start() # Start the jobs 

     # We have our NetworkWorker Threads running, and they init their own queues which we 
     # send data to using the def below titled self.send_data_to_networkWorkers 

     print "Service Started\n\n\n" 

     # This keeps the main thread listening so you can perform actions like killing the application with CTRL+C 
     while threading.active_count() > 0: 
      try: 
       sleep(0.1) 
      except (KeyboardInterrupt, SystemExit): # Exits the main thread without complainnt! 
       print "\n" 
       os._exit(0) 
     os._exit(0) 

    def data_collector_thread(self): 
     ''' 
     Gather all the data we want to send to each server 
     Send data to the queues for each NetworkWorker thread we init'd above 
     ''' 
     # Loop indefinately 
     while True: 

      # Gather your data and load into data Dict 
      data = {"data":"values"} 
      print "\n\nData to be sent to all NetworkWorker threads: ", data, "\n\n" 

      # Prep the data for HTTP/S 
      # If you need to do something else with the data besides sending it to the threads, do it here 
      data = self.prep_data_for_HTTP(data) # Do any pre-HTTP/S processing here 
      self.send_data_to_networkWorkers(data) # Send the data out to all the Threads Queue's 
      sleep(2) # wait for a little bit and then iterate through the loop again. This is your main loop timer. 

    def prep_data_for_HTTP(self, data): 
     ''' 
     I am converting my data from a python dict to a JSON Starting 
     I compress the JSON Starting 
     I load the compressed string into another dict, as the HTTP/S object (in the NetworkWorker thread) expects a DICT 
     URL encode the data for HTTP/S POST transit 
     Return the manipulated data object, now ready for HTTP/S 
     ''' 
     data = json.dumps(data, encoding='utf8') # Now continue preparing for HTTP/S 
     data = zlib.compress(data, 8) 
     # In PHP, get the data from the $_POST['data'] key 
     data = {"data":data} 
     data = urllib.urlencode(data) 
     return data 
    # END DEF 

    def command_que_thread_manager(self): 
     ''' 
     Run as a thread 
     Send data to this thread via it's queue, init'd above in thread Manager 
     Grabs data, and then does something to process it 
     ''' 
     while True: 
      data = shared_globals.cmd_active_queue.get() 
      print "Processing Command: ", data 
    # END DEF 

    def send_data_to_networkWorkers(self,data): 
     ''' 
     Send data to all the NetworkWorker threads 
     ''' 
     for q in shared_globals.network_active_queues: 
      q.put(data) 

    def clean_exit(self): 
     ''' 
     Run when exiting the program for a clean exit 
     I don't think I actually call this in my example, 
     but upon main thread exit it would be a good idea to do so 
     ''' 
     for w in self.workers:  # Loop through all worker handlers 
      self.workers[w].stop() # Stop the jobs 

    # END DEF 

# END CLASS 

if __name__ == "__main__": 
    my_service = my_service() 

Geteilt Globals-Datei: shared_globals.py

#!/usr/bin/python 
# encoding=utf8 

''' 
This work, Python NetworkWorker Queue/Threading, is licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License. 
Written by John Minton @ http://pythonjohn.com/ 
To view a copy of this license, visit http://creativecommons.org/licenses/by-nc-sa/4.0/. 
''' 

def init(): 

    global network_active_queues 
    global cmd_active_queues 
    global cmd_q 

    # Keep track of the data going to the Network Worker Threads 
    print "Initalizing Network Active Queues" 
    network_active_queues = [] 

    # Keep track of the commands 
    print "Initalizing Command Active Queues" 
    cmd_active_queue = "" 

    # ? 
    #cmd_q = [] 

NetworkWorker Klasse: workerThread.py

#!/usr/bin/python 
# encoding=utf8 
''' 
This work, Python NetworkWorker Queue/Threading, is licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License. 
Written by John Minton @ http://pythonjohn.com/ 
To view a copy of this license, visit http://creativecommons.org/licenses/by-nc-sa/4.0/. 
''' 
import Queue 
import threading 

import httplib 
import urllib 
import json 

# wokerThread Dependancy 
# Add another queue list for HTTP/S Responses 
import shared_globals 

class NetworkWorker(threading.Thread): 

    def __init__(self): 
     ''' 
     Extend the Threading module 
     Start a new Queue for this instance of this class 
     Run the thread as a daemon 
     shared_globals is an external file for my globals between main script and this class. 
     Append this Queue to the list of Queue's in shared_globals.network_active_queues 
     Loop through shared_globals.network_active_queues to send data to all Queues that were started with this class 
     ''' 
     threading.Thread.__init__(self) 
     self.q = Queue.Queue() 
     self.q.daemon = True 
     shared_globals.network_active_queues.append(self.q) 
     # Init the queue for processing commands 

    def run(self): 
     ''' 
     Establish a persistant HTTP Connection 
     Pull data from the Queue 
     When data comes in, send it to the server 
     I send the response from the HTTP server to another queue/thread 
     You can do what you want to do with responses from the HTTP Server 
     ''' 
     # Set your headers 
     headers = {"Content-type": "application/x-www-form-urlencoded", "Accept": "text/plain", "Connection": "keep-alive"} # "Connection": "keep-alive" for persistance 
     # Init the presistant HTTP connection 
     http_request = httplib.HTTPConnection(self.url, int(self.port), timeout=int(self.timeout)) 
     # Init response_data 
     response_data = str() 
     # Start the loop 
     while True: 
      # The code waits here for the queue to have data. If no data, it just sleeps until you send it data via it's Queue. 
      data = self.q.get() 
      # .... When it gets data, we proceed with the data variable. 
      try: 
       http_request.request("POST", self.path, data, headers) 
       response = http_request.getresponse() 
       response_data = response.read() 
       # This is the response from the HTTP/S Server 
       print "Response: ", response_data 
      except Exception, e: 
       # In the event something goes wrong, we can simply try to reestablish the HTTP 
       print e, "Re-establishing HTTP/S Connection" 
       http_request = httplib.HTTPConnection(self.url, int(self.port), timeout=int(self.timeout)) 

      # If the HTTP transaction was successful, we will have our HTTP response data in response_data variable 
      if response_data: 
       # Try Except will fail on bad JSON object   
       try: 
        # Validate JSON & Convert from JSON to native Python Dict 
        json_data = json.loads(response_data) 

        # Send response from server to the command thread manager 
        shared_globals.cmd_active_queue.put(json_data) 

       except ValueError, e: 
        print "Bad Server Response: Discarding Invalid JSON" 
        # Repackage the invalid JSON, or some identifier thereof, and send to command processing thread 
        # Load into THIS NetworkWorker's thread queue a new data object to tell the server that there was malformed JSON and to resend the data. 
        #http_request.request("POST", self.path, data, headers) 
        #response = http_request.getresponse() 
        #response_data = response.read() 


     # Place this here for good measure, if we ever exit the while loop we will close the HTTP/S connection 
     http_request.close() 

    # END DEF 


    def set_server(self, url, port, timeout, path): 
     ''' 
     Use this to set the server for this class/thread instance 
     Variables that are passed in are translated to class instance variables (self) 
     ''' 
     self.url = url 
     self.port = port 
     self.timeout = timeout 
     self.path = path 
    # END DEF 


    def stop(self): 
     ''' 
     Stop this queue 
     Stop this thread 
     Clean up anything else as needed - tell other threads/queues to shutdown 
     ''' 
     shared_globals.network_active_queues.remove(self.q) 
     #self.q.put("shutdown") # Do we need to tell the threads to shutdown? Perhaps if reloading the config 
     self.join() 

    # END DEF 

# END CLASS 
Verwandte Themen