0

Ich verwende Spring-Boot 1.4.2 und rabbitMQ 1.6.5.RELEASE (nicht mit Spring-Boot-Starter-Kaninchen). Ich habe mehrere Micro-Services in meinem Projekt und die meisten Micro-Services enthalten rabbitMQ-Konsumenten. Einer unter dem Projekt wird eine Nachricht produzieren. Nur wenige Verbraucher hören auf, die Nachricht aus der Warteschlange abzuholen, obwohl die Nachricht in der Warteschlange verfügbar ist. Wenn ich diesen bestimmten Verbraucher neu starte, beginnt er damit, diese Nachricht zu konsumieren. Wenn ich einen der Consumer-Micro-Dienste erneut vertreibe, hören wieder wenige Verbraucher auf anderen Komponenten auf, die Nachricht aus der Warteschlange zu verbrauchen, aber in der Warteschlange kann ich die Nachricht sehen.Spring boot rabbitMQ Verbraucher stoppt abholen Nachrichten wegen Json Konvertierung Fehler

Nach den Protokollen der Suche in i

com.sample.global.bookStateUpdate.consumer.BookFailConsumer.execute(com.vodafone.smartlife.provisioning.common.model.Message) 
throws com.sample.global.bookStateUpdate.consumer.BookFailConsumer.exception.ConsumerException' threw exception 
org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:138) 
org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:105) 
org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:778) 
org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:701) 
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:99) 
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:191) 
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1213) 
org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:682) 
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1191) 
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1175) 
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1200(SimpleMessageListenerContainer.java:99) 
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1338)\n\tat java.lang.Thread.run(Thread.java:745)\n 
Caused by: java.lang.NullPointerException: null 
com.sample.global.bookStateUpdate.consumer.BookFailConsumer.execute(BookFailConsumer.java:54) 
sun.reflect.GeneratedMethodAccessor149.invoke(Unknown Source)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
java.lang.reflect.Method.invoke(Method.java:498)org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197) 
    org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) 
org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:49) 
org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:125)... 12 common frames omitted\n 

So die unten stehende Ausgabe findet es scheint Jackson nicht meine Botschaft in json umwandeln könnte. Also in meinem Verbraucher anstatt das eigentliche Objekt zu konsumieren habe ich verbraucht org.springframework.amqp.core.Message dann habe ich das manuell in mein benutzerdefiniertes Objekt umgewandelt, das funktioniert.

Darf ich wissen, warum Spring-Hase die Nachricht nicht in Json konvertieren konnte?

finden Sie die folgenden Konfigurations & Verbraucher Dateiänderung

package com.sample.global.rabbit.configuration; 

import org.springframework.amqp.rabbit.annotation.EnableRabbit; 
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; 
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; 
import org.springframework.amqp.rabbit.core.RabbitTemplate; 
import org.springframework.amqp.rabbit.test.RabbitListenerTest; 
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; 
import org.springframework.amqp.support.converter.MessageConverter; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 
import org.springframework.context.annotation.Primary; 

@Configuration 
@EnableRabbit 
@RabbitListenerTest(capture = true, spy = true) 
public class RabbitMqConfiguration { 

@Bean 
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { 
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); 
    factory.setConnectionFactory(connectionFactory()); 
    factory.setConcurrentConsumers(15); 
    factory.setMaxConcurrentConsumers(15); 
    factory.setMessageConverter(jsonMessageConverter()); 

    return factory; 
} 

@Bean 
public CachingConnectionFactory connectionFactory() 
{ 
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("http://localhost:15672"); 
    connectionFactory.setUsername("guest"); 
    connectionFactory.setPassword("guest"); 
    connectionFactory.setRequestedHeartBeat(10); 
    return connectionFactory; 
} 

@Bean 
public MessageConverter jsonMessageConverter() 
{ 
    return new Jackson2JsonMessageConverter(); 
} 

@Bean(name = "MainTemplate") 
@Primary 
public RabbitTemplate rabbitTemplate() 
{ 
    RabbitTemplate template = new RabbitTemplate(connectionFactory()); 
    template.setMessageConverter(jsonMessageConverter()); 
    return template; 
} 

} 

Consumer

package com.sample.global.rabbit.consumer; 

import org.springframework.amqp.core.Message; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.amqp.rabbit.annotation.Exchange; 
import org.springframework.amqp.rabbit.annotation.Queue; 
import org.springframework.amqp.rabbit.annotation.QueueBinding; 
import org.springframework.amqp.rabbit.annotation.RabbitListener; 
import org.springframework.amqp.rabbit.core.RabbitTemplate; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.beans.factory.annotation.Qualifier; 
import org.springframework.stereotype.Component; 

@Component 
public class BookFailConsumer { 

private static final Logger logger = LoggerFactory.getLogger(BookFailConsumer.class); 

private static final ObjectMapper objectMapper = new ObjectMapper(); 

@Autowired 
@Qualifier(value = "MainTemplate") 
private RabbitTemplate rabbitTemplate; 

@RabbitListener(
     id = "book", 
     bindings = @QueueBinding(
       value = @Queue(value = "sample.queue", durable = "true"), 
       exchange = @Exchange(value = "sample.exchange", durable = "true", delayed = "true"), 
       key = "sample.queue" 
     ) 
) 
public void handle(Message messageObject) { 
    com.sample.global.rabbit.consumer.model.Message message = null; 
    try { 
     message = convertMessageBodyToTransaction(messageObject.getBody()); 
     //After manual JSON conversion it works fine. 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } 
} 

public Message convertMessageBodyToTransaction(byte[] messageBody) throws BadRequestException { 
    com.sample.global.rabbit.consumer.model.Message message = null; 
    String body = null; 
    try { 
     body = new String(messageBody, "UTF-8"); 
     logger.debug("Message body converted successfully to string: {}", body); 
    } catch (UnsupportedEncodingException e) { 
     throw new BadRequestException(e); 
    } 
    try { 
     message = objectMapper.readValue(body, Message.class); 
     logger.debug("Message body mapped successfully to Message object: {}", message.toString()); 
    } catch (Exception e){ 
     logger.error("Message conversion failed for the following message body: {}", body); 
     throw new BadRequestException(e); 
    } 
    return message; 
} 

}

Gibt es eine Möglichkeit manuelle Konvertierung zu vermeiden? Jeder Hinweis wäre nützlich, um dieses Problem zu beheben

+0

Sie müssen die komplette Stack-Trace anzeigen - Sie sollten auch die seltsame Formatierung beheben. –

+0

@GaryRussell Ich habe die vollständige Stack-Trace aktualisiert und ein bisschen formatiert.Ihre Hilfe sollte spürbar sein – VelNaga

+0

@GaryRussell Soll ich noch mehr Code posten? – VelNaga

Antwort

0

Das Protokoll zeigt der Grund ist ein Nullpointer in BookFailConsumer.execute

Also diese Verbraucher erhielten die Nachricht und nicht die BookConsumer, wie Sie scheinen zu erwarten. Sie müssen überprüfen, warum der BookFailConsumer aufgerufen wurde und was dort falsch läuft. (Sie haben den Code von BookFailConsumer nicht gepostet)

+0

Vielen Dank für Ihren Kommentar. Eigentlich ist es ein Tippfehler beim Posten des Codes. Ich habe nur einen Verbraucher, der "BookFailConsumer" ist. Ich habe keinen Verbraucher namens "BookConsumer" auch ich habe erwähnt, ich kann "org.springframework.amqp.core.Message" in Verbraucher zu bekommen. – VelNaga

Verwandte Themen