I am posting this for others who, like me, are looking to deepen their Python understanding. It took me a while to figure out how to approach this problem, but the solution became clear after talking with a colleague who understood this kind of problem.
In short, the answer that worked for me uses the native threading and Queue modules of Python 2.7.x.
I have my main program, which manages the various threads and queues I set up. The NetworkWorker class, which extends the threading module, also spins up a new Queue for each instance on initialization. The queue reference/handler is stored in a global list variable. I simply loop through the queue list and send data to each thread's queue from my main thread (main.py). Each thread then gets its data and does what it is supposed to do. Data received back from each HTTP connection is loaded into another queue, which is processed by a single command-execution thread in main.py.
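Before the full listings, the fan-out pattern described above can be sketched in a stripped-down form. The names here are illustrative, not the original classes, and it is written with Python 3's queue module (called Queue in 2.7): each worker thread owns its own inbox queue, the main thread puts the same payload into every inbox, and results funnel back through one shared result queue.

```python
import threading
import queue  # named "Queue" in Python 2.7

# One shared queue that all workers report back into
result_q = queue.Queue()

class Worker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.q = queue.Queue()  # per-instance inbox, like NetworkWorker's self.q

    def run(self):
        while True:
            data = self.q.get()           # blocks until the main thread sends data
            result_q.put(("done", data))  # stand-in for the HTTP round trip

workers = [Worker() for _ in range(3)]
for w in workers:
    w.start()
for w in workers:           # fan the same payload out to every worker's inbox
    w.q.put({"data": "values"})
results = [result_q.get() for _ in range(3)]  # collect one result per worker
```

The main thread only ever touches the queues, never the workers' internals, which is what keeps the cross-thread communication safe.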
The code below was modified/extracted from its original context. I have tested it and it works flawlessly, as long as you configure the servers correctly in self.conf (found in main.py > my_service > __init__) and the servers respond with valid JSON. Honestly, it could use some cleanup. To make sure the code stays public and accessible, I added a Creative Commons license. Anyone who believes this code resembles their own can contact me for proper attribution.
Unlike main.py, the names of the other 2 files are important: the shared_globals.py and workerThread.py filenames are case-sensitive and must be in the same folder as main.py.
Main executable: main.py
#!/usr/bin/python
# encoding=utf8
from time import sleep, time
import subprocess, sys, os # used to get IP, system calls, etc
import json
# For web support
import httplib
import urllib
import zlib
import base64
# workerThread dependency
import shared_globals
from workerThread import NetworkWorker
import Queue
import threading
'''
This work, Python NetworkWorker Queue/Threading, is licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.
Written by John Minton @ http://pythonjohn.com/
To view a copy of this license, visit http://creativecommons.org/licenses/by-nc-sa/4.0/.
'''
class my_service:

    # * * * *
    def __init__(self):
        # Manually list off the servers I want to talk to
        self.conf = {}
        self.conf['servers'] = {}
        self.conf['servers']['ServerName1'] = {}
        self.conf['servers']['ServerName1']['protocol'] = "http"
        self.conf['servers']['ServerName1']['url'] = "server.com"
        self.conf['servers']['ServerName1']['port'] = "80"
        self.conf['servers']['ServerName1']['path'] = "/somefile.php"
        self.conf['servers']['ServerName1']['timeout'] = "10" # Seconds. Make sure this is long enough for your largest OR mission-critical HTTP/S transactions to finish, plus the time it takes for your data to reach your persistent HTTP/S thread. Data comes in every 2 seconds, so 5-10 seconds should be fine. Anything that takes too long will cause the queue to back up too much.
        self.conf['servers']['ServerName2'] = {}
        self.conf['servers']['ServerName2']['protocol'] = "http"
        self.conf['servers']['ServerName2']['url'] = "otherserver.net"
        self.conf['servers']['ServerName2']['port'] = "80"
        self.conf['servers']['ServerName2']['path'] = "/dataio.php"
        self.conf['servers']['ServerName2']['timeout'] = "5"
        # Start the Threading Manager, which will manage the various threads and their components
        # All cross-thread communication needs to be managed with Queues
        self.threadManager()
    def threadManager(self):
        # A place to reference all threads
        self.threads = []
        print "Loading Shared Globals"
        # This is the 3rd file in this project. I would not need this if
        # the NetworkWorker thread were inside of this same file. But since it
        # is in another file, we use this shared_globals file to make the Queue
        # list and other shared resources available between the main thread and the NetworkWorker threads
        shared_globals.init()
        # Keep track of all the threads/classes we are initializing
        self.workers = {} # Keep track of all the worker threads
        print "Initializing Network Worker Threads from Config"
        # For each server we want to talk to, we start a worker thread
        # Read servers from self.conf and init threads/workers
        for t in self.conf['servers']: # Loop through servers in config
            # t = server name
            #print "T: ", self.conf['servers'][t]
            self.workers[t] = NetworkWorker() # Save worker handlers to workers dict
            # Set the server data for each NetworkWorker thread
            self.workers[t].set_server(self.conf['servers'][t]['url'], self.conf['servers'][t]['port'], self.conf['servers'][t]['timeout'], self.conf['servers'][t]['path'])
        print "Initializing Command Processing Queue"
        cmd_q = Queue.Queue()
        shared_globals.cmd_active_queue = cmd_q
        print "Starting Command Processing thread"
        # Start the command processing thread
        t_cmd = threading.Thread(target=self.command_que_thread_manager)
        t_cmd.daemon = True
        self.threads.append(t_cmd)
        t_cmd.start()
        print "Starting Data Gathering thread"
        # Start the data gathering thread
        t = threading.Thread(target=self.data_collector_thread)
        t.daemon = True
        self.threads.append(t)
        t.start()
        print "Starting Worker threads"
        for w in self.workers: # Loop through all worker handlers
            self.workers[w].start() # Start the jobs
        # We have our NetworkWorker threads running, and they init their own queues which we
        # send data to using the def below titled self.send_data_to_networkWorkers
        print "Service Started\n\n\n"
        # This keeps the main thread listening so you can perform actions like killing the application with CTRL+C
        while threading.active_count() > 0:
            try:
                sleep(0.1)
            except (KeyboardInterrupt, SystemExit): # Exits the main thread without complaint!
                print "\n"
                os._exit(0)
        os._exit(0)
    def data_collector_thread(self):
        '''
        Gather all the data we want to send to each server
        Send data to the queues for each NetworkWorker thread we init'd above
        '''
        # Loop indefinitely
        while True:
            # Gather your data and load it into the data dict
            data = {"data":"values"}
            print "\n\nData to be sent to all NetworkWorker threads: ", data, "\n\n"
            # Prep the data for HTTP/S
            # If you need to do something else with the data besides sending it to the threads, do it here
            data = self.prep_data_for_HTTP(data) # Do any pre-HTTP/S processing here
            self.send_data_to_networkWorkers(data) # Send the data out to all the threads' Queues
            sleep(2) # Wait for a little bit and then iterate through the loop again. This is your main loop timer.
    def prep_data_for_HTTP(self, data):
        '''
        I am converting my data from a Python dict to a JSON string
        I compress the JSON string
        I load the compressed string into another dict, as the HTTP/S object (in the NetworkWorker thread) expects a dict
        URL-encode the data for HTTP/S POST transit
        Return the manipulated data object, now ready for HTTP/S
        '''
        data = json.dumps(data, encoding='utf8') # Now continue preparing for HTTP/S
        data = zlib.compress(data, 8)
        # In PHP, get the data from the $_POST['data'] key
        data = {"data":data}
        data = urllib.urlencode(data)
        return data
    # END DEF
    def command_que_thread_manager(self):
        '''
        Run as a thread
        Send data to this thread via its queue, init'd above in threadManager
        Grabs data, and then does something to process it
        '''
        while True:
            data = shared_globals.cmd_active_queue.get()
            print "Processing Command: ", data
    # END DEF
    def send_data_to_networkWorkers(self, data):
        '''
        Send data to all the NetworkWorker threads
        '''
        for q in shared_globals.network_active_queues:
            q.put(data)

    def clean_exit(self):
        '''
        Run when exiting the program for a clean exit
        I don't think I actually call this in my example,
        but upon main thread exit it would be a good idea to do so
        '''
        for w in self.workers: # Loop through all worker handlers
            self.workers[w].stop() # Stop the jobs
    # END DEF
# END CLASS
if __name__ == "__main__":
    my_service = my_service()
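As an aside, the packaging done in prep_data_for_HTTP above (dict → JSON → zlib → urlencode) can be exercised in isolation. This sketch uses Python 3 names (urllib.parse instead of urllib, with explicit str/bytes encoding); unprep is a hypothetical inverse, roughly what the receiving PHP script does with $_POST['data'] before gzuncompress/json_decode:

```python
import json
import zlib
from urllib.parse import urlencode, parse_qs  # urllib.urlencode in Python 2

def prep(payload):
    # dict -> JSON string -> zlib-compressed bytes -> form-encoded POST body
    raw = json.dumps(payload).encode("utf-8")
    return urlencode({"data": zlib.compress(raw, 8)})

def unprep(body):
    # Hypothetical inverse for testing: form-decode, decompress, parse JSON.
    # latin-1 maps bytes 1:1 to code points, so the binary compressed
    # payload survives the str round trip through parse_qs.
    compressed = parse_qs(body, encoding="latin-1")["data"][0].encode("latin-1")
    return json.loads(zlib.decompress(compressed))

payload = {"data": "values"}
body = prep(payload)     # this is what each NetworkWorker POSTs
restored = unprep(body)  # restored == payload
```

Packing the compressed blob under a single "data" form key is what lets the PHP side pick it up with one $_POST lookup.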
Shared globals file: shared_globals.py
#!/usr/bin/python
# encoding=utf8
'''
This work, Python NetworkWorker Queue/Threading, is licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.
Written by John Minton @ http://pythonjohn.com/
To view a copy of this license, visit http://creativecommons.org/licenses/by-nc-sa/4.0/.
'''
def init():
    global network_active_queues
    global cmd_active_queue
    # Keep track of the data going to the Network Worker Threads
    print "Initializing Network Active Queues"
    network_active_queues = []
    # Keep track of the commands
    print "Initializing Command Active Queue"
    cmd_active_queue = None # Replaced with a real Queue by threadManager() in main.py
NetworkWorker class: workerThread.py
#!/usr/bin/python
# encoding=utf8
'''
This work, Python NetworkWorker Queue/Threading, is licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.
Written by John Minton @ http://pythonjohn.com/
To view a copy of this license, visit http://creativecommons.org/licenses/by-nc-sa/4.0/.
'''
import Queue
import threading
import httplib
import urllib
import json
# workerThread dependency
# Add another queue list for HTTP/S Responses
import shared_globals
class NetworkWorker(threading.Thread):

    def __init__(self):
        '''
        Extend the threading module
        Start a new Queue for this instance of this class
        Run the thread as a daemon
        shared_globals is an external file for my globals between the main script and this class.
        Append this Queue to the list of Queues in shared_globals.network_active_queues
        Loop through shared_globals.network_active_queues to send data to all Queues that were started with this class
        '''
        threading.Thread.__init__(self)
        self.daemon = True # Run the thread as a daemon
        self.q = Queue.Queue()
        shared_globals.network_active_queues.append(self.q)
        # Init the queue for processing commands
    def run(self):
        '''
        Establish a persistent HTTP connection
        Pull data from the Queue
        When data comes in, send it to the server
        I send the response from the HTTP server to another queue/thread
        You can do what you want with responses from the HTTP server
        '''
        # Set your headers
        headers = {"Content-type": "application/x-www-form-urlencoded", "Accept": "text/plain", "Connection": "keep-alive"} # "Connection": "keep-alive" for persistence
        # Init the persistent HTTP connection
        http_request = httplib.HTTPConnection(self.url, int(self.port), timeout=int(self.timeout))
        # Init response_data
        response_data = str()
        # Start the loop
        while True:
            # The code waits here for the queue to have data. If there is no data, it just sleeps until you send it data via its Queue.
            data = self.q.get()
            # .... When it gets data, we proceed with the data variable.
            try:
                http_request.request("POST", self.path, data, headers)
                response = http_request.getresponse()
                response_data = response.read()
                # This is the response from the HTTP/S server
                print "Response: ", response_data
            except Exception, e:
                # In the event something goes wrong, we can simply try to re-establish the HTTP connection
                print e, "Re-establishing HTTP/S Connection"
                response_data = str() # Clear any stale response from a previous iteration
                http_request = httplib.HTTPConnection(self.url, int(self.port), timeout=int(self.timeout))
            # If the HTTP transaction was successful, we will have our HTTP response data in the response_data variable
            if response_data:
                # try/except will fail on a bad JSON object
                try:
                    # Validate JSON & convert from JSON to a native Python dict
                    json_data = json.loads(response_data)
                    # Send the response from the server to the command thread manager
                    shared_globals.cmd_active_queue.put(json_data)
                except ValueError, e:
                    print "Bad Server Response: Discarding Invalid JSON"
                    # Repackage the invalid JSON, or some identifier thereof, and send it to the command processing thread
                    # Load a new data object into THIS NetworkWorker's queue to tell the server that there was malformed JSON and to resend the data.
                    #http_request.request("POST", self.path, data, headers)
                    #response = http_request.getresponse()
                    #response_data = response.read()
        # Placed here for good measure: if we ever exit the while loop we will close the HTTP/S connection
        http_request.close()
    # END DEF
    def set_server(self, url, port, timeout, path):
        '''
        Use this to set the server for this class/thread instance
        Variables that are passed in are translated to class instance variables (self)
        '''
        self.url = url
        self.port = port
        self.timeout = timeout
        self.path = path
    # END DEF

    def stop(self):
        '''
        Stop this queue
        Stop this thread
        Clean up anything else as needed - tell other threads/queues to shut down
        '''
        shared_globals.network_active_queues.remove(self.q)
        #self.q.put("shutdown") # Do we need to tell the threads to shut down? Perhaps if reloading the config
        self.join()
    # END DEF
# END CLASS