2017-03-09 3 views
0

bin aktive MQ 5.10 Version verwenden, ich habe es mit Wso2esb für die Nachrichtenverarbeitung konfiguriert.Active MQ tcp Verbindungsfehler

Nach ca. 7-10 Tagen wirft Active MQ tcp connections failure exception, da ESB keine erfolgreiche tcp-Verbindung empfängt, kann es keine Nachrichten in der Warteschlange senden.

In diesem Fall den Server neu starten und wieder wird es für 7-10 Tage laufen und das Gleiche wiederholt sich.

meine Fragen ist

was kann der genaue Grund für die aktive MQ sein beendet erfolgreich TCP-Verbindung geben ..?

warum nach dem Neustart des Servers es in den Normalzustand zurückkehrt ..?

gibt es eine beste Lösung, um über dieses Problem kommen ..

Speicherkonfiguration in activemq.xml Datei

<systemUsage> 
    <systemUsage sendFailIfNoSpace="true"> 
     <memoryUsage> 
       <memoryUsage limit="1430 mb"/> 
      </memoryUsage> 
      <storeUsage> 
       <storeUsage limit="300 gb"/> 
      </storeUsage> 
      <tempUsage> 
       <tempUsage limit="100 gb"/> 
      </tempUsage> 
    </systemUsage> 
</systemUsage> 

Hier ist der Proxy-Dienst der Nachricht in der Warteschlange wird platzieren. In dem Proxy-Dienst wird die Benutzer-Authentifizierung überprüfen, ob seine wahre des Benutzer die Nachricht in der Warteschlange platzieren kann, hier verwende ich eine Klasse Vermittler, wo, dass sie mit dem aktiven MQ verbinden und legen Nachricht

<proxy xmlns="http://ws.apache.org/ns/synapse" 
     name="JmsStore2.0" 
     transports="https http" 
     startOnLoad="true" 
     trace="disable" 
     statistics="enable"> 
    <description/> 
    <target> 
     <inSequence onError="fault"> 
     <property name="messageType" value="application/json" scope="axis2"/> 
     <property name="FORCE_ERROR_ON_SOAP_FAULT" value="true"/> 
     <property name="jmsuri" value="tcp://0.0.0.0:61616"/> 
     <property name="jmsqueue" expression="get-property('transport', 'jmsqueue')"/> 
     <property name="readingspayload" expression="$body" type="OM"/> 
     <property name="username" expression="get-property('transport', 'username')"/> 
     <property name="password" expression="get-property('transport', 'password')"/> 
     <property name="PartyBranchID" 
        expression="//FieldValue/text()" 
        scope="default" 
        type="STRING"/> 
     <property name="Body" expression="$body" scope="default" type="STRING"/> 
     <property name="usercode" 
        expression="fn:substring-before(get-property('username'),'|')" 
        scope="default" 
        type="STRING"/> 
     <property name="clientid" 
        expression="fn:substring-after(get-property('username'),'|')" 
        scope="default" 
        type="STRING"/> 
     <property name="requestMsgId" 
        expression="get-property('MessageID')" 
        scope="default" 
        type="STRING"/> 
     <property name="client_ip_address" 
        expression="get-property('axis2','REMOTE_ADDR')" 
        scope="default" 
        type="STRING"/> 

     <payloadFactory media-type="xml"> 
      <format> 
       <send xmlns=""> 
        <username>$1</username> 
        <password>$2</password> 
       </send> 
      </format> 
      <args> 
       <arg evaluator="xml" expression="get-property('username')"/> 
       <arg evaluator="xml" expression="get-property('password')"/> 
      </args> 
     </payloadFactory> 
     <send receive="JmsStore_Seq"> 
      <endpoint> 
       <address uri="http://localhost:8282/services/Login2.0" format="soap11"> 
        <suspendOnFailure> 
        <errorCodes>101500,101501,101506,101507,101508,101503,50000</errorCodes> 
        <initialDuration>30</initialDuration> 
        <progressionFactor>1.0</progressionFactor> 
        <maximumDuration>300</maximumDuration> 
        </suspendOnFailure> 
       </address> 
      </endpoint> 
     </send> 
     </inSequence> 
     <outSequence onError="fault"> 
     <send/> 
     </outSequence> 
    </target> 
</proxy> 

Sequenz:

<sequence xmlns="http://ws.apache.org/ns/synapse" 
      name="JmsStore_Seq" 
      trace="disable"> 
    <property name="FORCE_ERROR_ON_SOAP_FAULT" value="true"/> 
    <property xmlns:ns="http://org.apache.synapse/xsd" 
      name="Authentication" 
      expression="//Authentication/text()"/> 
    <property xmlns:ns="http://org.apache.synapse/xsd" 
      name="UserId" 
      expression="//UserId/text()" 
      scope="default" 
      type="STRING"/> 
    <property xmlns:ns="http://org.apache.synapse/xsd" 
      name="WorkOUid" 
      expression="//WorkOUid/text()"/> 
    <property xmlns:ns="http://org.apache.synapse/xsd" 
      name="WorkPartyBranchId" 
      expression="//WorkPartyBranchId/text()"/> 

    <filter xmlns:ns="http://org.apache.synapse/xsd" 
      xpath="get-property('Authentication')=''"> 
     <then> 
     <payloadFactory media-type="xml"> 
      <format> 
       <ResponseJSON xmlns=""> 
        <Exception>Service trying to connect inactive service</Exception> 
        <Status>101503</Status> 
       </ResponseJSON> 
      </format> 
      <args/> 
     </payloadFactory> 
     <property name="messageType" value="application/json" scope="axis2"/> 
     <property name="HTTP_METHOD" value="POST" scope="axis2" type="STRING"/> 
     <property name="RESPONSE" value="true" scope="default" type="STRING"/> 
     <property name="NO_ENTITY_BODY" scope="axis2" action="remove"/> 
     <send/> 
     </then> 
     <else> 
     <filter xpath="get-property('Authentication')='false'"> 
      <then> 
       <payloadFactory media-type="xml"> 
        <format> 
        <ResponseJSON xmlns=""> 
         <Exception>Authentication Failed</Exception> 
         <Status>401</Status> 
        </ResponseJSON> 
        </format> 
        <args/> 
       </payloadFactory> 
       <property name="messageType" value="application/json" scope="axis2"/> 
       <property name="HTTP_METHOD" value="POST" scope="axis2" type="STRING"/> 
       <property name="RESPONSE" value="true" scope="default" type="STRING"/> 
       <property name="NO_ENTITY_BODY" scope="axis2" action="remove"/> 
       <send/> 
      </then> 
      <else> 
       <property name="jmspayload" 
         expression="get-property('readingspayload')" 
         type="OM"/> 
       <property name="ResponseJSON" expression="$body/ResponseJSON" type="OM"/> 
       <property name="jmsuri" expression="get-property('jmsuri')"/> 
       <property name="jmsqueue" expression="get-property('jmsqueue')"/> 
       <payloadFactory media-type="xml"> 
        <format> 
        <PLData> 
         <JMpayload>$1</JMpayload> 
         <AuthData>$2</AuthData> 
         <LogData> 
          <usercode>$3</usercode> 
          <clientid>$4</clientid> 
          <requestMsgId>$5</requestMsgId> 
         </LogData> 
        </PLData> 
        </format> 
        <args> 
        <arg evaluator="xml" expression="get-property('jmspayload')"/> 
        <arg evaluator="xml" expression="get-property('ResponseJSON')"/> 
        <arg evaluator="xml" expression="get-property('usercode')"/> 
        <arg evaluator="xml" expression="get-property('clientid')"/> 
        <arg evaluator="xml" expression="get-property('requestMsgId')"/> 
        </args> 
       </payloadFactory> 
       <class name="in.youtility.esb.custommediators.JMSStoreMediator"/> 
       <payloadFactory media-type="xml"> 
        <format> 
        <ResponseJSON xmlns=""> 
         <Body> 
          <Datalist> 
           <Data>Successfully stored</Data> 
          </Datalist> 
         </Body> 
         <Status>200</Status> 
        </ResponseJSON> 
        </format> 
        <args/> 
       </payloadFactory> 
       <property name="messageType" value="application/json" scope="axis2"/> 
       <header name="To" action="remove"/> 
       <property name="NO_ENTITY_BODY" scope="axis2" action="remove"/> 
       <property name="RESPONSE" value="true"/> 
       <send/> 
      </else> 
     </filter> 
     </else> 
    </filter> 
    <description/> 
</sequence> 

Klasse Mediator:

public class JMSStoreMediator extends AbstractMediator implements 

ManagedLifecycle {

Connection connection; 

public boolean mediate(MessageContext msgCtx) { 


    try { 
     boolean topic=false; 
     String jmsuri=""+msgCtx.getProperty("jmsuri"); 
     String t=""+msgCtx.getProperty("topic"); 

     if(t.isEmpty()){ 

      topic=false; 
     } 
     else { 

      topic=Boolean.valueOf(t); 
     } 

     ConnectionFactory factory= new ActiveMQConnectionFactory(jmsuri); 
     connection = factory.createConnection(); 
     connection.start(); 
     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     Destination destination=null; 
     if(!topic)destination= session.createQueue(""+msgCtx.getProperty("jmsqueue")); 
     else destination= session.createTopic(""+msgCtx.getProperty("jmsqueue")); 
     MessageProducer producer = session.createProducer(destination); 
     producer.setDeliveryMode(DeliveryMode.PERSISTENT); 

     String xml = ""+msgCtx.getEnvelope().getBody().toStringWithConsume(); 

     if(topic){ 

      JSONObject obj=XML.toJSONObject(xml); 
      JSONObject ar=obj.getJSONObject("soapenv:Body"); 
      ar.remove("xmlns:soapenv"); 
      xml=ar.toString(); 
     } 
     TextMessage message = session.createTextMessage(xml); 
     producer.send(message); 

    } catch (Exception e) { 

     log.info("LogLocation = "+getClass().getName()+",Error in storing message in JMS stacktrace is :"+e.toString()); 
     ((Axis2MessageContext) msgCtx).setProperty(NhttpConstants.HTTP_SC, 500); 
     handleException("Error while storing in the message store", msgCtx); 

    } 
    finally { 
     try { 
      connection.close(); 

     } catch (JMSException e) { 
      log.info("LogLocation = "+getClass().getName()+",Error in closing JMS connection stacktrace is :"+e.toString()); 
     } 
    } 
     log.info("LogLocation = "+getClass().getName()+",ProxyName = "+msgCtx.getProperty("proxy.name")+ 
      ",Usercode = "+msgCtx.getProperty("usercode")+",Clientid = "+msgCtx.getProperty("clientid")+ 
      ",requestMsgId = "+msgCtx.getProperty("requestMsgId")+",Position = END"); 

    return true; 
} 
+0

Ist die Anzahl der Nachrichten in ActiveMQ, Tag für Tag wächst? Wie verwenden Sie AMQ mit einem JMS-Proxy-Dienst, einem MessageStore? –

+0

hi @ Jean-Michel danke für die Antwort, benutze ESB-Proxy-Dienst, der eine Nachricht in der Warteschlange und anderen JMS-Proxy-Dienst, der diese Warteschlange zuhören und verarbeiten wird. Häufig kommen Nachrichten in die Warteschlange und werden ohne Probleme verarbeitet, bis Active MQ eine erfolgreiche TCP-Verbindung mit ESB herstellt. – user4045063

+0

Ich poste eine erste Antwort, aber sagen Sie mir, ob die Anzahl der Nachrichten in ActiveMQ wächst, wenn sie fehlschlägt (Tausende von Nachrichten in den Warteschlangen) –

Antwort

0

Sie sollten Ihre proxys conf teilen, aber man kann immer überprüfen, dass:

  • Sie die Cachelevel festgelegt haben, bei jedem Scan neue Verbindungen zu vermeiden: <parameter name="transport.jms.CacheLevel">consumer</parameter>
  • Sie gefragt haben alle Nachrichten zu konsumieren ein Jeder Scan: <parameter name="transport.jms.MaxMessagesPerTask">-1</parameter>
+0

Ich habe meinen Proxy-Code geteilt, den Sie sich angesehen haben. – user4045063