2017-02-14 3 views
0

Ich entwickle eine Anwendung für Messaging mit Rabbit MQ. Ich verwende EventingBasicConsumer, Exchange und QueueBind. Zum Testen starte ich meine Anwendung zum Empfangen von Nachrichten und schalte dann meinen Ethernet-Controller aus. Nachdem ich es eingeschaltet habe, fangen die Nachrichten wieder an zu beginnen, aber ungefähr 500 Nachrichten waren verloren. Mein Code:RabbitMQ EventingBasicConsumer Nachrichten verlieren

private void DoWork() 
    { 
     try 
     { 
      var connection = ConnectionFactory.CreateConnection(); 
      IModel model = connection.CreateModel(); 

      // Configure the Quality of service for the model. Below is how what each setting means. 
      // BasicQos(0="Dont send me a new message untill I’ve finshed", 1= "Send me one message at a time", false ="Apply to this Model only") 
      model.BasicQos(0, 1, false); 

      model.ExchangeDeclare(Options.RabbitConnectionOptions.Exchange, RabbitConstants.ExchangeType, RabbitConstants.ExchangeDurable, RabbitConstants.ExchangeAutoDelete); 
      var queueDeclareOk = model.QueueDeclare("SomeSubsruberQueue3", RabbitConstants.QueueDurable, RabbitConstants.QueueExclusive, RabbitConstants.ExchangeAutoDelete); 


      var queueName = queueDeclareOk.QueueName; 

      foreach (var optionsBindingKey in Options.BindingKeys) 
      { 
       Logger.Debug($"QueueBind for {nameof(optionsBindingKey)}={optionsBindingKey}"); 
       model.QueueBind(queueName, Options.RabbitConnectionOptions.Exchange, optionsBindingKey); 
      } 


      var consumer = new EventingBasicConsumer(model); 

      consumer.Received += (ch, ea) => 
      { 
       try 
       { 
        var body = ea.Body; 
        var message = Encoding.UTF8.GetString(body); 
        var routingKey = ea.RoutingKey; 
        var messageToLog = $" [x] Received '{routingKey}':'{message}'";       

        Logger.Info(messageToLog); 
        Console.WriteLine($"receavedCount={++receavedCount}"); 

       } 
       catch (Exception e) 
       { 
        Console.WriteLine(e); 
        throw; 
       } 
      }; 

      model.BasicConsume(queueName, RabbitConstants.QueueAutoAck, consumer); 
     } 
     catch (Exception e) 
     { 
      Logger.Error(e); 
      Console.WriteLine(e); 
      throw; 
     } 
    } 

Wie kann ich Nachrichten verhindern zu verlieren?

P.s. hier das Protokoll, zeigt verlorene Nachrichten an. "Id" ist sequentiell, so u sehen können, dass Massagen mit id'S von 926 bis 1299

2017-02-14 10:52:01.4916 <Root><Subscribers><Subscriber Id="922" Name="1c" /> 
2017-02-14 10:52:01.4916 <Root><Subscribers><Subscriber Id="923" Name="1c" /> 
2017-02-14 10:52:01.4916 <Root><Subscribers><Subscriber Id="924" Name="1c" /> 
2017-02-14 10:52:01.5056 <Root><Subscribers><Subscriber Id="925" Name="1c" /> 
2017-02-14 10:52:22.5606 <Root><Subscribers><Subscriber Id="1300" Name="1c" /> 
2017-02-14 10:52:22.5606 <Root><Subscribers><Subscriber Id="1301" Name="1c" /> 
2017-02-14 10:52:22.5676 <Root><Subscribers><Subscriber Id="1302" Name="1c" /> 
2017-02-14 10:52:22.6046 <Root><Subscribers><Subscriber Id="1303" Name="1c" /> 

UPD verloren wurde:

wenn ich ändern:

model.BasicConsume(queueName,true, consumer); 

zu

model.BasicConsume(queueName,false, consumer); 

und verwenden Sie explizite ASK

((EventingBasicConsumer)ch).Model.BasicAck(ea.DeliveryTag, false); 

ich habe sehr seltsames Verhalten:

2017-02-14 13:06:35.9835| DeliveryTag=23, <Subscriber Id="778" Name="1c" /> 
2017-02-14 13:06:36.0295| DeliveryTag=24, <Subscriber Id="779" Name="1c" /> 
2017-02-14 13:06:57.3285| DeliveryTag=26, <Subscriber Id="782" Name="1c" /> 
2017-02-14 13:06:57.3755| DeliveryTag=27, <Subscriber Id="783" Name="1c" /> 

Es gibt keine DeliveryTag = 25!

+0

lose = seine Baggy, verlieren = verloren ..... – BugFinder

Antwort

2

Verwenden Sie nicht Auto-ACK. Lesen Sie eine Nachricht, tun Sie, was Sie damit machen wollen, und bestätigen Sie dann explizit mit ACK, wenn Sie damit fertig sind.

0

Ок dank Emil Vikström, muss ich ausdrücklich verwenden ASK:

model.BasicConsume(queueName,false, consumer); 

und nach der Verarbeitung Nachricht fragen:

((EventingBasicConsumer)ch).Model.BasicAck(ea.DeliveryTag, false); 

Frage über verlorene Nachricht: RabbitMQ Nachricht Neuordnen, wenn es um Rückkehr Warteschlange. So ist es nicht los, es wird nur später erhalten

Verwandte Themen