2016-07-01 12 views
1

Hallo Ich entwickle Spring-Boot-RabbitMQ Version 1.6.Ich habe einige Fragen bei der Entwicklung der Anwendung. Lesen Sie die Dokumentation und durchsuchen Sie andere Fragen zum Stapelüberlauf, aber ich kann nicht wenige Dinge klarstellen (möglicherweise wegen meinem schlechten Speicher). Es wäre toll, wenn jemand meine Fragen beantwortet.Umgang mit Verbindungen im Frühjahr-Boot-RabbitMQ

1) Zur Zeit habe ich 4-Producers und 4-Consumers.Producer kann Millionen von Nachrichten oder Events produzieren, so dass eine einzige Verbindung für beide Hersteller & Verbraucher Verbraucher blockieren, um die Nachrichten zu konsumieren.So, was ich dachte, ist Erstellen von separaten Verbindungen für Hersteller und Verbraucher, so dass beide nicht blockiert werden und zu einer Leistungsverbesserung führen. Bricht ich diesen Ansatz ab?

2) Ich verwende CachingConnectionFactory, um Verbindung mit SimpleRabbitListenerContainerFactory .While machen Aufruf dieser Fabrik zu schaffen, ob es sich für uns neue Verbindung zurück? Wenn wir also CachingConnectionFactory verwenden tun wir wirklich schreiben müssen eine separate Verbindung Fabriken für beide Hersteller & consumer.Please mein zu finden unter

1) Konfigurationsklasse

@Configuration 
@EnableRabbit 
public class RabbitMqConfiguration{ 

@Autowired 
private CachingConnectionFactory cachingConnectionFactory; 

@Value("${concurrent.consumers}") 
public int concurrent_consumers; 

@Value("${max.concurrent.consumers}") 
public int max_concurrent_consumers; 

@Bean 
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { 
     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); 
     factory.setConnectionFactory(cachingConnectionFactory); 
     factory.setConcurrentConsumers(concurrent_consumers); 
     factory.setMaxConcurrentConsumers(max_concurrent_consumers); 
     factory.setMessageConverter(jsonMessageConverter()); 
     return factory; 
    } 

@Bean 
public MessageConverter jsonMessageConverter() 
{ 
    final Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter(); 
    return converter; 
} 

} 

2) Producer Klasse

@Configuration 
public class TaskProducerConfiguration extends RabbitMqConfiguration { 

@Value("${queue1}") 
public String queue1; 

@Value("${queue2}") 
public String queue2; 

@Value("${queue3}") 
public String queue1; 

@Value("${queue4}") 
public String queue2; 

@Value("${spring.rabbit.exchange}") 
public String exchange; 

@Autowired 
private CachingConnectionFactory cachingConnectionFactory; 

@Primary 
@Bean 
public RabbitTemplate getQueue1Template() 
{ 
    RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory); 
    template.setRoutingKey(this.queue1); 
    template.setMessageConverter(jsonMessageConverter()); 
    return template; 
} 

@Bean 
public RabbitTemplate getQueue2Template() 
{ 
    RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory); 
    template.setRoutingKey(this.queue2); 
    template.setMessageConverter(jsonMessageConverter()); 
    return template; 
} 

@Bean 
public RabbitTemplate getQueue3Template() 
{ 
    RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory); 
    template.setRoutingKey(this.queue3); 
    template.setMessageConverter(jsonMessageConverter()); 
    return template; 
} 

@Bean 
public RabbitTemplate getQueue4Template() 
{ 
    RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory); 
    template.setRoutingKey(this.queue4); 
    template.setMessageConverter(jsonMessageConverter()); 
    return template; 
} 
@Bean(name="queue1Bean") 
public Queue queue1() 
{ 
    return new Queue(this.queue1); 
} 

@Bean(name="queue2Bean") 
public Queue queue2() 
{ 
    return new Queue(this.queue2); 
} 

@Bean(name="queue3Bean") 
public Queue queue3() 
{ 
    return new Queue(this.queue3); 
} 

@Bean(name="queue4Bean") 
public Queue queue4() 
{ 
    return new Queue(this.queue4); 
} 

@Bean 
TopicExchange exchange() { 
    return new TopicExchange(exchange); 
} 

@Bean 
List<Binding> bindings(Queue queue1Bean,Queue queue2Bean,Queue queue3Bean,Queue queue4Bean, TopicExchange exchange) { 
    List<Binding> bindingList = new ArrayList<Binding>(); 
    bindingList.add(BindingBuilder.bind(queue1Bean).to(exchange).with(this.queue1)); 
    bindingList.add(BindingBuilder.bind(queue2Bean).to(exchange).with(this.queue2)); 
    bindingList.add(BindingBuilder.bind(queue3Bean).to(exchange).with(this.queue3)); 
    bindingList.add(BindingBuilder.bind(queue4Bean).to(exchange).with(this.queue4)); 
    return bindingList; 
} 

} 

3) Empfängerklasse (Just Gemeinsamer einen Empfänger Klasse Rest der 3-Empfänger Klassen ein und derselben sind außer Warteschlangennamen & Routing-Taste).

@Component 
public class Queue1Receiver { 

@Autowired 
private TaskProducer taskProducer; 

@Value("${queue1}") 
public String queue1; 

@RabbitListener(id="queue1",containerFactory="rabbitListenerContainerFactory",queues = "#{queue1Bean}") 
public void handleQueue1Message(TaskMessage taskMessage,@Header(AmqpHeaders.CONSUMER_QUEUE) String queue) 
{ 
    System.out.println("Queue::"+queue); 
    System.out.println("CustomerId: " + taskMessage.getCustomerID()); 
    if(taskMessage.isHasQueue2()){ 
     taskProducer.sendQueue2Message(taskMessage); 
    } 
    if(taskMessage.isHasQueue3()){ 
     taskProducer.sendQueue3Message(taskMessage); 
    } 
    if(taskMessage.isHasQueue4()){ 
     taskProducer.sendQueue4Message(taskMessage); 
    } 
} 

@Bean 
public Queue queue1Bean() { 
    // This queue has the following properties: 
    // name: my_durable,durable: true,exclusive: false,auto_delete: false 
    return new Queue(queue1, true, false, false); 
} 

} 

Ihre Hilfe sollte spürbar sein.

Hinweis: Down Wähler bitte registrieren Sie Ihren Kommentar vor der Abstimmung, damit ich in Zukunft den Fehler vermeiden kann.

Edited basierend auf Kommentare von Gary Russell: 1) RabbitMqConfiguration

@Configuration 
@EnableRabbit 
public class RabbitMqConfiguration{ 

@Value("${concurrent.consumers}") 
public int concurrent_consumers; 

@Value("${max.concurrent.consumers}") 
public int max_concurrent_consumers; 

@Bean 
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { 
     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); 
     factory.setConnectionFactory(connectionFactory()); 
     factory.setConcurrentConsumers(concurrent_consumers); 
     factory.setMaxConcurrentConsumers(max_concurrent_consumers); 
     factory.setMessageConverter(jsonMessageConverter()); 
     return factory; 
    } 

@Bean 
public CachingConnectionFactory connectionFactory() 
{ 
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost"); 
    connectionFactory.setUsername("guest"); 
    connectionFactory.setPassword("guest"); 
    connectionFactory.setCacheMode(CacheMode.CONNECTION); 
    return connectionFactory; 
} 


@Bean 
public MessageConverter jsonMessageConverter() 
{ 
    final Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter(); 
    return converter; 
} 


} 

Antwort

3

eine einzige Verbindung für beide Hersteller & Verbraucher

die messages` verbrauchen blockiert Verbraucher mit Was führt Sie dazu, das zu glauben? Eine einzelne Verbindung wird im Allgemeinen gut sein. Wenn Sie wirklich separate Verbindungen wünschen, ändern Sie die Verbindungsfactory cacheMode zu CONNECTION.

+0

@Gray Russell Vielen Dank für Ihre Antwort.Praktisch Produzent kann mehr als eine Million Nachrichten zu einer Zeit, so in diesem Fall Single Connection ist genug? Auch beim Lesen einige Blogs, die sie angegeben separate Verbindung für Hersteller und Konsumenten, das bin ich hat die Frage hier gepostet. – VelNaga

+0

was eigentlich cacheMode = CONNECTION wird tun? Jetzt habe ich meinen Code geändert. Geht es jetzt gut? @Bean öffentliche CachingConnectionFactory consumerConnectionFactory() { CachingConnectionFactory connectionFactory = neue CachingConnectionFactory ("localhost"); connectionFactory.setUsername ("Gast"); connectionFactory.setPassword ("Gast"); connectionFactory.setCacheMode (CacheMode.CONNECTION); Rückkehr connectionFactory; } – VelNaga

+0

Im Frühjahr-Boot-Application.properties haben die folgenden Eigenschaften spring.rabbitmq.cache.connection.mode = CHANNEL spring.rabbitmq.cache.channel.size konfiguriert = 50 spring.rabbitmq.cache.channel.checkout- timeout = 1000 concurrent.consumers = 3 max.concurrent.consumers = 10 Sieht es gut aus? – VelNaga

0

Sie können Verbindungspooling in demselben Fall verwenden, wenn Sie die Poolgröße beibehalten, um das Problem zu lösen.Wie in der obigen Antwort vorgeschlagen, verwenden sowohl Hersteller als auch Verbraucher die gleiche Verbindung, sodass das Pooling Ihnen helfen kann.

Verwandte Themen