2017-02-27 5 views
0

Ich versuche, alle Nachrichten aus der rabbitMQ-Warteschlange zu bekommen.Erhalten Sie alle Nachrichten von rabbitMQ

const messages = await rabbit.getMessages(outputQueue, false); 

Hier ist die Realisierung der getMessages-Methode. Das Problem ist, dass es nur 3-5 Nachrichten verarbeitet und "Entschlossenheit" aufruft. Einige Zeit später verarbeitet es Restnachrichten, aber die "Auflösung" wurde bereits aufgerufen und kann nicht erneut ausgeführt werden.

const amqp = require('amqplib'); 
. 
. 
let amqpUrl; 
let queueConf; 

const init = (connection, queue) => { 
    amqpUrl =`amqp://${connection.user}:${connection.password}@${connection.host}:${connectio n.port}`; 
    if (connection.vhost) { 
amqpUrl = `amqp://${connection.user}:${connection.password}@${connection.host}:${connection.port}/${connection.vhost}`; 
    } 
    queueConf = queue; 
} 

const getChannel =() => new Promise((resolve) => { 
    amqp.connect(amqpUrl).then((conn) => { 
    conn.createChannel().then((ch) => { 
     ch.prefetch(1000).then(() => resolve(ch)) 
    }) 
    }) 
}) 

module.exports = (connection, queue) => { 
    init(connection, queue); 
    return { 
    getMessages: (queueName, cleanQueue) => new Promise((resolve) => { 
     let messages = []; 
     let i = 1; 
     getChannel().then((ch) => { 
     ch.consume(queueName, (msg) => { 
      messages.push(msg); 
      console.log(msg.content.toString()) 
     }, { noAck: cleanQueue }).then(() => { 
      logger.info(`Retreived ${messages.length} messages from ${queueName}`); 
      resolve(messages) 
     }) 
     }) 
    }) 
    . 
    . 
    }; 
    }; 

Vielen Dank im Voraus!

+1

Beweis dafür, dass Versprechungen nicht für alles eine Lösung gibt - und verspricht auf jeden Fall passen nicht für diese Art der Sache –

Antwort

1

Sie können es so machen, aber es wird sehr langsam und wird möglicherweise nie auflösen, wenn Nachrichten der Warteschlange schneller hinzugefügt werden, als Sie sie verbrauchen können. Im Wesentlichen zu einer Zeit erhalte immer Sie eine Nachricht bis channel.get() mit false anstelle eines Nachrichtenobjekt löst:

getMessages: (queueName, cleanQueue) => { 
    let messages = [] 
    let i = 1 
    return getChannel().then(function getMessage (ch) { 
    return ch.get(queueName, { noAck: cleanQueue }).then((msg) => { 
     if (msg) { 
     messages.push(msg) 
     return getMessage(ch) 
     } else { 
     logger.info(`Retrieved ${messages.length} messages from ${queueName}`) 
     return messages 
     } 
    }) 
    }).catch((err) => { 
    err.consumedMessages = messages 
    return Promise.reject(err) 
    }) 
} 
Verwandte Themen