2016-10-24 2 views
3

Unten ActiveMQ-Implementierung ist in Code vorhanden. Manchmal hört das System auf zu arbeiten und wird sehr langsam. Als ich den Thread-Dump mit JavaMelody überprüft habe - ich habe gesehen, dass zu viele Threads lange im Runnable-Zustand sind und nicht beendet werden.ActiveMQ-Transport: tcp: Thread RUNNABLE-Status - zu viele Threads hängen

ActiveMQ Version - activemq-all-5.3.0.jar

Code unten entnehmen Sie bitte:

Makler:

public class ActiveMQ extends HttpServlet { 

private static final long serialVersionUID = -1234568008764323456; 
private static final Logger logger = Logger.getLogger(ActiveMQ.class.getName()); 
public Listener listener; 

private String msgBrokerUrl = "tcp://localhost:61602"; 
public BrokerService broker = null; 
public TransportConnector connector = null; 

@Override 
public void init() throws ServletException { 

    try { 
     broker = new BrokerService(); 
     broker.setPersistent(false); 
     broker.setUseJmx(false); 
     connector = broker.addConnector(msgBrokerUrl); 
     broker.setUseShutdownHook(true); 
     System.out.println("BROKER LOADED"); 
     broker.start(); 
     broker.deleteAllMessages(); 

     listener = new Listener(); 

    } catch (Exception e) { 
     e.printStackTrace(); 
    } 
} 

}

Zuhörer:

public class Listener implements MessageListener { 

private String msgQueueName = "jms/queue/MessageQueue"; 
public Session session; 
public Destination adminQueue; 

public static String id; 

public ActiveMQConnection connection; 
public MessageConsumer consumer = null; 

public Listener() { 
    try { 

     ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
       new URI("failover://(" + "tcp://localhost:61602" + "?wireFormat.cacheEnabled=false" 
         + "&wireFormat.maxInactivityDuration=0&wireFormat.tightEncodingEnabled=true)?maxReconnectDelay=1000")); 

     connection = (ActiveMQConnection) connectionFactory.createConnection(); 
     connection.start(); 
     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     adminQueue = session.createQueue(msgQueueName); 
     id = new Timestamp(new Date().getTime()).toString(); 
     consumer = this.session.createConsumer(this.adminQueue, "ID='" + id + "'"); 
     consumer.setMessageListener(this); 
    } catch (JMSException e) { 
     e.printStackTrace(); 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } 
} 

@SuppressWarnings("unchecked") 
@Override 
public void onMessage(Message message) { 
    TextMessage msg = (TextMessage) message; 
    try { 
     String xmlMsg = msg.getText(); 
     // business logic 
    } catch (JMSException ex) { 
     ex.printStackTrace(); 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } 
} 

Hersteller:

public class Producer { 
private static String url = "tcp://localhost:61602"; 
private static String msgQueueName = "jms/queue/MessageQueue"; 

public ConnectionFactory connectionFactory = null; 
public Connection connection = null; 
public Session session = null; 
public Destination destination = null; 

public Producer() { 
    connectionFactory = new ActiveMQConnectionFactory(url); 
} 

public void sendResponse(String xml, DataBean objDataBean) { 
    sendToQueue(xml, msgQueueName, objDataBean); 
} 

private void sendToQueue(String xml, String msgQueueName, DataBean obj) { 

    try { 
     Connection connection = connectionFactory.createConnection(); 
     connection.start(); 
     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     Destination destination = session.createQueue(msgQueueName); 
     MessageProducer producer = session.createProducer(destination); 
     TextMessage message = session.createTextMessage(xml); 
     message.setJMSExpiration(1000); 
     message.setStringProperty(obj.getMsgKey(), obj.getMsgValue()); 
     producer.send(message); 

     xml = null; 
     session.close(); 
     connection.close(); 

    } catch (Exception e) { 
     e.printStackTrace(); 
    } 
} 

public static void main(String[] args) { 

    for (int msg = 0; msg < 20; msg++) { 
     DataBean obj = getData(); 
     new Producer().sendResponse(xml, obj); 
     ; 
    } 
} 

}

Hanging Themen Ausnahmedetails:

Typ 1:

ActiveMQ Transport: tcp:///127.0.0.1:41818 
java.net.SocketInputStream.socketRead0(Native Method) 
java.net.SocketInputStream.read(SocketInputStream.java:152) 
java.net.SocketInputStream.read(SocketInputStream.java:122) 
org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50) 
org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58) 
java.io.DataInputStream.readInt(DataInputStream.java:387) 
org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:272) 
org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:210) 
org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:202) 
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185) 
java.lang.Thread.run(Thread.java:745) 
Typ 210

2:

ActiveMQ Transport: tcp://localhost/127.0.0.1:61602 
java.net.SocketInputStream.socketRead0(Native Method) 
java.net.SocketInputStream.read(SocketInputStream.java:152) 
java.net.SocketInputStream.read(SocketInputStream.java:122) 
org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50) 
org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58) 
java.io.DataInputStream.readInt(DataInputStream.java:387) 
org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:272) 
org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:210) 
org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:202) 
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185) 
java.lang.Thread.run(Thread.java:745) 

Bitte könnten Sie einige Hinweise zu diesem Thema für weitere Untersuchungen geben.

Edit: Ich lese einige Beiträge im Internet und folgerte, dass ich activemq JAR-Datei und implementieren Timeout aktualisieren müssen, aber wenn ich das Lesen über Timeout gestartet Einstellung dann bekam ich verwirrt, ob ich Timeout in Erzeuger- und Verbraucher oder Failover gesetzt sollte oder auf Nachrichten- oder Broker-Service. Das Timeout bei jeder Komponente hat einen anderen Zweck als die Zeitüberschreitung unter Berücksichtigung des obigen Codes und der Ausnahme.

+0

Alles, was Sie beschrieben haben, ist normal. Wenn Sie zu viele RUNNABLE haben, bedeutet dies, dass Sie zu viele Threads für die CPU-Leistung haben, die Sie haben.Hinweis: Wenn Thread auf "socketRead0" wartet, bedeutet das, dass der Thread nichts zu tun hat (er wartet darauf, dass der andere send etwas schreibt) –

+0

Warum zu viele Port-Nummern generiert - ich meine für jeden Thread eine eindeutige Port-Nummer. – user1800979

+1

tcp: //localhost/127.0.0.1: 61602 - Das ist wieder ein Problem. – user1800979

Antwort

1

Das Erstellen einer Verbindung ist sehr teuer und wenn Sie es schließen, wird der Anschluss für bis zu 3 Minuten beibehalten, um sicherzustellen, dass er ordnungsgemäß heruntergefahren wird.

Sie möchten Verbindungen nur erstellen, wenn Sie Leistungsprobleme wirklich vermeiden müssen. Ich schlage vor, dass Sie die Verbindung einmal erstellen und diese Verbindung geöffnet lassen, es sei denn, Sie erhalten einen Fehler. Dies kann die Leistung um 2 bis 3 Größenordnungen verbessern.

Dies ist ein gutes Leistungsabstimmungsmuster, das in vielen Fällen zutrifft;

  • nur erstellen und zerstören teure Ressourcen, wenn Sie wirklich müssen.
  • Operationen, die Sie häufig durchführen, sollten auf ein Minimum beschränkt sein. d. h. wiederhole so oft wie möglich.
+1

Laut dem obigen Code wird die Listenerverbindung nur einmal erstellt und läuft weiter, wobei die onMessage-Methode fortlaufend aufgerufen wird, aber die Producer-Verbindung für jeden Lauf erstellt wird der Schleife von der Hauptmethode. Produzent ist das Problem, wo ich nur einmal Verbindung herstellen muss. Bitte beraten. – user1800979

Verwandte Themen