2017-01-13 5 views
2

Ich benutze Java RabbitMQ Client. Ich veröffentliche eine Nachricht (basicPublish) und schließe dann den Kanal. Im Bereich Consumer, Ausnahme channel.basicAck Wurf:RabbitMQ Verbindung nach Anruf schließen channel.close()

com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: java.util.concurrent.RejectedExecutionException: Task [email protected]5de7c7 rejected from [email protected][Running, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 5] 

Wenn löschen channel.close(), tritt der Fehler nicht wiedergegeben. Warum Verbindung geschlossen, wenn ich den Kanal schließe?

gesendete Nachrichten zum Austausch:

Channel channel = connection.createChannel(); 
Set<String> expectedMessages = new HashSet<>(MESSAGES_COUNT); 
for (int i = 0; i < MESSAGES_COUNT; i++) { 
    String message = Integer.toString(i); 
    channel.basicPublish(
     TEST_EXCHANGE, 
     ROUTE_KEY, 
     TEXT_PLAIN, 
     message.getBytes("UTF-8") 
    ); 
    expectedMessages.add(message); 
} 
channel.close(); 

Verbraucher:

try { 
    channel.basicAck(deliveryTag, false); 
} catch (Exception e) { 
    log.error("Error during message handling: " + consumerTag, e); 
    channel.basicNack(deliveryTag, false, true); 
} 
+0

Wenn Sie in der Nähe Kanal die Verbindung nicht schließen Sie, Sie vielleicht einen anderen Fehler im Code haben. – Gabriele

+0

Was könnte der Grund für ein solches Verhalten sein? Kann ich den Kanal nicht schließen? – Dmitriy

+0

ohne Code ist es schwer zu wissen! – Gabriele

Antwort

0

Das Problem in den falschen Einstellungen connection war. Falsch:

executorService = new ThreadPoolExecutor(
      properties.minThreads, properties.maxThreads, 
      properties.maxThreadIdle, TimeUnit.SECONDS, 
      new SynchronousQueue<>() 
    ); 
    connectionFactory.setSharedExecutor(executorService); 

Für die korrekte Arbeit:

executorService = new ThreadPoolExecutor(
      properties.minThreads, properties.maxThreads, 
      properties.maxThreadIdle, TimeUnit.SECONDS, 
      new LinkedBlockingQueue<>() 
    ); 
    connectionFactory.setSharedExecutor(executorService);