2017-11-08 1 views
0

Ich habe eine empfangene Event Handler von EventingBasicConsumer in RabbitMQ hinzugefügt. Ich versuche zu überprüfen, ob die Warteschlange verbraucht (verarbeitet und jetzt leer) ist, sollte es den Verbraucher und die Verbindung zu schließen. Ich kann die Bedingung nicht finden, die feststellen kann, ob die Warteschlange verarbeitet wird.Stop Rabbit MQ Consumer Event Wenn die Warteschlange leer ist

Bitte helfen

public void ProcessQueue(string queueName, Func<string, bool> ProcessMessage) 
    { 
     //lock (this.Model) 
     { 
      this.Model.BasicQos(0, 1, false); 
      EventingBasicConsumer consumer = new EventingBasicConsumer(this.Model); 

      consumer.Received += (model, ea) => 
      { 
       var body = ea.Body; 
       var message = Encoding.UTF8.GetString(body); 
       bool processed = ProcessMessage.Invoke(message); 
       if (processed) 
        this.SendAcknowledgement(ea.DeliveryTag); 
       else 
        this.StopProcessingQueue(consumer.ConsumerTag); 

       // Check if no message for next 2 minutes, 
       //  Stop Consumer and close connection 

      }; 

      this.Model.BasicConsume(queue: queueName, 
          autoAck: false, 
          consumer: consumer); 
     } 
    } 
+1

ist leer Wir haben eine Anwendung, die Länge der Warteschlange überwacht und automatisch dreht sich nach oben oder unten die Anzahl der Verbraucher . Vielleicht willst du so etwas? https://stackoverflow.com/questions/1038318/check-rabbitmq-queue-size-from-client –

Antwort

0

ich keine Eigenschaft finden könnte entweder so hatte einen Timer zu implementieren, die jedes Mal eine Meldung zurückgesetzt wird empfangen und falls die verstrichene Zeit mehr als die 2 Minuten können Sie ein Feuer Bereinigungsverfahren, das die Verbraucher und enge Verbindung

+0

Richtig, ich habe es jetzt auf eine andere Weise implementiert –

0

stoppt Wie ich keiner Weise Kaninchen MQ Verbraucher Ereignis zu stoppen, wenn die Warteschlange leer gefunden haben, ist, habe ich unter Methode implementiert Nachrichten zu verarbeiten, indem die Nachrichtenanzahl von API vorbei

„localhost:/api/queues“

Unten finden Sie die Funktion zur Verarbeitung von Nachrichten bis Warteschlange

/// <summary> 
/// (Recommanded) Processes the queue till the number of messages provided. 
/// Added to manage the load (process batches by batches) 
/// </summary> 
/// <param name="queueName">Name of the queue.</param> 
/// <param name="ProcessMessage">The process message.</param> 
/// <param name="count">The count.</param> 
public uint ProcessQueueByMessageCount(string queueName, Func<string, bool> HowToProcessMessage, uint messageCount) 
{ 
    uint messagesToProcess = messageCount; 
    using (var connect = this) 
    { 
     while (messageCount > 0) 
     { 
      BasicGetResult result = connect.Model.BasicGet(queueName, false); 
      bool processed = HowToProcessMessage.Invoke(System.Text.Encoding.UTF8.GetString(result.Body)); 
      if (processed) 
      { 
       this.SendAcknowledgement(result.DeliveryTag); 
       messageCount--; 
      } 
      else 
      { 
       connect.Model.BasicNack(result.DeliveryTag, false, true); 
       break; 
      } 
     } 
    } 
    return messagesToProcess - messageCount; 
} 
Verwandte Themen