2017-05-26 1 views
0

Ich führte einige Testcode wie unten, um die Leistung der Verwendung von Pool und Prozess in Linux zu überprüfen. Ich benutze Python 2.7. Der Quellcode von multiprocessing.Pool scheint zu zeigen, dass es multiprocessing.Process verwendet. Multiprocessing.Pool kostet jedoch viel Zeit und Mem als gleiche Anzahl von Multiprocessing.Process, und ich bekomme das nicht.Warum multiprocessing.Pool und multiprocessing.Process in Linux so anders ausführen

Hier ist, was ich getan habe:

  1. Erstellen Sie eine große dict und dann Teilprozesse.

  2. Übergeben Sie das Diktat für jeden Teilprozess schreibgeschützt.

  3. Jeder Unterprozess führt einige Berechnungen durch und gibt ein kleines Ergebnis zurück.

Unten ist der mein Testcode:

from multiprocessing import Pool, Process, Queue 
import time, psutil, os, gc 

gct = time.time 
costTime = lambda ET: time.strftime('%H:%M:%S', time.gmtime(int(ET))) 

def getMemConsumption(): 
    procId = os.getpid() 
    proc = psutil.Process(procId) 
    mem = proc.memory_info().rss 
    return "process ID %d.\nMemory usage: %.6f GB" % (procId, mem*1.0/1024**3) 

def f_pool(l, n, jobID): 
    try: 
     result = {} 
     # example of subprocess work 
     for i in xrange(n): 
      result[i] = l[i] 
     # work done 
     # gc.collect() 
     print getMemConsumption() 
     return 1, result, jobID 
    except: 
     return 0, {}, jobID 

def f_proc(q, l, n, jobID): 
    try: 
     result = {} 
     # example of subprocess work 
     for i in xrange(n): 
      result[i] = l[i] 
     # work done 
     print getMemConsumption() 
     q.put([1, result, jobID]) 
    except: 
     q.put([0, {}, jobID]) 

def initialSubProc(targetFunc, procArgs, jobID): 
    outQueue = Queue() 
    args = [outQueue] 
    args.extend(procArgs) 
    args.append(jobID) 
    p = Process(target = targetFunc, args = tuple(args)) 
    p.start() 
    return p, outQueue 


def track_add_Proc(procList, outQueueList, maxProcN, jobCount, 
        maxJobs, targetFunc, procArgs, joinFlag, all_result): 
    if len(procList) < maxProcN: 
     p, q = initialSubProc(targetFunc, procArgs, jobCount) 
     outQueueList.append(q) 
     procList.append(p) 
     jobCount += 1 
     joinFlag.append(0) 
    else: 
     for i in xrange(len(procList)): 
      if not procList[i].is_alive() and joinFlag[i] == 0: 
       procList[i].join() 
       all_results.append(outQueueList[i].get()) 
       joinFlag[i] = 1 # in case of duplicating result of joined subprocess 
       if jobCount < maxJobs: 
        p, q = initialSubProc(targetFunc, procArgs, jobCount) 
        procList[i] = p 
        outQueueList[i] = q 
        jobCount += 1 
        joinFlag[i] = 0 
    return jobCount 

if __name__ == '__main__': 
    st = gct() 
    d = {i:i**2 for i in xrange(10000000)} 
    print "MainProcess create data dict\n%s" % getMemConsumption() 
    print 'Time to create dict: %s\n\n' % costTime(gct()-st) 

    nproc = 2 
    jobs = 8 
    subProcReturnDictLen = 1000 
    procArgs = [d, subProcReturnDictLen] 

    print "Use multiprocessing.Pool, max subprocess = %d, jobs = %d\n" % (nproc, jobs) 
    st = gct() 
    pool = Pool(processes = nproc) 
    for i in xrange(jobs): 
     procArgs.append(i) 
     sp = pool.apply_async(f_pool, tuple(procArgs)) 
     procArgs.pop(2) 
     res = sp.get() 
     if res[0] == 1: 
      # do something with the result 
      pass 
     else: 
      # do something with subprocess exception handle 
      pass 
    pool.close() 
    pool.join() 
    print "Total time used to finish all jobs: %s" % costTime(gct()-st) 
    print "Main Process\n", getMemConsumption(), '\n' 

    print "Use multiprocessing.Process, max subprocess = %d, jobs = %d\n" % (nproc, jobs) 
    st = gct() 
    procList = [] 
    outQueueList = [] 
    all_results = [] 
    jobCount = 0 
    joinFlag = [] 
    while (jobCount < jobs): 
     jobCount = track_add_Proc(procList, outQueueList, nproc, jobCount, 
            jobs, f_proc, procArgs, joinFlag, all_results) 
    for i in xrange(nproc): 
     if joinFlag[i] == 0: 
      procList[i].join() 
      all_results.append(outQueueList[i].get()) 
      joinFlag[i] = 1 
    for i in xrange(jobs): 
     res = all_results[i] 
     if res[0] == 1: 
      # do something with the result 
      pass 
     else: 
      # do something with subprocess exception handle 
      pass 
    print "Total time used to finish all jobs: %s" % costTime(gct()-st) 
    print "Main Process\n", getMemConsumption() 

Hier ist das Ergebnis:

MainProcess create data dict 
process ID 21256. 
Memory usage: 0.841743 GB 
Time to create dict: 00:00:02 


Use multiprocessing.Pool, max subprocess = 2, jobs = 8 

process ID 21266. 
Memory usage: 1.673084 GB 
process ID 21267. 
Memory usage: 1.673088 GB 
process ID 21266. 
Memory usage: 2.131172 GB 
process ID 21267. 
Memory usage: 2.131172 GB 
process ID 21266. 
Memory usage: 2.176079 GB 
process ID 21267. 
Memory usage: 2.176083 GB 
process ID 21266. 
Memory usage: 2.176079 GB 
process ID 21267. 
Memory usage: 2.176083 GB 

Total time used to finish all jobs: 00:00:49 
Main Process 
process ID 21256. 
Memory usage: 0.843079 GB 


Use multiprocessing.Process, max subprocess = 2, jobs = 8 

process ID 23405. 
Memory usage: 0.840614 GB 
process ID 23408. 
Memory usage: 0.840618 GB 
process ID 23410. 
Memory usage: 0.840706 GB 
process ID 23412. 
Memory usage: 0.840805 GB 
process ID 23415. 
Memory usage: 0.840900 GB 
process ID 23417. 
Memory usage: 0.840973 GB 
process ID 23419. 
Memory usage: 0.841061 GB 
process ID 23421. 
Memory usage: 0.841152 GB 

Total time used to finish all jobs: 00:00:00 
Main Process 
process ID 21256. 
Memory usage: 0.843781 GB 

Ich weiß nicht, warum Subprozesse von multiprocessing.Pool über 1.6GB müssen in der Anfang, aber Subprozess von Multiprocessing.Process benötigt nur 0,84 GBs, was den Speicherkosten des Hauptprozesses entspricht. Es scheint mir, dass nur multiprocessing.Process den "copy-on-write" Vorteil von Linux genießt, da die Zeit für alle Jobs weniger als 1s beträgt. Ich weiß nicht warum Multiprocessing.Pool das nicht gefällt. Aus dem Quellcode scheint multiprocessing.Pool wie ein Wrapper von multiprocessing.Process zu sein.

Antwort

0

Frage: Ich weiß nicht, warum Subprozesse von multiprocessing.Pool über 1.6GB müssen am Anfang,
... scheint Pool wie ein Wrapper von multiprocessing.Process

Diese ist, alsSpeicher für die Ergebnisse für alle Jobs.
Zweitens verwendet PoolzweiSimpleQueue() und dreiThreads.
Dritte, Duplikat alle bestanden argv Daten vor dem Passieren bis zu einem process.

Ihr process Beispiel Verwendung nur einQueue() für alle, vorbei argv wie sie sind.

Pool ist weit weg, nur ein Wrapper zu sein.

+0

Vielen Dank. Es macht jetzt Sinn, zwei Warteschlangen zu verwenden, um sowohl das Ergebnis als auch die Eingabe umzukehren. Nur eine kleine Korrektur, 'Pool' verwendet' multiprocessing.Queue' anstelle von 'multiprocessing.Manager(). Queue'. – Finix

Verwandte Themen