2016-09-19 1 views
0

Ich bin eine Anwendung mit ActiveMQ erstellen, wo ich einen Produzenten und einen Verbraucher habe. In der Consumer Iam mit einem MessageListener, um Nachrichten von Producer asynchron abhören, die mit einer Methode namens OnMessage (Message-Nachricht) erfolgt ist. Aber bevor ich die Nachrichten konsumiere, möchte ich eine Zustandsprüfung durchführen und dann die Nachrichten konsumieren. Ich möchte keinen synchronen Nachrichtenverbrauch verwenden, da dies gegen mein Design verstößt.Wie pausiere und setze ich den asynchronen Verbrauch von JMS-Nachrichten fort

void initialize() throws JMSException { 
      this.connection = this.connectionFactory.createConnection(); 
      this.connection.start(); 
      final Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
      final Destination destination = session.createQueue("testQ"); 
      this.consumer = session.createConsumer(destination); 
      this.consumer.setMessageListener(this); 
} 

prüfen Zustand hier wie Erkennung von Internet-Verbindung usw.

public void onMessage(final Message message) { 
     Preconditions.checkNotNull(message); 

     if (!(message instanceof TextMessage)) { 
      _LOG.error("The message is not of type TextMessage but of type {} so we could not process", message.getClass().getSimpleName()); 
      throw new IllegalArgumentException("This type '" + message.getClass().getSimpleName() + "' of message could not been handled"); 
     } 

     try { 
      final String messageType = message.getStringProperty("messageType"); 
      Preconditions.checkNotNull(messageType); 
      _LOG.info("The MessageType is {}", messageType); 

      final String msg = ((TextMessage) message).getText(); 
      Preconditions.checkNotNull(msg); 
      _LOG.debug(msg); 

      process(messageType, msg); 
     } catch (final JMSException e) { 
      _LOG.error("We could not read the message", e); 
     } 
    } 

Jeder Code Beispiel wäre toll.

Antwort

1

Es ist nicht klar, warum Sie Prüfungen durchführen möchten, denn wenn es ein Problem mit der Verbindung gibt, wird Ihre Anwendung über diesen Fehler informiert. Sie können einen ExceptionListener für JMS Connection einrichten, der aufgerufen wird, wenn bei der Verbindung ein Problem auftritt.

Die onMessage wird nicht aufgerufen, wenn ein Verbindungsproblem vorliegt.

Auch ich würde die connection.start() -Methode Anruf nach dem Einrichten der Nachricht Listener für Verbraucher schieben. Ein Aufruf von connection.start() zeigt dem Messaging-Provider an, dass die Anwendung zum Empfang von Nachrichten bereit ist.

Es gibt connection.stop(), um die Nachrichtenzustellung anzuhalten/zu stoppen. Sie können connection.start() erneut ausgeben, um die Nachrichtenzustellung fortzusetzen.


-Update auf der Grundlage Ihrer Antwort

Ich schlage vor, Sie eine Sitzung mit Client-Modus verwenden, bestätigen.

final Session session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE) 

dann in onMessage Methode, wenn es keine Internet-Verbindung ist, bestätigt die Nachricht nicht. Die Nachricht wird erneut gesendet, wenn sie nicht abgelaufen ist.

+0

Danke für die reply.Right Ich werde mein Szenario beschreiben. Nehmen Sie die Nachricht, die ich aus activemq Warteschlange konsumiere, möchte ich es an einen Webservice senden und vor dem Senden an einen Webservice iam prüfen, ob die Internetverbindung vorhanden ist oder nicht. Wenn die Internetverbindung nicht vorhanden ist, möchte ich Pause die Nachrichten verbrauchen von MessageListener, der schließlich im activemq wegen der Ablaufzeit ablaufen wird (die entsprechend meinem Entwurf ist) und wenn die Internetverbindung Verbindung ist, sende ich die Nachrichten an webservice.Can Sie helfen mir, indem Sie meinen onMessage Code redigieren, den ich oben gepostet habe. –

+0

public void onMessage (letzte Nachricht Nachricht) { if (isNetavailable() == false) \t \t { \t \t \t try { \t \t \t \t this.connection.stop(); \t \t \t} catch (JMSException e) { \t \t \t \t e.printStackTrace(); \t \t \t} \t \t} \t \t else if (isNetavailable() == true) \t \t { \t \t \t try { \t \t \t \t this.connection.start(); \t \t \t} catch (JMSException e1) { \t \t \t \t e1.printStackTrace(); \t \t \t} \t \t} } Meine Frage ist hier, nachdem ich connection.stop() in onMessage berufen, wenn keine Internetverbindung ist, dann wird die onMessage das nächste Mal aufgerufen werden connection.start() –

+0

eschallungssysteme auszuführen void onMessage (letzte Nachricht) { if (isReachable == false) {return;} else –

1

In der asynchronen Nachrichtenübermittlung können Sie die Bedingung vor der Verarbeitung der Nachricht immer überprüfen. Sobald die JMS-Nachricht an Ihren Listener-Code (d. H. OnMessage-Methode) empfangen wurde, liegt es an Ihnen, die weitere Richtung anzugeben. Ich hatte ein Problem gelöst, bei dem ich die JMS-Nachricht mit meinem db-Inhalt überprüfen wollte.

Kontrolle der Internet-Verbindung am Leben ist von URLConnection an den Webservice-URL so etwas wie unten machen:

public static boolean isReachable(String targetUrl) throws IOException 
{ 
    try { 
    HttpURLConnection httpUrlConnection = (HttpURLConnection) new URL(
     targetUrl).openConnection(); 
    httpUrlConnection.setRequestMethod("HEAD"); 


    int responseCode = httpUrlConnection.getResponseCode(); 

    return responseCode == HttpURLConnection.HTTP_OK; 
} catch (UnknownHostException noInternetConnection) 
    { 
    return false; 
    } 
} 

Und dann in Ihrem onMessage Methode rufen Sie die Methode als

public void onMessage(final Message message) { 
    if(isReachable){ 
    Preconditions.checkNotNull(message); 

    if (!(message instanceof TextMessage)) { 
     _LOG.error("The message is not of type TextMessage but of type {} so we could not process", message.getClass().getSimpleName()); 
     throw new IllegalArgumentException("This type '" + message.getClass().getSimpleName() + "' of message could not been handled"); 
    } 

    try { 
     final String messageType = message.getStringProperty("messageType"); 
     Preconditions.checkNotNull(messageType); 
     _LOG.info("The MessageType is {}", messageType); 

     final String msg = ((TextMessage) message).getText(); 
     Preconditions.checkNotNull(msg); 
     _LOG.debug(msg); 

     process(messageType, msg); 
    } catch (final JMSException e) { 
     _LOG.error("We could not read the message", e); 
    } 
    } 
} 
+0

Eine Abfrage über AUTOACKNOWLEDGE.Iam mit AUTOACKNOWLEDGE & wie Sie in OnMessage sehen gibt es eine Methode Prozess (messageType, msg). Wenn process() Exception wirft wird die activeMQ versuchen, die gleiche Nachricht kontinuierlich zu senden? In meinem Fall nehme ich iam sending message to process() & es gibt kein internet dass die nachricht an webservice inside process() gesendet wird 7 mal mit 1 sek difference wenn ich eine ConnectionRefused Exception oder 404 error etc bekomme, dh process (messageType, msg) wird 7 mal aufgerufen wenn ich einen 200OK-Prozess bekomme (MessageType, msg) wird dieser einmal aufgerufen. Warum ist das & was ich tun muss um es zu vermeiden. –

+0

Wartet activeMQ auf automatische Bestätigung von OnMessage, die es aufgrund von Ausnahmen im Prozess nicht erhält (messageType, msg), in denen der Prozess eine Retrofit.client.Response.Ist, dass es erneut versucht, Nachrichten 7 mal zu senden. –

Verwandte Themen