2013-10-08 5 views
5

Ich möchte Nachrichten aus einer RabbitMq-Warteschlange parallel verarbeiten. Die Warteschlange ist so konfiguriert, dass sie autoAck = false ist. Ich verwende die camel-rabbitMQ-Unterstützung für camel endpoints, die Unterstützung für einen threadPoolSize-Parameter hat, aber dies hat nicht den gewünschten Effekt. Nachrichten werden weiterhin seriell aus der Warteschlange verarbeitet, auch wenn threadpoolsize = 20 ist.Rabbit Mq Java-Client parallelen Verbrauch

Vom Debugging über den Code kann ich sehen, dass der Parameter threadpoolsize verwendet wird, um einen ExecutorService zu erstellen, der verwendet wird, um wie in here beschrieben an die Kaninchen-confactfactor übergeben zu werden. Das alles sieht gut aus bis Sie in den Hasen ConsumerWorkService gelangen. Hier werden Nachrichten in Blöcken von maximal 16 Nachrichten verarbeitet. Jede Nachricht in einem Block wird seriell verarbeitet und dann, wenn mehr Arbeit zu erledigen ist, wird der Executor-Dienst mit dem nächsten Block aufgerufen. Ein Codeschnipsel davon ist unten. Von dieser Verwendung des Executor-Dienstes kann ich nicht sehen, wie die Nachrichten parallel verarbeitet werden können. Der Executorservice hat immer nur eine Aufgabe zu erledigen.

Was fehlt mir?

private final class WorkPoolRunnable implements Runnable { 

     public void run() { 
      int size = MAX_RUNNABLE_BLOCK_SIZE; 
      List<Runnable> block = new ArrayList<Runnable>(size); 
      try { 
       Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size); 
       if (key == null) return; // nothing ready to run 
       try { 
        for (Runnable runnable : block) { 
         runnable.run(); 
        } 
       } finally { 
        if (ConsumerWorkService.this.workPool.finishWorkBlock(key)) { 
         ConsumerWorkService.this.executor.execute(new WorkPoolRunnable()); 
        } 
       } 
      } catch (RuntimeException e) { 
       Thread.currentThread().interrupt(); 
      } 
     } 
+0

Können Sie ConsumerWorkService so konfigurieren, dass eine andere Blockgröße verwendet wird? –

+1

Hi Claus, ich habe einige Änderungen an der Camel-rabbitmq-Komponente über Github als Fergus Nelson vorgenommen. Ich habe Änderungen am RabbitMqConsumer vorgenommen, um für jeden benötigten Konkurenten einen Kanal einzurichten. Ich werde eine Jira + Pull-Anfrage erstellen, wenn ich alles getestet habe. –

+0

@ mR_fr0g, wie ich verstehe, haben Sie das Problem behoben, indem Sie mehrere Kanäle in der Camel-RabbitMQ-Komponente erstellen. Könnten Sie den Link zu Ihrem Jira-Ticket angeben, eine Anfrage stellen und angeben, in welcher Camel-Version der Fix vorhanden ist? – wheleph

Antwort

3

Dokumentation RabbitMQ der ist darüber nicht sehr klar, aber, obwohl die ConsumerWorkService einen Thread-Pool verwendet wird, dieser Pool scheint nicht Nachrichten in einer Art und Weise verwendet werden, parallel zu verarbeiten:

Jeder Kanal hat einen eigenen Versand-Thread. Für den häufigsten Anwendungsfall eines Verbrauchers pro Kanal bedeutet dies, dass Verbraucher andere Verbraucher nicht aufhalten. Wenn Sie mehrere Konsumenten pro Kanal haben, müssen Sie sich darüber im Klaren sein, dass ein langjähriger Kunde die Rücksendung von Rückrufen an andere Verbraucher auf diesem Kanal verzögern kann.

(http://www.rabbitmq.com/api-guide.html)

Diese Dokumentation schlägt vor, ein Channel pro Faden und in der Tat, wenn man einfach so viele Channel s als das erforderliche Niveau der Parallelität erstellen, werden Nachrichten zwischen den zu verknüpften Verbraucher versendet werden diese Kanäle.

Ich habe mit 2 Kanälen und Verbrauchern getestet: Wenn 2 Nachrichten in der Warteschlange sind, wählt jeder Verbraucher nur eine Nachricht auf einmal. Die Blöcke von 16 Nachrichten, die Sie erwähnt haben, scheinen sich nicht zu stören, was eine gute Sache ist.

In der Tat, Spring AMQP erstellt auch als mehrere Kanäle, um Nachrichten gleichzeitig zu verarbeiten. Dies geschieht durch:

ich auch diese wie erwartet werden getestet haben.

3

Wenn Sie eine einzelne Channel -Instanz haben, wird sie ihre registrierten Benutzer seriell aufrufen, wie Sie bei der Untersuchung ConsumerWorkService richtig herausgefunden haben. Es gibt 2 Möglichkeiten, das zu überwinden:

  1. Verwenden Sie mehrere Kanäle statt einer.
  2. Verwenden Sie einen einzelnen Kanal, aber implementieren Sie die Verbraucher auf besondere Weise. Sie sollten nur eingehende Nachrichten aus der Warteschlange auswählen und sie als Aufgabe in einen internen Thread-Pool stellen.

Weitere Informationen finden Sie unter this post.