2017-02-07 5 views
1

Ich implementiere Active MQ Publisher und Abonnenten in C#. Ich verwende die Apache.NMS.ActiveMQ .net Client-Bibliothek, um mit dem Broker zu kommunizieren.Unbestätigte Nachricht auf ActiveMQ mit Failover-Setup wird nicht erneut an Abonnent

<package id="Apache.NMS" version="1.7.1" targetFramework="net461" /> 
    <package id="Apache.NMS.ActiveMQ" version="1.7.2" targetFramework="net461" /> 

ActiveMQ mit einem Failover-Setup auf 4 Servern konfiguriert ist (.225, .226, .346, .347 - letzte Teile IP als Referenz). Der Broker URL sieht etwa so

Failover: // tcp: //103.24.34.225: 61616, tcp: //103.24.34.226: 61616, tcp: //103.24.34.346: 61616, tcp: // 103.24.34.347:61616

Hier ist, wie ich die Veröffentlichung

var brokerUrl = "failover://tcp://103.24.34.225:61616,tcp://103.24.34.226:61616,tcp://103.24.34.346:61616,tcp://103.24.34.347:61616"; 
    var connectionFactory = new ConnectionFactory(brokerUrl); 
    using (var connection = connectionFactory.CreateConnection("conn1", "conn$34")) 
    { 
     connection.ClientId = "TESTPUBLISHER"; 
     connection.AcknowledgementMode = AcknowledgementMode.ClientAcknowledge; 
     connection.Start(); 

     var session = connection.CreateSession(); 
     var topic = new ActiveMQTopic("ACCOUNT.UPDATE"); 
     var producer = session.CreateProducer(topic); 


     var msg = "43342_test"; //DateTime.Now.ToString("yyyyMdHHmmss_fff") + "-TEST"; 
     var textMessage = producer.CreateTextMessage(msg); 

     textMessage.Properties.SetString("Topic", "ACCOUNT.UPDATE"); 
     textMessage.Properties.SetString("Action", "UPDATE"); 
     textMessage.Properties.SetString("DataContractType", "Account"); 

     producer.Send(textMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, new TimeSpan(0, 0, 60, 0, 0)); 

    } 

Hier ist, wie ich zu dem Thema am abonnieren möchte. Dieser Code ist so eingerichtet, dass mehrere gemeinsam genutzte Teilnehmer auf eingehende Nachrichten warten können. Mir wurde gesagt, dass ich virtuelle Themen verwenden muss, um das zu erreichen. Also habe ich meinen Abonnenten für die Verwendung von Virtual Topic konfiguriert und es wird in einem Windows Service-Projekt gehostet. Ich verwende den Bestätigungsmodus als ClientAcknowledge, so dass, wenn eine Nachricht nicht bestätigt wird, sie immer wieder kommen sollte. Das folgende Code-Snippet repräsentiert nur den wichtigen Teil des Windows-Dienstes.

var brokerUrl = "failover://tcp://103.24.34.225:61616,tcp://103.24.34.226:61616,tcp://103.24.34.346:61616,tcp://103.24.34.347:61616"; 
IConnectionFactory factory = new ConnectionFactory(new Uri(brokerUrl)); 
IConnection connection = factory.CreateConnection("conn1", "conn$34")) 

    connection.ClientId = "TESTSUBSCRIBER"; 
    connection.AcknowledgementMode = AcknowledgementMode.ClientAcknowledge; 
    connection.ConnectionInterruptedListener += OnConnectionInturrupted; 
    connection.ExceptionListener += OnConnectionExceptionListener; 
    connection.ConnectionResumedListener += OnConnectionResumedListener; 
    connection.Start(); 

    ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge); 
    var queue = new ActiveMQQueue("VT.TESTSUBSCRIBER.ACCOUNT.UPDATE"); 
    ActiveMQTopic topic = new ActiveMQTopic(); 
    IMessageConsumer consumer = session.CreateConsumer(queue); 

    consumer.Listener += OnMessage; 



private void OnMessage(IMessage message) 
{ 
    var payload = ((ITextMessage)message).Text; 
    Log.Info($"Received message for Client TESTSUBSCRIBER - [{payload}]"); 
    if(payload != "43342_test") 
    { 
     message.Acknowledge(); 
      Log.Info($"Message acknowledged for Client TESTSUBSCRIBER - [{payload}]"); 
    } 
} 

private void OnConnectionResumedListener() 
{ 
    Log.Info($"Subscriber connection resumed for Client TESTSUBSCRIBER"); 
} 

private void OnConnectionExceptionListener(Exception exception) 
{ 
    Log.Error(exception); 
} 

private void OnConnectionInturrupted() 
{ 
    Log.Error($"Subscriber connection interrupted for Client TESTSUBSCRIBER"); 
} 

Ich bin in der Lage zu veröffentlichen und Abonnenten Nachrichten. Ich stoße auf ein Problem mit einem bestimmten Fall. Nehmen wir an, der Teilnehmer hat eine Verbindung zum (.225-Broker-Server) aus dem Pool der Failover-Server hergestellt. Der Herausgeber hat eine Nachricht veröffentlicht. Der Abonnent hat es erhalten und es befindet sich mitten in der Verarbeitung. Aber aufgrund einiger Wartungsarbeiten musste der Windows-Dienst heruntergefahren werden. Als Ergebnis wurde die Verbindung des Teilnehmers zum Broker getrennt. Als der Windows-Dienst wieder verfügbar wurde, stellte der Teilnehmer diesmal eine Verbindung zu einem anderen Broker-Server (.346-Broker-Server) aus dem Failover-Pool her. Wenn das passierte, wurde die unbestätigte Nachricht nie erneut übermittelt. Aber wenn ich den Windows-Dienst neu starte und mit etwas Glück Wenn Verbindung zum .225-Broker hergestellt wurde (derselbe Server, mit dem der Teilnehmer ursprünglich verbunden war), erhält der Teilnehmer nun die unbestätigte Nachricht.

Meine Annahme ist, dass, wenn ActiveMQ in einem Failover-Setup konfiguriert ist, egal welchen Broker-Server aus dem Failover-Pool der Teilnehmer eine Verbindung herstellen kann, sollte es immer die unbestätigten Nachrichten erhalten.

In einigen Situationen scheint Fail Over Setup zu funktionieren. Nehmen wir an, der Teilnehmer war mit dem .346-Brokerserver aus dem Failover-Pool verbunden. Der Publisher ist mit einem anderen Broker-Server (.225-Broker) aus demselben Pool verbunden und veröffentlicht eine Nachricht. Der Abonnent empfängt Nachrichten. Dies beweist, dass Fail Over Setup funktioniert.

Sobald ein Abonnent jedoch eine Nachricht von einem Broker-Server erhält und der Teilnehmer vor dem Bestätigen der Nachricht getrennt wird, muss er die Verbindung zum selben Broker-Server wiederherstellen, um die nicht bestätigte Nachricht zu empfangen. Das klingt nicht richtig für mich.

Ist für die Konfiguration des Active MQ-Servers eine zusätzliche Konfiguration erforderlich, damit dieser Anwendungsfall funktioniert?

Antwort

0

Die Lösung für dieses Problem war nicht auf der Client-Seite, sondern mit der Active MQ Server-Konfiguration.

Fügen Sie ConditionalNetworkBridgeFilter für die Richtlinie für die Ablaufsteuerung der Ablaufsteuerung hinzu, und aktivieren Sie replayWhenNoConsumers.

Verwandte Themen