2017-08-26 6 views
2

Ich verwende PThreads für Multi-Threading in PHP. Ich habe es erfolgreich installiert und auf meinem XAMPP-Server unter Windows ausgeführt. Ich habe 100K Datensätze in der Datenbank, und ich möchte 20 Fäden in parallel.Every Thread ausgeführt wird 5k Datensatz aus der Datenbank aufrufen und Prozess them.Here mein Code ist dieses fürPthread für PHP Ausgabe

require('mailscript.php'); 
class My extends Thread{ 

    function __construct() { 
     $this->mailscript = new mailscript(); 
    } 

    function run(){ 
     $this->mailscript->runMailScript(5000); 
    } 
} 

for($i=0;$i<20;$i++){ 
    $pool[] = new My(); 
} 

foreach($pool as $worker){ 
    $worker->start(); 
} 
foreach($pool as $worker){ 
    $worker->join(); 
} 

Wenn ich diesen Code ausführen es nur Führen Sie maximal 600 Datensätze pro Thread aus. Gibt es ein Begrenzungsproblem für die Anzahl der Threads in PThreads. Was ist das Problem, bitte helfen Sie mir

Antwort

1

hie, hier ist, wie ich pthreads mit Ihrem Beispiel mit einer Sammlungsmethode umgehen würde, wenn notwendig. Hoffe das wird dir helfen.

/*pthreads batches */ 
$batches = array(); 

$nbpool = 20; // cpu 10 cores 

/* job 1 */ 
$list = [/* data1 */]; 
$batches[] = array_chunk($list, 5000); 

/* job 2 */ 
$list2 = [/* data2 */]; 
$batches[] = array_chunk($list2, 10000); 

/*final collected results*/ 
$resultFinal = []; 

/* loop across batches */ 
foreach ($batches as $key => $chunks) { 

    /* for intermediate collection */ 
    $data[$key] = []; 

    /* how many workers */ 
    $workCount = count($chunks); 

    /* set pool job up to max cpu capabilities */ 
    $pool = new Pool($nbpool, Worker::class); 

    /* pool cycling submit */ 
    foreach (range(1, $workCount) as $i) { 
     $chunck = $chunks[$i - 1]; 
     $pool->submit(new processWork($chunck)); 
    } 

    /* on collection cycling */ 
    $collector = function (\Collectable $work) use (&$data) { 

     /* is worker complete ? */ 
     $isGarbage = $work->isGarbage(); 

     /* worker complete */ 
     if ($isGarbage) { 
      $data[$key] = $work->result; 
     } 
     return $isGarbage; 
    }; 
    do { 
     /* collection on pool stack */ 
     $count = $pool->collect($collector); 
     $isComplete = count($data) === $workCount; 
    } while (!$isComplete); 

    /* push stack results */ 
    array_push($resultFinal, $data); 

    /* close pool */ 
    $pool->shutdown(); 
} 

class processWork extends \Threaded implements \Collectable { 

    private $isGarbage; 
    private $process; 
    public $result; 

    public function __construct($process) { 
     $this->process = $process; 
    } 

    public function run() { 
     $workerDone = array(); 
     foreach ($this->process as $k => $el) { 
      /* whatever stuff with $this->process */ 
     } 
     $this->result = $workerDone; 
     $this->isGarbage = true; // yeah, it s done 
    } 

    public function isGarbage(): bool { 
     return $this->isGarbage; 
    } 
}