2017-11-02 4 views
0

Ich muss eine Anwendung erstellen, die Daten von einem Datenerzeuger erhalten und verarbeiten soll. Ich habe RabbitMQ als Nachrichten-Broker gewählt. Meine Tests zeigen mir nicht die besten Ergebnisse:RabbitMQ langsam Empfangsgeschwindigkeit

Gesendet - 100 msg;
Produzieren - 100 msg/s;
Verbrauchen - 6 msg/s;

Um es zu lösen, ich setze listenerContainer.setAcknowledgeMode(AcknowledgeMode.NONE); Aber ich muss für einige Warteschlangen bestätigen. Und ich kann es nicht parallel mit Arbeitern tun, weil die Reihenfolge der Nachrichten für die Datenverarbeitung wichtig ist.

Ist es möglich, die Empfangsgeschwindigkeit zu erhöhen, bestätigt?

Hersteller:

@Bean 
Queue queue() { 
    return new Queue(queueName, false); 
} 

@Bean 
public ConnectionFactory connectionFactory() { 
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("ip"); 
    connectionFactory.setUsername("name"); 
    connectionFactory.setPassword("pswd"); 
    return connectionFactory; 
} 

@Bean 
public AmqpAdmin amqpAdmin() { 
    return new RabbitAdmin(connectionFactory()); 
} 

@Bean 
public RabbitTemplate rabbitTemplate() { 
    return new RabbitTemplate(connectionFactory()); 
} 

@Bean 
public FanoutExchange exchange() { 
    return new FanoutExchange("exchange-1"); 
} 

@Bean 
public Binding binding(){ 
    return BindingBuilder.bind(queue()).to(exchange()); 
} 

...

rabbitTemplate.setExchange("exchange-1"); 
rabbitTemplate.convertAndSend(data); 

Verbraucher:

@Bean 
public ConnectionFactory connectionFactory() { 
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("ip"); 
    connectionFactory.setUsername("name"); 
    connectionFactory.setPassword("pswd"); 
    return connectionFactory; 
} 

@Bean 
public RabbitTemplate rabbitTemplate() { 
    return new RabbitTemplate(connectionFactory()); 
} 

@Bean 
Queue queue() { 
    return new Queue("queue-1", false); 
} 

@Bean 
public SimpleMessageListenerContainer listenerContainer() { 
    SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(); 
    listenerContainer.setConnectionFactory(connectionFactory()); 
    listenerContainer.setQueues(queue()); 
    listenerContainer.setMessageListener(new Receiver()); 
    listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO); 
    return listenerContainer; 
} 

...

@Override 
public void onMessage(Message message) { 
    System.out.println("Received message: " + fromBytes(message.getBody()) + " \n Time = " + System.currentTimeMillis()); 
} 

Getestet auf Instanz mit 2 vCPU und 4 GB Speicher.

Antwort

0

Sie können die prefetchCount des Containers erhöhen, was die Leistung erheblich verbessert. Wenn Sie jedoch eine Nachricht ablehnen und erneut anfordern, geht die Reihenfolge verloren (die angeforderte Nachricht wird hinter den vorab abgerufenen Nachrichten stehen).