meine Architektur Gründend dieses Beispiels aus:Kanäle in RabbitMQ mit Arbeitswarteschlange Szenario Leerlauf
Setup:
- Arbeiter erhalten eine Nachricht zu einer Zeit
- jeder Arbeiter lädt ein Dokument, dauert ein paar Sekunden
- Sobald ein Worker erfolgreich ein Dokument herunterlädt, bestätigt es die Nachricht
- , wenn ein Arbeitnehmer ein Dokument zum Download fehlschlägt, Noacks es die Nachricht für die Wieder Queuing (maximal drei Neuversuche)
ich in Engpässe in meiner Implementierung suchen, Verlangsamungen verursacht wird. Da ich noAck verwenden, um fehlgeschlagene Worker neu zu ordnen. Um diese gleichmäßig über meine Worker-Threads zu ermöglichen Ich habe gesetzt Prefetchs 1.Looking auf diese Frage: RabbitMQ work queue is blocking consumers - Sie werden sehen, was ich in den folgenden Screenshots sehen:
zu machen Bei den sicheren Arbeitern wird nur jeweils eine Nachricht zugewiesen. Ich benötige den Prefetch-Wert auf 1, aber andere sagen, dass dies dazu führt, dass die Arbeiter sequenziell und nicht parallel arbeiten.
Was bedeutet Laufen eigentlich auf Kanalebene? Ich sehe, dass die Warteschlange und die Verbindung ordnungsgemäß ausgeführt werden, aber einzelne Kanäle (einer pro Thread) sind im Leerlauf.
EDIT # 1: Dieser Hinweis zum Übergeben eines Verbindungspools an die RabbitMQ-Verbindung sieht vielversprechend aus. https://www.rabbitmq.com/api-guide.html#consumer-thread-pool ich mit Spring AMQP aber ich denke, ein ähnlicher Ansatz hier verwendet werden kann:
/**
* Configure a large thread pool for concurrent channels on the physical Connection
*/
@Bean
public org.springframework.amqp.rabbit.connection.CachingConnectionFactory rabbitConnectionFactory() {
logger.info("Configuring connection factory");
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setAddresses(this.rabbitMQProperties.getAddresses());
cf.setUsername(this.rabbitMQProperties.getUsername());
cf.setPassword(this.rabbitMQProperties.getPassword());
cf.setVirtualHost(this.rabbitMQProperties.getVirtualHost());
//configure a large thread pool for the connection thread
int threads = 30;
logger.info(String.format("Configuring thread pool with %d threads", threads));
ExecutorService connectionPool = Executors.newFixedThreadPool(threads);
cf.setExecutor(connectionPool);
logger.info(String.format("MQ cache mode: %s", cf.getCacheMode().toString()));
logger.info(String.format("MQ connection cache: %d", cf.getConnectionCacheSize()));
logger.info(String.format("MQ channel cache: %d", cf.getChannelCacheSize()));
return cf;
}
@Bean
AmqpTemplate rabbitTemplate(org.springframework.amqp.rabbit.connection.CachingConnectionFactory connectionFactory){
AmqpTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}