2016-12-14 4 views
2

meine Architektur Gründend dieses Beispiels aus:Kanäle in RabbitMQ mit Arbeitswarteschlange Szenario Leerlauf

RabbitMQ - Work Queues

Setup:

  • Arbeiter erhalten eine Nachricht zu einer Zeit
  • jeder Arbeiter lädt ein Dokument, dauert ein paar Sekunden
  • Sobald ein Worker erfolgreich ein Dokument herunterlädt, bestätigt es die Nachricht
  • , wenn ein Arbeitnehmer ein Dokument zum Download fehlschlägt, Noacks es die Nachricht für die Wieder Queuing (maximal drei Neuversuche)

ich in Engpässe in meiner Implementierung suchen, Verlangsamungen verursacht wird. Da ich noAck verwenden, um fehlgeschlagene Worker neu zu ordnen. Um diese gleichmäßig über meine Worker-Threads zu ermöglichen Ich habe gesetzt Prefetchs 1.Looking auf diese Frage: RabbitMQ work queue is blocking consumers - Sie werden sehen, was ich in den folgenden Screenshots sehen:

acks/second

channels

zu machen Bei den sicheren Arbeitern wird nur jeweils eine Nachricht zugewiesen. Ich benötige den Prefetch-Wert auf 1, aber andere sagen, dass dies dazu führt, dass die Arbeiter sequenziell und nicht parallel arbeiten.

Was bedeutet Laufen eigentlich auf Kanalebene? Ich sehe, dass die Warteschlange und die Verbindung ordnungsgemäß ausgeführt werden, aber einzelne Kanäle (einer pro Thread) sind im Leerlauf.

EDIT # 1: Dieser Hinweis zum Übergeben eines Verbindungspools an die RabbitMQ-Verbindung sieht vielversprechend aus. https://www.rabbitmq.com/api-guide.html#consumer-thread-pool ich mit Spring AMQP aber ich denke, ein ähnlicher Ansatz hier verwendet werden kann:

 /** 
    * Configure a large thread pool for concurrent channels on the physical Connection 
    */ 
    @Bean 
    public org.springframework.amqp.rabbit.connection.CachingConnectionFactory rabbitConnectionFactory() { 
     logger.info("Configuring connection factory"); 
     CachingConnectionFactory cf = new CachingConnectionFactory(); 
     cf.setAddresses(this.rabbitMQProperties.getAddresses()); 
     cf.setUsername(this.rabbitMQProperties.getUsername()); 
     cf.setPassword(this.rabbitMQProperties.getPassword()); 
     cf.setVirtualHost(this.rabbitMQProperties.getVirtualHost()); 
     //configure a large thread pool for the connection thread 
     int threads = 30; 
     logger.info(String.format("Configuring thread pool with %d threads", threads)); 
     ExecutorService connectionPool = Executors.newFixedThreadPool(threads); 
     cf.setExecutor(connectionPool); 
     logger.info(String.format("MQ cache mode: %s", cf.getCacheMode().toString())); 
     logger.info(String.format("MQ connection cache: %d", cf.getConnectionCacheSize())); 
     logger.info(String.format("MQ channel cache: %d", cf.getChannelCacheSize())); 
     return cf; 
    } 

    @Bean 
    AmqpTemplate rabbitTemplate(org.springframework.amqp.rabbit.connection.CachingConnectionFactory connectionFactory){ 
     AmqpTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); 
     return rabbitTemplate; 
    } 

Antwort

1

Apparantly in Spring AMQP der Standard-Thread-Pool für eine einzelne physische TCP/IP-Verbindung zu 5 Fäden vorbelegt:

Spring AMQP

auch zum Zeitpunkt des Schreibens, die rabbitmq-Client-Bibliothek erstellt einen festen Thread-Pool für jede Verbindung (5 threads) standardmäßig. Wenn Sie eine große Anzahl von Verbindungen verwenden, sollten Sie einen benutzerdefinierten Executor für die CachingConnectionFactory festlegen. Dann wird derselbe Executor von allen Verbindungen verwendet und seine Threads können geteilt werden. Der Thread-Pool des Executors sollte unbegrenzt sein oder für die erwartete Verwendung geeignet sein (normalerweise mindestens ein Thread pro Verbindung). Wenn mehrere Kanäle für jede Verbindung erstellt werden, wirkt sich die Poolgröße auf die Parallelität aus. Daher wäre ein ausführbarer Threadpoolpool (oder ein einfacher Cache) am besten geeignet.

Ich konnte dies replizieren, indem die Anzahl der Threads zugewiesen RabbitMQ der Verbindungspool zu ändern:

/** 
    * Expand the number of concurrent threads for a single RabbitMQ connection 
    * http://docs.spring.io/spring-amqp/reference/htmlsingle/ 
    * Also, at the time of writing, the rabbitmq-client library creates a fixed thread pool for each connection (5 threads) by default. 
    * When using a large number of connections, you should consider setting a custom executor on the CachingConnectionFactory. 
    */ 
    @Bean(name="channelPool") 
    @Scope("singleton") 
    MigrationPool rabbitConnectionPool(){ 
     int channels = 50; 
     logger.info(String.format("Configuring connection pool with %d threads", channels)); 
     return new MigrationPool(channels, channels, 0L, TimeUnit.MILLISECONDS, 
       new LinkedBlockingQueue<Runnable>()); 
    } 

    /** 
    * Configure a large thread pool for concurrent channels on the physical Connection 
    */ 
    @Bean 
    public org.springframework.amqp.rabbit.connection.CachingConnectionFactory rabbitConnectionFactory(@Qualifier("channelPool") MigrationPool connectionPool) { 
     logger.info("Configuring connection factory"); 
     CachingConnectionFactory cf = new CachingConnectionFactory(); 
     cf.setAddresses(this.rabbitMQProperties.getAddresses()); 
     cf.setUsername(this.rabbitMQProperties.getUsername()); 
     cf.setPassword(this.rabbitMQProperties.getPassword()); 
     cf.setVirtualHost(this.rabbitMQProperties.getVirtualHost()); 
     cf.setExecutor(connectionPool); 
     logger.info(String.format("MQ cache mode: %s", cf.getCacheMode().toString())); 
     logger.info(String.format("MQ connection cache: %d", cf.getConnectionCacheSize())); 
     logger.info(String.format("MQ channel cache: %d", cf.getChannelCacheSize())); 
     logger.info(String.format("MQ thread pool: %d threads", connectionPool.getMaximumPoolSize())); 
     return cf; 
    } 

In dem obigen Code habe ich die Anzahl der Threads pro Verbindung die Anzahl der virtuellen Kanäle Spiegelung dh ein realer physischer Thread für jeden virtuellen RabbitMQ-Kanal, da jeder Kanal auf einen Worker-Thread verweist, der jeweils eine Nachricht verarbeitet.Dies führt in den Kanälen nicht mehr auf den Standard-5-Verbindungen blockiert und stattdessen den vollen Nutzen aus der erweiterten Anzahl von Threads unter:

channels no longer blocking

Manipulieren die Anzahl der Threads zur Verfügung RabbitMQ der Verbindung werden Kanäle Blockierung zeigen. Einstellung auf 10 Threads und 50 Kanäle zum Beispiel öffnen.