2017-11-09 2 views
0

Ich habe einen JmsConnectionFactory mit einem URI konfiguriert wie folgt aus:Apache Qpid JMS-Client - Einstellung Prefetch funktioniert nicht

failover:(amqps://11.22.33.44?amqp.idleTimeout=120000&transport.tcpKeepAlive=true&jms.prefetchPolicy.all=10)?failover.maxReconnectAttempts=20

Notiere die jms.prefetchPolicy.all=10 Parameter, der

auf der offiziellen documentation nach

... steuert, wie viele Nachrichten die Gegenstelle kann an den Client senden und in einem Prefetch-Puffer für jeden Verbraucher Instanz gehalten werden.

Also sollte ich nicht mehr als 10 Nachrichten im Client gepuffert sehen, oder? Nun, das funktioniert nicht.

Ich habe mit Reflexion endete in regelmäßigen Abständen die MessageQueue.size() von jedem JmsMessageConsumer drucken:

MessageConsumer messageConsumer = ... 
Field field = JmsMessageConsumer.class.getDeclaredField("messageQueue"); 
field.setAccessible(true); 
MessageQueue q = (MessageQueue) field.get(messageConsumer); 
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> System.out.println(q.size()), 10, 10, TimeUnit.SECONDS); 

Und wenn Handler meiner Nachricht (oder gesperrt) langsam sind, ich sehe Warteschlange Größen von etwas weniger als 1000 Meldungen, Dies ist die Standard-Prefetch-Größe.

Also - ist das ein Fehler? Und wie gehe ich vor, um eine andere Prefetch-Größe festzulegen?

Ich verwende qpid-jms-client, Version 0.27.0.

Antwort

1

Ihre URI falsch ist, sollte es sein:

failover://(amqps://11.22.33.44?amqp.idleTimeout=120000&transport.tcpKeepAlive=true) 
    ?jms.prefetchPolicy.all=10&failover.maxReconnectAttempts=20 

Die JMS Optionen global sind, so dass sie zusammen mit den Failover-Optionen an den äußersten Teil der URI angewendet werden. Die umschlossenen AMQP-Verbindungs-URIs enthalten nur Transportoptionen, die jede spezifische Verbindung steuern.

+0

Ah, das erklärt es. Ich werde es später ausprobieren. – Malt

0

Ich habe es herausgefunden.

JmsConnectionFactory hat eine JmsPrefetchPolicy. Ich sehe nicht, wie es soll einen URI-Parameter eingestellt werden muss, aber es kann mit JmsConnectionFactory.setPrefetchPolicy() wie folgt festgelegt werden:

JmsConnectionFactory cf = ... 
JmsDefaultPrefetchPolicy prefetchPolicy = new JmsDefaultPrefetchPolicy(); 
prefetchPolicy.setAll(123); // Set prefetch size here 
cf.setPrefetchPolicy(prefetchPolicy); 

nicht gesehen haben dies nirgends dokumentiert.

Verwandte Themen