2017-04-07 1 views
1

Hallo Ich versuche, einen Testfall für die Implementierung der Failover-Unterstützung für ActiveMQ zu schreiben.Verlorene Nachricht beim Stoppen und Neustarten von Embedded ActiveMQ

Hier ist der Code

val brokerA = createBroker("A") 
brokerA.start 
val failoverUrl = s"failover:(vm://BrokerA?create=false)" + 
s"?randomize=false&maxReconnectAttempts=-1&reconnectSupported=true" 


val cFactory = new ActiveMQConnectionFactory(failoverUrl) 
val qConnection = getQueueConnection 
val session = createQueueSession(qConnection) 

private def totalReadMessagesCount(queueReceiver: QueueReceiver) = { 
val messages = Iterator.continually(Option(queueReceiver.receive(2000))).takeWhile(_.isDefined).flatten.toSeq 
messages.size 
} 

private def getReceiver = { 
val queueConnection = getQueueConnection 
queueConnection.start() 
val queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE) 
val queueReceiver = createQueueReceiver(queueSession, brokerA.getBrokerName) 
queueReceiver 
} 

def getQueueConnection =cFactory.createQueueConnection("admin", "") 

def createBroker(name:String) = { 
val broker = new BrokerService() 
val adaptor = new KahaDBPersistenceAdapter() 
broker.setBrokerName("Broker" + name) 
broker.addConnector(getBrokerUrl) 
broker.setPersistent(true) 
broker.setUseJmx(false) 
broker.setUseShutdownHook(false) 
broker 
} 

def getBrokerUrl = "tcp://localhost:0" 


val queueReceiver: QueueReceiver = getReceiver 
val messageCount = 500 
(1 to messageCount) map {count => 
    //Calling method to send message to ActiveMQ 
    if(count == 200){ 
    brokerA.stop() 
    brokerA.waitUntilStopped() 
    brokerA.start(true) 
    } 
} 
val totalCount = totalReadMessagesCount(queueReceiver) 
println(s"Read ${totalCount} messages") 
assert(totalCount == messageCount) 

Ich bin in der Lage mit activeMQ nach dem Neustart wieder zu verbinden, aber totalCount anzeigt 300 statt 500. Es scheint vorherige Nachrichten verloren gehen. Allerdings, wenn ich das gleiche Szenario im nicht eingebetteten Modus ausführen. Ich kann alle Nachrichten erhalten.

Bitte helfen Sie mir, wie kann ich jede Nachricht verlieren verhindern, während eingebettete aktive mq neu zu starten.

+0

Sie benötigen 2 (zwei) Instanzen von activeMQ haben den Failover zu testen. Das Failover bedeutet, dass ein Client, der nicht mit einem bestimmten Server sprechen kann, den nächsten aus der Liste der Server in der Verbindungszeichenfolge versucht. Sie haben nur einen Server in Ihrer Verbindungszeichenfolge. Siehe die Dokumentation hier: http://activemq.apache.org/failover-transport-reference.html – zloster

Antwort

1

Sie müssen wahre persistent setzen, ich weiß nicht, scala, aber hier ist Java-Code

public BrokerService broker() throws Exception { 
    final BrokerService broker = new BrokerService(); 
    //broker.addConnector("tcp://localhost:61616"); 
    broker.addConnector("stomp://localhost:61613"); 
    broker.addConnector("vm://localhost"); 
    PersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter(); 
    File dir = new File(System.getProperty("user.home") + File.separator + "kaha"); 
    if (!dir.exists()) { 
     dir.mkdirs(); 
    } 
    persistenceAdapter.setDirectory(dir); 
    broker.setPersistenceAdapter(persistenceAdapter); 
    broker.setPersistent(true); 
    return broker; 
} 
+0

Ich habe gerade mein Beispiel aktualisiert. Ich habe hartnäckig auf wahr gemacht und KahaDB hinzugefügt. Aber jetzt bekomme ich 305 Punkte. Irgendein Vorschlag? –

+0

Sie haben eine Instanz von KahaDBPersistenceAdapter erstellt, aber Sie richten sie nicht für den Broker ein. Wie dieser broker.setPersistenceAdapter (adapter); –

Verwandte Themen