2017-11-19 4 views
1

Ich bin neu bei Nodejs, und was ich will ist, Daten aus der Datenbank zu lesen und zu berechnen. Um es schneller zu machen, verwende ich das Nodejs Cluster-Modul.NodeJS Cluster: Wie kann man Daten von Arbeitern im Master reduzieren?

gibt es Schlepptau globale Variablen: pairMap und NameSet, und ich zuteilen die Jobs Arbeiter in Master-Prozess, und sie tun einige Rechenwerke (die Karte und Satz zu ändern, genau wie die Karte-reduzieren)

scheint jedoch, dass die pairMap und nameSet nicht geändert und leer sind.(Code in der DoMasterAction) (eine andere seltsame Sache ist ich trösten die Daten, und es hat sich geändert, aber am Ende, zurück in den Master-Prozess leer).

die Daten wie folgt: (i die Hauptidee extrahieren):

const Promise = require('bluebird'); 
const cluster = require('cluster'); 
const numCPUs = require('os').cpus().length; 
const fs = Promise.promisifyAll(require('fs')) 

const utils = { 
    mergeMap:(source,dest)=>{ 
     for(let [key,value] of Object.entries(source)){ 
      if(!dest.has(key)) dest.set(key,value); 
      for(let [type,arr] of Object.entries(value)){ 
       const final = new Set([...dest.get[key][type],...arr]) 
       dest.get[key][type] = final; 
      } 
     } 
    } 
} 


/** 
* key: [email protected]||[email protected] 
* value: {to: [id1,id2,id3],cc,bcc} 
* @param row 
* @param map 
* @param nameSet 
*/ 
function countLinks(res,map,nameSet) { 
    nameSet.add(res); 
    map.set(res,{ 'test': Math.floor(Math.random()*10+1)}); 
} 


class hackingTeamPrepare { 

    constructor(bulk=100000,total = 1150000){ 
     this.bulk = bulk; 
     this.count = Math.ceil(total/this.bulk); 
     const parallelArr = new Array(this.count).fill(0).map((v,i)=> i); 
     this.jobs = parallelArr.map(v=> 'key'+v); 
     this.pairMap = new Map(); 
     this.nameSet = new Set(); 

     this.bindThis(); 
    } 

    bindThis(){ 
     this.doWorkerAction = this.doWorkerAction.bind(this); 
     this.doMasterAction = this.doMasterAction.bind(this); 
    } 

    doMasterAction() { 
     const workers = [],result = {}; 
     const self = this; 
     let count = 0,timeout; 

     for(let i=0;i<numCPUs;i++){ 
      const worker = cluster.fork(); 
      workers[i] = worker; 
     } 
     cluster.on('online', (worker) => { 
      worker.send(self.jobs.shift()); 
     }); 
     cluster.on('exit', function() { 
      if(self.jobs.length===0) return; 
      console.log('A worker process died, restarting...'); 
     }); 

     cluster.on('message',function (senderWorkder,info) { 
      const { workerId,jobIndex } = info; 
      result[jobIndex] = true; 
      console.log(`----worker ${workerId} done job: ${jobIndex}----`); 

      const finish = !self.jobs.length && Object.keys(result).length===self.count; 
      if(finish){ 
       // -----------------!!here!!--------------------------** 
       console.log('-------finished-------',self.pairMap,self.nameSet); // Map {}, Set {} 
       for(let id in cluster.workers){ 
        const curWorker = cluster.workers[id]; 
        curWorker.disconnect(); 
       } 
      }else{ 
       if(!self.jobs.length) return; 
       senderWorkder.send(self.jobs.shift()); 
      } 
     }) 
    } 



    /** 
    * {[person1,person2]: {to,cc,bcc}} 
    */ 
    doWorkerAction() { 
     //Process为worker, receive from master 
     const self = this; 
     process.on('message',(sql)=>{ 
      const jobPromise = Promise.resolve(sql).then(res => { 
        countLinks(res,self.pairMap,self.nameSet); 
        const data = { 
         workerId: process.pid, 
         jobIndex: sql, 
        } 

        // send to master 
        process.send(data); 
       }).catch(err=> { 
       console.log('-----query error----',err) 
      }); 
     }) 
    } 

    readFromPG(){ 
     if(cluster.isMaster){ 
      this.doMasterAction(); 
     }else if (cluster.isWorker){ 
      this.doWorkerAction(); 
     } 
    } 

    init(){ 
     this.readFromPG(); 
    } 
} 

const test = new hackingTeamPrepare(2,10); 
test.init(); 

jemand mir dabei helfen?

Ich habe versucht, Daten manuell in den Master-Prozess zusammenführen, aber die Daten von der worker.send gesendet scheint das Objekt darin zu ignorieren.

Antwort

0

Im Node.js-Cluster werden Objekte im Speicher nicht zwischen Master und Arbeitern geteilt.

pairMap und nameSet existieren getrennt in Master und in jedem Arbeiter. Wenn ein Arbeiter diese Objekte ändert, ändern sie sich im selben Arbeiter (Prozess), während sie im Meister und anderen Arbeitern unverändert bleiben.

Um Ihre Idee funktioniert, müssen Sie eine einzelne pairMap und einen einzigen nameSet innerhalb des Master-Prozess erhalten, Nachrichten senden und alle Daten, die Sie benötigen von Arbeitnehmern enthalten zu meistern, und aktualisieren diese Objekte die empfangenen Daten.

Beachten Sie, dass Sie kein Objekt als Nachricht vom Worker zum Master übergeben können. Wenn Sie etwas komplexe Daten benötigen, müssen Sie einfache JavaScript-Objekte (Schlüssel/Wert-Paare) senden. Zum Beispiel, wenn Sie eine Map Instanz von Arbeitnehmer-Master senden müssen, finden Sie die folgenden Funktionen genommen von here:

// source - http://2ality.com/2015/08/es6-map-json.html 
function mapToJson(map) { 
    return JSON.stringify([...map]); 
} 
function jsonToMap(jsonStr) { 
    return new Map(JSON.parse(jsonStr)); 
} 

// send message using this example: 
process.send(mapToJson(pairMap)); 

// receive message: 
worker.on('message', message => console.log(jsonToMap(message))) 
+0

Ja, ich zu dieser Lösung einige Zeit später geändert, und ich fand, dass, wenn ich Daten senden von Worker zu meistern, sollten die Daten zusammen mit der Karte oder Set-Typ nicht enthalten (sie alle in leer geändert {}) .. verstehe nicht warum. – luchen

+0

und es scheint, dass ich den Cluster in Webstorm nicht debuggen kann? – luchen

+0

Siehe meine aktualisierte Antwort –

Verwandte Themen