Ich muss eine Anwendung erstellen, die Daten von einem Datenerzeuger erhalten und verarbeiten soll. Ich habe RabbitMQ als Nachrichten-Broker gewählt. Meine Tests zeigen mir nicht die besten Ergebnisse:RabbitMQ langsam Empfangsgeschwindigkeit
Gesendet - 100 msg;
Produzieren - 100 msg/s;
Verbrauchen - 6 msg/s;
Um es zu lösen, ich setze listenerContainer.setAcknowledgeMode(AcknowledgeMode.NONE);
Aber ich muss für einige Warteschlangen bestätigen. Und ich kann es nicht parallel mit Arbeitern tun, weil die Reihenfolge der Nachrichten für die Datenverarbeitung wichtig ist.
Ist es möglich, die Empfangsgeschwindigkeit zu erhöhen, bestätigt?
Hersteller:
@Bean
Queue queue() {
return new Queue(queueName, false);
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("ip");
connectionFactory.setUsername("name");
connectionFactory.setPassword("pswd");
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
public FanoutExchange exchange() {
return new FanoutExchange("exchange-1");
}
@Bean
public Binding binding(){
return BindingBuilder.bind(queue()).to(exchange());
}
...
rabbitTemplate.setExchange("exchange-1");
rabbitTemplate.convertAndSend(data);
Verbraucher:
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("ip");
connectionFactory.setUsername("name");
connectionFactory.setPassword("pswd");
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
Queue queue() {
return new Queue("queue-1", false);
}
@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
listenerContainer.setConnectionFactory(connectionFactory());
listenerContainer.setQueues(queue());
listenerContainer.setMessageListener(new Receiver());
listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
return listenerContainer;
}
...
@Override
public void onMessage(Message message) {
System.out.println("Received message: " + fromBytes(message.getBody()) + " \n Time = " + System.currentTimeMillis());
}
Getestet auf Instanz mit 2 vCPU und 4 GB Speicher.