2016-12-28 3 views
0

Ich bin neu in diesem Thema. Ich verwende einen JMS-Listener, um einen aktiven MQ zu hören, der geteilte Nachrichten enthält. Ich muss die Warteschlange bis zur letzten Nachricht anhören und sie dann zusammen an die Benutzeroberfläche senden. Ich bin in der Lage, die Warteschlange zu hören und Nachrichten zu erfassen, aber ich weiß nicht, wie viele geteilte Nachrichten verfügbar sein werden, so dass ich nicht in der Lage bin, alles zusammen zu senden. Gibt es eine Möglichkeit, Listener dazu zu bringen, die obige Operation auszuführen? Wenn in der Warteschlange keine Nachrichten mehr verfügbar sind, erzeugt der JMS-Listener einen Nullwert? Jede Idee oder Hilfe wird sehr hilfreich sein.ActiveMQ JMSListener

Ich benutze den folgenden Code, um die Warteschlange mit JMS Listener zu hören.

private static final String ORDER_RESPONSE_QUEUE = "mail-response-queue"; 

@JmsListener(destination = ORDER_RESPONSE_QUEUE) 
public void receiveMessage(final Message<InventoryResponse> message) throws JMSException { 
    LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++"); 
    MessageHeaders headers = message.getHeaders(); 
    LOG.info("Application : headers received : {}", headers); 

    InventoryResponse response = message.getPayload(); 
    LOG.info("Application : response received : {}",response); 
    LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++"); 
} 

Kann ich Warteschlangeninformationen mit JMS Listener abrufen?

Antwort

0

von jmx haben Sie Zugriff auf Informationen über Ziele, so können Sie beispielsweise wissen, wie Nachrichten in einer Warteschlange anstehen.

Beachten Sie, dass sich das ändern kann, wenn neue Nachrichten

lange org.apache.activemq.broker.jmx.DestinationViewMBean.getQueueSize()

@MBeanInfo (value = "gesendet werden Nummer von Meldungen im Ziel, die noch konsumiert werden sollen, möglicherweise versandt, aber nicht quittiert. ")

Gibt die Anzahl der Nachrichten in diesem Ziel zurück, die nochsein müssenverbraucht Returns: Gibt die Anzahl der Meldungen in diesem Reiseziel , die noch werden sollen

verbraucht
import java.util.HashMap; 
import java.util.Map; 

import javax.management.MBeanServerConnection; 
import javax.management.MBeanServerInvocationHandler; 
import javax.management.ObjectName; 
import javax.management.remote.JMXConnector; 
import javax.management.remote.JMXConnectorFactory; 
import javax.management.remote.JMXServiceURL; 

import org.apache.activemq.broker.jmx.BrokerViewMBean; 
import org.apache.activemq.broker.jmx.QueueViewMBean; 

public class JMXGetDestinationInfos { 

    public static void main(String[] args) throws Exception { 
     JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://host:1099/jmxrmi"); 
     Map<String, String[]> env = new HashMap<>(); 
     String[] creds = {"admin", "activemq"}; 
     env.put(JMXConnector.CREDENTIALS, creds); 
     JMXConnector jmxc = JMXConnectorFactory.connect(url, env); 
     MBeanServerConnection conn = jmxc.getMBeanServerConnection(); 

     ObjectName activeMq = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost"); 

     BrokerViewMBean mbean = MBeanServerInvocationHandler.newProxyInstance(conn, activeMq, BrokerViewMBean.class, 
       true); 
     for (ObjectName name : mbean.getQueues()) { 
      if (("Destination".equals(name.getKeyProperty("destinationName")))) { 
       QueueViewMBean queueMbean = MBeanServerInvocationHandler.newProxyInstance(conn, name, 
         QueueViewMBean.class, true); 
       System.out.println(queueMbean.getQueueSize()); 
      } 
     } 
    } 
} 

Warum nicht raubend Mitteilungen, und wenn es keine Nachrichten empfangen Sie angezeigt werden ?? Sie haben unten eine Methode, die nach einem Timeout null zurückgibt, wenn keine Nachrichten empfangen wurden.

ActiveMQMessageConsumer.receive (long timeout) JMSException wirft die nächste Nachricht empfängt, die innerhalb des festgelegten Timeout-Zeit ankommt. Dieser Anruf blockiert, bis eine Nachricht eintrifft, das Zeitlimit abgelaufen ist oder dieser Nachrichtenkonsumenten geschlossen ist. Ein Timeout von Null läuft niemals ab und der Anruf blockiert für unbegrenzte Zeit. Angegeben von: Empfangen in der Schnittstelle MessageConsumer Parameter: Timeout - der Timeout-Wert (in Millisekunden), ein Timeout von Null läuft nie ab. Rückkehr: die nächste Nachricht erzeugt für diese Nachricht Verbraucher oder null, wenn das Timeout abläuft oder diese Nachricht Verbraucher gleichzeitig geschlossen

UPDATE

kann so aussehen:

import java.io.IOException; 
import java.net.MalformedURLException; 
import java.util.HashMap; 
import java.util.Map; 

import javax.management.MBeanServerConnection; 
import javax.management.MBeanServerInvocationHandler; 
import javax.management.MalformedObjectNameException; 
import javax.management.ObjectName; 
import javax.management.remote.JMXConnector; 
import javax.management.remote.JMXConnectorFactory; 
import javax.management.remote.JMXServiceURL; 

import org.apache.activemq.broker.jmx.BrokerViewMBean; 
import org.apache.activemq.broker.jmx.QueueViewMBean; 

public class JMXGetDestinationInfos { 

    private QueueViewMBean queueMbean; 

    { 
     try { 
      JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://host:1099/jmxrmi"); 
      Map<String, String[]> env = new HashMap<>(); 
      String[] creds = { "admin", "activemq" }; 
      env.put(JMXConnector.CREDENTIALS, creds); 
      JMXConnector jmxc = JMXConnectorFactory.connect(url, env); 
      MBeanServerConnection conn = jmxc.getMBeanServerConnection(); 

      ObjectName activeMq = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost"); 

      BrokerViewMBean mbean = MBeanServerInvocationHandler.newProxyInstance(conn, activeMq, BrokerViewMBean.class, 
        true); 
      for (ObjectName name : mbean.getQueues()) { 
       if (("Destination".equals(name.getKeyProperty("destinationName")))) { 
        queueMbean = MBeanServerInvocationHandler.newProxyInstance(conn, name, QueueViewMBean.class, true); 
        System.out.println(queueMbean.getQueueSize()); 
        break; 
       } 
      } 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 

    @JmsListener(destination = ORDER_RESPONSE_QUEUE) 
    public void receiveMessage(final Message<InventoryResponse> message, javax.jms.Message amqMessage) throws JMSException { 
     LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++"); 
     MessageHeaders headers = message.getHeaders(); 
     LOG.info("Application : headers received : {}", headers); 

     InventoryResponse response = message.getPayload(); 
     LOG.info("Application : response received : {}",response); 
     LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++"); 
     //queueMbean.getQueueSize() is real time, each call return the real size 
     ((org.apache.activemq.command.ActiveMQMessage) amqMessage).acknowledge(); 
     if(queueMbean != null && queueMbean.getQueueSize() == 0){ 
      //display messages ?? 
     } 
    } 
} 

weil getQueueSize() geben Sie die Anzahl der Nachrichten in der Ziel, die Sie sind t konsumiert werden. Möglicherweise versandt, aber unbestätigt.

Eine Lösung ist die acknowledgeMode zu org.apache.activemq.ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE für Sitzungen Schöpfung im Frühjahr DefaultMessageListenerContainer.sessionAcknowledgeModeName und bestätigt jede Nachricht einzeln und prüfen Sie nach, dass, wenn die Größe == 0 (Größe == 0 bedeutet, dass alle Nachrichten versandt werden aktualisiert und bestätigt).

+0

Vielen Dank, aber ich möchte es mit einem Listener statt Verbraucher zu tun, ist es möglich, Warteschlangeninformationen mit JMS-Listener zu bekommen? – user6543599

+0

Ich habe meinen Beitrag aktualisiert. Bitte sieh es dir an und hilf mir – user6543599