1

Ich versuche, einen Kafka-Consumer zu schreiben, der ein bestimmtes Topic abonniert und die empfangenen Nachrichten als JSON verarbeitet. Ich habe versucht, dem Ansatz zu folgen, der in der Spring-Dokumentation (here) beschrieben wird, schaffe es aber nicht, die Nachrichten als JSON zu erhalten. (Spring-Kafka-Deserialisierung)

Dies ist mein Code für die Empfänger-Konfiguration:

/**
 * Spring configuration for the Kafka consumer side.
 *
 * <p>Declares the consumer properties, the consumer factory built from them,
 * two listener container factories (one plain, one with a JSON message
 * converter) and the {@link Receiver} bean that hosts the listener method.
 *
 * <p>All Kafka types are parameterized with {@code <Integer, String>} to match
 * the key/value deserializers configured in {@link #consumerConfigs()}.
 */
@Configuration
@EnableKafka
public class ReceiverConfig {

    /** Broker address list, injected from the {@code kafka.bootstrap.servers} property. */
    @Value("${kafka.bootstrap.servers}")
    private String bootstrapServers;

    /**
     * Builds the properties handed to the Kafka consumer.
     *
     * @return a mutable map with bootstrap servers, key/value deserializers and group id
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections
        // to the Kafka cluster
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // consumer groups allow a pool of processes to divide the work of
        // consuming and processing records
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "Waitlist");

        return props;
    }

    /**
     * Creates the consumer factory from {@link #consumerConfigs()}.
     *
     * @return a factory producing consumers with {@code Integer} keys and {@code String} values
     */
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Creates a listener container factory without a message converter, so
     * record values reach listeners as the deserialized {@code String}.
     *
     * @return the plain listener container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

    /**
     * Creates the bean that carries the {@code @KafkaListener} method.
     *
     * @return a new {@link Receiver}
     */
    @Bean
    public Receiver receiver() {
        return new Receiver();
    }

    /**
     * Creates a listener container factory whose records are passed through a
     * {@link StringJsonMessageConverter} before the listener is invoked.
     *
     * <p>The converter parses each record value on its own, so every Kafka
     * record must contain one complete JSON document. A non-JSON value, or a
     * JSON document split across several records, fails with a
     * {@code ConversionException}.
     *
     * @return the JSON-converting listener container factory
     */
    @Bean
    public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setMessageConverter(new StringJsonMessageConverter());
        return factory;
    }
}

Consumer (Empfänger):

/**
 * Kafka listener for the {@code Reservation} topic.
 *
 * <p>Each received message is logged, and a latch is counted down so that
 * another thread (for example a test) can wait until a message has arrived.
 */
public class Receiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

    // final: the latch is never reassigned and is read from both the listener
    // thread and callers of getLatch(), so it should be safely published.
    private final CountDownLatch latch = new CountDownLatch(1);

    /**
     * Handles one message from the {@code Reservation} topic, using the
     * {@code kafkaJsonListenerContainerFactory} container factory.
     *
     * @param message the converted message; it is only logged here
     */
    @KafkaListener(topics = "Reservation",
            containerFactory = "kafkaJsonListenerContainerFactory")
    public void receiveMessage(Message<?> message) {
        LOGGER.info("received message='{}'", message);
        // The latch starts at 1, so the first message releases any waiter;
        // further countDown() calls on a zero latch have no effect.
        latch.countDown();
    }

    /**
     * Returns the latch that reaches zero once the first message was received.
     *
     * @return the latch shared with the listener method
     */
    public CountDownLatch getLatch() {
        return latch;
    }

}

Wenn ich versuche, eine Nachricht in ein Topic auf dem Remote-Server zu veröffentlichen, erhalte ich die folgende Fehlermeldung:

 2017-02-09 13:42:49.122 [1;31mERROR[0;39m [36mo.s.k.listener.LoggingErrorHandler[0;39m Error while processing: ConsumerRecord(topic = Reservation, partition = 0, offset = 3394, CreateTime = 1486626082480, checksum = 1777660938, serialized key size = -1, serialized value size = 2, key = null, value = hi) 
     org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hi': was expecting ('true', 'false' or 'null') 
     at [Source: hi; line: 1, column: 5] 
      at org.springframework.kafka.support.converter.StringJsonMessageConverter.extractAndConvertValue(StringJsonMessageConverter.java:81) 
      at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:82) 
      at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:157) 
      at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:68) 
      at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47) 
      at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:764) 
      at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:708) 
      at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2500(KafkaMessageListenerContainer.java:230) 
      at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:975) 
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
      at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
      at java.lang.Thread.run(Thread.java:745) 
     Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hi': was expecting ('true', 'false' or 'null') 
     at [Source: hi; line: 1, column: 5] 
      at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702) 
      at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558) 
      at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2835) 
      at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1903) 
      at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:749) 
      at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3834) 
      at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3783) 
      at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2880) 
      at org.springframework.kafka.support.converter.StringJsonMessageConverter.extractAndConvertValue(StringJsonMessageConverter.java:78) 
      ... 11 common frames omitted 

Wenn ich jedoch die containerFactory vom Listener entferne, kann ich die Nachrichten empfangen, allerdings nicht im JSON-Format, sondern als String:

2017-02-09 15:04:58.408 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='{' 
2017-02-09 15:04:58.408 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "_eventType":"Reservation",' 
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "_timestamp":"2017-01-23T09:19:35Z",' 
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "_operation":"create",' 
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "type":"excursion",' 
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "reservationId":"46d353ac_9575_492a_9291_98d15bf4cc82",' 
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "eventReservationLinkId":"9b0bafb4_406e_43ae_94f2_36a913ce23d2",' 
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "master":true,' 
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "partySize":2,' 
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "startTime":"2017-01-27T08:30:00Z",' 
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "endTime":"2017-01-27T10:00:00Z",' 
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "timeslotId":"c2304a34_b9ba_4f3c_8e45_3e3c7677d6c2",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "variantSku":"ocean_polar_1606_FLL-640B",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "guestId":"378741",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "createdBy":"149673",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "purchaser":"143679",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "eventId":"ocean_polar_1606_FLL-640",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "scheduledEventId":"02c95434_3a99_452e_a2a8_51712683926c",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "resourceId":"",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "surpriseFlag":false,' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "venueId":"FLL001",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "status":"CONFIRMED",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "primaryId":"378741",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "partyId":"9b0bafb4_406e_43ae_94f2_36a913ce23d2"' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='}' 

Antwort

0

Ihre Nachrichten sind einzelne Schnipsel eines JSON-Dokuments:

received message='{' 
received message=' "_eventType":"Reservation",' 
received message=' "_timestamp":"2017-01-23T09:19:35Z",' 
... 

Damit die Konvertierung aus JSON funktioniert, muss das gesamte JSON-Dokument in einer einzigen Nachricht enthalten sein.

+0

Das bedeutet, die Verantwortung liegt auf der Producer-Seite. Danke @Gary! – Kuber