2016-06-21 6 views
0

Ich brauche eine Hilfe, um einen SimpleMessageListenerContainer in einer Weise zu implementieren, dass es die Nachricht untransaktional entfernt, sobald sie vom Nachrichtenempfänger gelesen wird.Wie man Nachrichten zerstörend aus RabbitMq-Warteschlange mit SimpleMessageListenerContainer Bean

In meinem Fall, unabhängig von erfolgreichen oder nicht erfolgreichen Transaktionen, die Nachricht in der Warteschlange hängen irgendwo (nicht in der Warteschlange), und wird wiederholt bei jeder Operation, die aus der Warteschlange liest verarbeitet. Daher bleiben alle anderen Nachrichten, die in die Warteschlange eingereiht werden, nicht zugänglich, und nur die erste Nachricht wird jedes Mal erneut verarbeitet. Das andere seltsame Ding ist, dass ich die Nachrichten, die in der Warteschlange landet/Schlange stehen, nicht sehen kann, dh die Warteschlangentiefe ändert sich nie auf der Rabbit Management Konsole, sondern nur die Nachrichtenrate nimmt einen Sprung bei jedem Schreiben in die Warteschlange .

Unten ist das Code-Snippet meiner Java-Konfiguration. Es wird eine große Hilfe sein, wenn jemand hier den Fehler hinweisen können: -

@Configuration 
@EnableJpaRepositories(basePackages={"com.xxx.muppets"}) 
public class MuppetMessageConsumerConfig { 

private ApplicationContext applicationContext; 
@Value("${rabbit.queue.name}") 
private String queueName; 

@Autowired 
public void setApplicationContext(ApplicationContext applicationContext) { 
    this.applicationContext = applicationContext; 
} 

@Bean 
Queue queue() { 
    return new Queue(queueName, false); 
} 

@Bean 
TopicExchange exchange() { 
    return new TopicExchange("spring-boot-exchange"); 
} 

@Bean 
Binding binding(Queue queue, TopicExchange exchange) { 
    return BindingBuilder.bind(queue).to(exchange).with(queueName); 
} 

@Bean 
MuppetMsgReceiver muppetMsgReceiver(){ 
    return new MuppetMsgReceiver(); 
} 

@Bean 
MessageListenerAdapter listenerAdapter(MuppetMsgReceiver muppetMsgReceiver){ 
    return new MessageListenerAdapter(muppetMsgReceiver, "receiveMessage"); 
} 

@Bean 
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { 
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); 
    container.setAcknowledgeMode(NONE); 
    container.setConnectionFactory(connectionFactory); 
    container.setQueueNames(queueName); 
    container.setMessageListener(listenerAdapter); 
    return container; 
} 
} 

Meine Nachricht Empfänger Klasse ist wie folgt:

public class MuppetMsgReceiver { 

private String muppetMessage; 

private CountDownLatch countDownLatch; 

public MuppetMsgReceiver() { 
    this.countDownLatch = new CountDownLatch(1); 
} 

public MuppetMsgReceiver(CountDownLatch latch) { 
    this.countDownLatch = latch;   
    CountDownLatch getCountDownLatch() { 
    return countDownLatch; 
} 

public void receiveMessage(String receivedMessage) { 
    countDownLatch.countDown(); 
    this.muppetMessage = receivedMessage; 
} 

public String getMuppetMessage() { 
    return muppetMessage; 
} 
} 

Dieser vollständige Code auf dem getting started example des Frühlings basiert aber hilft nicht durch zerstörungsfreie Lesevorgänge aus der Warteschlange.

Antwort

0

AcknowledgeMode=NONE bedeutet den Broker acks die Nachricht, sobald es an den Kunden versandt, nicht, wenn der Verbraucher erhält es so Ihr beobachtete Verhalten für mich keinen Sinn macht. Es ist ziemlich ungewöhnlich, diesen Ack-Modus zu verwenden, aber die Nachricht wird gelöscht, sobald sie gesendet wird. Dies ist wahrscheinlich ein Problem mit Ihrem Code. Mit der Protokollierung von TRACE können Sie sehen, was passiert.

+0

Ich stimme Gary vollkommen zu, und das ist der Grund, den Ack Mode auf None zu setzen. Ich denke, dass es für mich nützlich sein könnte zu verstehen, wie SimpleMessageListenerContainer funktioniert. Ich hatte erwartet, dass Nachrichten nur dann aus der Warteschlange gelesen werden, wenn sie angefordert werden, aber es sieht so aus, als ob die erstellte Bean des Empfängers die Nachricht konsumiert, sobald sie auf dem Broker veröffentlicht wird. Könnten Sie mir bitte helfen, wenn dieses Verständnis korrekt ist, und wenn ja, wo werden alle gelesenen Nachrichten aufbewahrt/gespeichert? Dieser Speicher ist der Ort, der für mich nicht richtig aussieht. –

+0

Sie sind nicht "gespeichert"; Sie werden in eine Warteschlange in dem BlockingQueueConsumer geschrieben (der auf der Basis von prefetchCount dimensioniert ist, um eine OOM-Bedingung zu vermeiden, wenn der Verbraucher nicht mithalten kann). Der Consumer-Thread entfernt jede Nachricht und ruft die Listener-Bean auf. Es gibt keine Möglichkeit der erneuten Zustellung einer Nachricht mit diesem Szenario. –

+0

Der 'BlockingQueueConsumer' wird von RabbitMq verwaltet, und in diesem Fall ist es immer noch nicht sicher, warum meine Anwendung sich falsch verhält und immer nur die erste Nachricht abrufen kann. Ich werde es immer noch mit meinen Testfällen anschauen und trainieren. Falls Sie mögliche Gründe für dieses (Fehl-) Verhalten kennen, lassen Sie es mich bitte wissen. –

Verwandte Themen