Hallo Ich versuche, einen Testfall für die Implementierung der Failover-Unterstützung für ActiveMQ zu schreiben.Verlorene Nachricht beim Stoppen und Neustarten von Embedded ActiveMQ
Hier ist der Code
val brokerA = createBroker("A")
brokerA.start
val failoverUrl = s"failover:(vm://BrokerA?create=false)" +
s"?randomize=false&maxReconnectAttempts=-1&reconnectSupported=true"
val cFactory = new ActiveMQConnectionFactory(failoverUrl)
val qConnection = getQueueConnection
val session = createQueueSession(qConnection)
private def totalReadMessagesCount(queueReceiver: QueueReceiver) = {
val messages = Iterator.continually(Option(queueReceiver.receive(2000))).takeWhile(_.isDefined).flatten.toSeq
messages.size
}
private def getReceiver = {
val queueConnection = getQueueConnection
queueConnection.start()
val queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE)
val queueReceiver = createQueueReceiver(queueSession, brokerA.getBrokerName)
queueReceiver
}
def getQueueConnection =cFactory.createQueueConnection("admin", "")
def createBroker(name:String) = {
val broker = new BrokerService()
val adaptor = new KahaDBPersistenceAdapter()
broker.setBrokerName("Broker" + name)
broker.addConnector(getBrokerUrl)
broker.setPersistent(true)
broker.setUseJmx(false)
broker.setUseShutdownHook(false)
broker
}
def getBrokerUrl = "tcp://localhost:0"
val queueReceiver: QueueReceiver = getReceiver
val messageCount = 500
(1 to messageCount) map {count =>
//Calling method to send message to ActiveMQ
if(count == 200){
brokerA.stop()
brokerA.waitUntilStopped()
brokerA.start(true)
}
}
val totalCount = totalReadMessagesCount(queueReceiver)
println(s"Read ${totalCount} messages")
assert(totalCount == messageCount)
Ich bin in der Lage mit activeMQ nach dem Neustart wieder zu verbinden, aber totalCount
anzeigt 300 statt 500. Es scheint vorherige Nachrichten verloren gehen. Allerdings, wenn ich das gleiche Szenario im nicht eingebetteten Modus ausführen. Ich kann alle Nachrichten erhalten.
Bitte helfen Sie mir, wie kann ich jede Nachricht verlieren verhindern, während eingebettete aktive mq neu zu starten.
Sie benötigen 2 (zwei) Instanzen von activeMQ haben den Failover zu testen. Das Failover bedeutet, dass ein Client, der nicht mit einem bestimmten Server sprechen kann, den nächsten aus der Liste der Server in der Verbindungszeichenfolge versucht. Sie haben nur einen Server in Ihrer Verbindungszeichenfolge. Siehe die Dokumentation hier: http://activemq.apache.org/failover-transport-reference.html – zloster