2017-03-15 4 views
1

Ich startet gleichzeitige Threads ein paar Sachen zu tun:Wie implementiert man eine dynamische Menge gleichzeitiger Threads?

concurrent = 10 
q = Queue(concurrent * 2) 
for j in range(concurrent): 
    t = threading.Thread(target=doWork) 
    t.daemon = True 
    t.start() 
try: 
    # process each line and assign it to an available thread 
    for line in call_file: 
     q.put(line) 
    q.join() 
except KeyboardInterrupt: 
    sys.exit(1) 

Zur gleichen Zeit habe ich eine deutliche Thread Zählzeit haben:

def printit(): 
    threading.Timer(1.0, printit).start() 
    print current_status 

printit() 

Ich möchte (oder Abnahme) der Menge an gleichzeitig erhöhen Threads für den Hauptprozess sagen wir jede Minute. Ich kann im Zeitthread einen Zeitzähler erstellen und Dinge jede Minute ausführen lassen, aber wie kann ich die Anzahl gleichzeitiger Threads im Hauptprozess ändern?

Ist es möglich (und wenn ja wie), das zu tun?

Antwort

1

Das ist mein Arbeiter:

def UpdateProcesses(start,processnumber,CachesThatRequireCalculating,CachesThatAreBeingCalculated,CacheDict,CacheLock,IdleLock,FileDictionary,MetaDataDict,CacheIndexDict): 
NewPool() 
while start[processnumber]: 
    IdleLock.wait() 
    while len(CachesThatRequireCalculating)>0 and start[processnumber] == True: 
     CacheLock.acquire() 
     try: 
      cacheCode = CachesThatRequireCalculating[0] # The list can be empty if an other process takes the last item during the CacheLock 
      CachesThatRequireCalculating.remove(cacheCode) 
      print cacheCode,"starts processing by",processnumber,"process" 
     except: 
      CacheLock.release() 
     else: 
      CacheLock.release() 
      CachesThatAreBeingCalculated.append(cacheCode[:3]) 
      Array,b,f = TIPP.LoadArray(FileDictionary[cacheCode[:2]])#opens the dask array 
      Array = ((Array[:,:,CacheIndexDict[cacheCode[:2]][cacheCode[2]]:CacheIndexDict[cacheCode[:2]][cacheCode[2]+1]].compute()/2.**(MetaDataDict[cacheCode[:2]]["Bit Depth"])*255.).astype(np.uint16)).transpose([1,0,2]) #slices and calculates the array 
      f.close() #close the file 
      if CachesThatAreBeingCalculated.count(cacheCode[:3]) != 0: #if not, this cache is not needed annymore (the cacheCode is removed bij wavelengthchange) 
       CachesThatAreBeingCalculated.remove(cacheCode[:3]) 
       try: #If the first time the object if not aivalable try a second time 
        CacheDict[cacheCode[:3]] = Array 
       except: 
        CacheDict[cacheCode[:3]] = Array 
       print cacheCode,"done processing by",processnumber,"process" 
    if start[processnumber]: 
     IdleLock.clear() 

Dies ist, wie ich sie starten:

self.ProcessLst = [] #list with all the processes who calculate the caches 
    for processnumber in range(min(NumberOfMaxProcess,self.processes)): 
     self.ProcessTerminateLst.append(True) 
    for processnumber in range(min(NumberOfMaxProcess,self.processes)): 
     self.ProcessLst.append(process.Process(target=Proc.UpdateProcesses,args=(self.ProcessTerminateLst,processnumber,self.CachesThatRequireCalculating,self.CachesThatAreBeingCalculated,self.CacheDict,self.CacheLock,self.IdleLock,self.FileDictionary,self.MetaDataDict,self.CacheIndexDict,))) 
     self.ProcessLst[-1].daemon = True 
     self.ProcessLst[-1].start() 

ich sie so schließen:

for i in range(len(self.ProcessLst)): #For both while loops in the processes self.ProcessTerminateLst[i] must be True. So or the process is now ready to be terminad or is still in idle mode. 
     self.ProcessTerminateLst[i] = False 

    self.IdleLock.set() #Makes sure no process is in Idle and all are ready to be terminated 
1

Ich würde einen Pool verwenden. Ein Pool hat eine maximale Anzahl von Threads, die er gleichzeitig verwendet, aber Sie können eine Inf-Anzahl von Jobs anwenden. Sie bleiben auf der Warteliste, bis ein Thread verfügbar ist. Ich glaube nicht, dass Sie die Anzahl der aktuellen Prozesse im Pool ändern können.

+0

ich einen Schlaf in meinen Jobs verwenden könnte und verringern oder erhöhen Sie diesen Wert, um mehr oder weniger Jobs zu haben, aber was ich will, ist die Menge der gleichzeitigen Threads zu ändern (da jeder Thread ac sein wird oncurrent connection, ich schreibe einen Ladesimulator für einen Server und ich muss das Aufwärmen des Caches simulieren), also funktionieren diese Tricks nicht in meinem Fall –

+0

Ich habe einen ähnlichen Fall, wo ich einen Manager verwende, um lst zwischen zu verwalten Prozesse und a haben while-Schleifen in meinen Threads. Also starte ich einfach eine x Anzahl von Threads und wenn ich sie schließen will, mache ich die Variable in der while-Schleife True und die Threads werden beendet. Diese Variable befindet sich in einer Managerliste, sodass Sie sie im Hauptthread ändern können. Ich kann dir meinen Code zeigen, um dir eine Idee zu geben, wie ich es gelöst habe, aber ich bin neu in diesem Forum und weiß nicht wie. –

+0

hmmm ... sieht aus wie Sie können nur verringern, nicht erhöhen ... und ich benutze globale Variablen, so kann ich Staaten zwischen Threads teilen ^^ –

Verwandte Themen