Unten ActiveMQ-Implementierung ist in Code vorhanden. Manchmal hört das System auf zu arbeiten und wird sehr langsam. Als ich den Thread-Dump mit JavaMelody überprüft habe - ich habe gesehen, dass zu viele Threads lange im Runnable-Zustand sind und nicht beendet werden.ActiveMQ-Transport: tcp: Thread RUNNABLE-Status - zu viele Threads hängen
ActiveMQ Version - activemq-all-5.3.0.jar
Code unten entnehmen Sie bitte:
Makler:
public class ActiveMQ extends HttpServlet {
private static final long serialVersionUID = -1234568008764323456;
private static final Logger logger = Logger.getLogger(ActiveMQ.class.getName());
public Listener listener;
private String msgBrokerUrl = "tcp://localhost:61602";
public BrokerService broker = null;
public TransportConnector connector = null;
@Override
public void init() throws ServletException {
try {
broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(false);
connector = broker.addConnector(msgBrokerUrl);
broker.setUseShutdownHook(true);
System.out.println("BROKER LOADED");
broker.start();
broker.deleteAllMessages();
listener = new Listener();
} catch (Exception e) {
e.printStackTrace();
}
}
}
Zuhörer:
public class Listener implements MessageListener {
private String msgQueueName = "jms/queue/MessageQueue";
public Session session;
public Destination adminQueue;
public static String id;
public ActiveMQConnection connection;
public MessageConsumer consumer = null;
public Listener() {
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
new URI("failover://(" + "tcp://localhost:61602" + "?wireFormat.cacheEnabled=false"
+ "&wireFormat.maxInactivityDuration=0&wireFormat.tightEncodingEnabled=true)?maxReconnectDelay=1000"));
connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
adminQueue = session.createQueue(msgQueueName);
id = new Timestamp(new Date().getTime()).toString();
consumer = this.session.createConsumer(this.adminQueue, "ID='" + id + "'");
consumer.setMessageListener(this);
} catch (JMSException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
@SuppressWarnings("unchecked")
@Override
public void onMessage(Message message) {
TextMessage msg = (TextMessage) message;
try {
String xmlMsg = msg.getText();
// business logic
} catch (JMSException ex) {
ex.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
Hersteller:
public class Producer {
private static String url = "tcp://localhost:61602";
private static String msgQueueName = "jms/queue/MessageQueue";
public ConnectionFactory connectionFactory = null;
public Connection connection = null;
public Session session = null;
public Destination destination = null;
public Producer() {
connectionFactory = new ActiveMQConnectionFactory(url);
}
public void sendResponse(String xml, DataBean objDataBean) {
sendToQueue(xml, msgQueueName, objDataBean);
}
private void sendToQueue(String xml, String msgQueueName, DataBean obj) {
try {
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(msgQueueName);
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage(xml);
message.setJMSExpiration(1000);
message.setStringProperty(obj.getMsgKey(), obj.getMsgValue());
producer.send(message);
xml = null;
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
for (int msg = 0; msg < 20; msg++) {
DataBean obj = getData();
new Producer().sendResponse(xml, obj);
;
}
}
}
Hanging Themen Ausnahmedetails:
Typ 1:
ActiveMQ Transport: tcp:///127.0.0.1:41818
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.read(SocketInputStream.java:152)
java.net.SocketInputStream.read(SocketInputStream.java:122)
org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58)
java.io.DataInputStream.readInt(DataInputStream.java:387)
org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:272)
org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:210)
org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:202)
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
java.lang.Thread.run(Thread.java:745)
Typ 210
2:
ActiveMQ Transport: tcp://localhost/127.0.0.1:61602
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.read(SocketInputStream.java:152)
java.net.SocketInputStream.read(SocketInputStream.java:122)
org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58)
java.io.DataInputStream.readInt(DataInputStream.java:387)
org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:272)
org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:210)
org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:202)
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
java.lang.Thread.run(Thread.java:745)
Bitte könnten Sie einige Hinweise zu diesem Thema für weitere Untersuchungen geben.
Edit: Ich lese einige Beiträge im Internet und folgerte, dass ich activemq JAR-Datei und implementieren Timeout aktualisieren müssen, aber wenn ich das Lesen über Timeout gestartet Einstellung dann bekam ich verwirrt, ob ich Timeout in Erzeuger- und Verbraucher oder Failover gesetzt sollte oder auf Nachrichten- oder Broker-Service. Das Timeout bei jeder Komponente hat einen anderen Zweck als die Zeitüberschreitung unter Berücksichtigung des obigen Codes und der Ausnahme.
Alles, was Sie beschrieben haben, ist normal. Wenn Sie zu viele RUNNABLE haben, bedeutet dies, dass Sie zu viele Threads für die CPU-Leistung haben, die Sie haben.Hinweis: Wenn Thread auf "socketRead0" wartet, bedeutet das, dass der Thread nichts zu tun hat (er wartet darauf, dass der andere send etwas schreibt) –
Warum zu viele Port-Nummern generiert - ich meine für jeden Thread eine eindeutige Port-Nummer. – user1800979
tcp: //localhost/127.0.0.1: 61602 - Das ist wieder ein Problem. – user1800979