Consuming JSON values with the Kafka Connect JSON API in Kafka Streams: JAVA

I am trying to consume a JSON message with the Kafka Connect API in Kafka Streams. I searched Google, but I could not find any substantial information on how to read a JSON message with the Streams API.

So, with my limited knowledge, I tried the following approach.

package com.kafka.api.serializers.json; 

import java.util.Properties; 

import org.apache.kafka.streams.KafkaStreams; 
import org.apache.kafka.streams.StreamsConfig; 
import org.apache.kafka.streams.kstream.ForeachAction; 
import org.apache.kafka.streams.kstream.KStream; 
import org.apache.kafka.streams.kstream.KStreamBuilder; 

import com.fasterxml.jackson.core.JsonProcessingException; 
import com.fasterxml.jackson.databind.JsonNode; 
import com.fasterxml.jackson.databind.ObjectMapper; 

public class ConsumerUtilities { 

    private static ObjectMapper om = new ObjectMapper(); 

    public static Properties getProperties() { 

     Properties configs = new Properties(); 
     configs.put(StreamsConfig.APPLICATION_ID_CONFIG, 
       "Kafka test application"); 
     configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
     configs.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
       "org.apache.kafka.common.serialization.ByteArraySerializer"); 
     configs.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
       "org.apache.kafka.connect.json.JsonDeserializer"); 
     return configs; 
    } 

    public static KStreamBuilder getStreamingConsumer() { 
     KStreamBuilder builder = new KStreamBuilder(); 
     return builder; 
    } 

    public static void printStreamData() { 
     KStreamBuilder builder = getStreamingConsumer(); 
     KStream<String, JsonNode> kStream = builder.stream("test"); 
     kStream.foreach(new ForeachAction<String, JsonNode>() { 
      @Override 
      public void apply(String key, JsonNode value) { 
       try { 
        System.out.println(key + " : " + om.treeToValue(value, Person.class)); 
       } catch (JsonProcessingException e) { 
        // log records that cannot be mapped to Person and continue 
        e.printStackTrace(); 
       } 
      } 

     }); 

     KafkaStreams kafkaStreams = new KafkaStreams(builder, getProperties()); 
     kafkaStreams.start(); 
    } 

} 

package com.kafka.api.serializers.json; 

import com.fasterxml.jackson.databind.JsonNode; 
import com.fasterxml.jackson.databind.ObjectMapper; 

import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerConfig; 
import org.apache.kafka.clients.producer.ProducerRecord; 
import java.util.Properties; 

public class ProducerUtilities { 

    private static ObjectMapper om = new ObjectMapper(); 


    public static org.apache.kafka.clients.producer.Producer<String, JsonNode> getProducer() { 
     Properties configProperties = new Properties(); 
     configProperties.put(ProducerConfig.CLIENT_ID_CONFIG, 
       "kafka json producer"); 
     configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
       "localhost:9092"); 
     // StringSerializer matches the Producer<String, JsonNode> key type; 
     // ByteArraySerializer would fail on non-null String keys 
     configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
       "org.apache.kafka.common.serialization.StringSerializer"); 
     configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
       "org.apache.kafka.connect.json.JsonSerializer"); 

     org.apache.kafka.clients.producer.Producer<String, JsonNode> producer = new KafkaProducer<String, JsonNode>(
       configProperties); 
     return producer; 
    } 

    public ProducerRecord<String,JsonNode> createRecord(Person person){ 
     JsonNode jsonNode = om.valueToTree(person); 
     ProducerRecord<String,JsonNode> record = new ProducerRecord<String,JsonNode>("test",jsonNode); 
     return record; 
    } 

} 
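
For reference, the Person model is never shown in this post. Below is a hypothetical sketch of a Jackson-friendly POJO that would work with the code above; the package follows the com.kafka.api.models.Person import used in the update further down (the first listing would need a matching import), and the fields and constructor are assumptions.

package com.kafka.api.models; 

// Hypothetical Person model: any plain POJO with a no-arg constructor 
// and getters/setters that Jackson can bind to would work here. 
public class Person { 

    private String name; 
    private int age; 

    public Person() { 
    } 

    public Person(String name, int age) { 
     this.name = name; 
     this.age = age; 
    } 

    public String getName() { 
     return name; 
    } 

    public void setName(String name) { 
     this.name = name; 
    } 

    public int getAge() { 
     return age; 
    } 

    public void setAge(int age) { 
     this.age = age; 
    } 

    @Override 
    public String toString() { 
     return "Person [name=" + name + ", age=" + age + "]"; 
    } 

} 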

When I run the code, I am getting the exception below:

[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener for group Kafka test application failed on partition assignment 
org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class org.apache.kafka.connect.json.JsonDeserializer 
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:770) 
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init>(AbstractProcessorContext.java:59) 
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(ProcessorContextImpl.java:40) 
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:138) 
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1078) 
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:255) 
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:245) 
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1147) 
    at org.apache.kafka.streams.processor.internals.StreamThread.access$800(StreamThread.java:68) 
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:184) 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265) 
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367) 
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) 
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) 
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.connect.json.JsonDeserializer is not an instance of org.apache.kafka.common.serialization.Serde 
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248) 
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:764) 
    ... 19 more 
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] Shutting down 
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PENDING_SHUTDOWN. 
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. 
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] Stream thread shutdown complete 
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD. 
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] WARN org.apache.kafka.streams.KafkaStreams - stream-client [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29] All stream threads have died. The Kafka Streams instance will be in an error state and should be closed. 
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29] State transition from REBALANCING to ERROR. 
Exception in thread "Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] Failed to rebalance. 
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:543) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) 
Caused by: org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class org.apache.kafka.connect.json.JsonDeserializer 
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:770) 
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init>(AbstractProcessorContext.java:59) 
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(ProcessorContextImpl.java:40) 
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:138) 
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1078) 
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:255) 
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:245) 
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1147) 
    at org.apache.kafka.streams.processor.internals.StreamThread.access$800(StreamThread.java:68) 
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:184) 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265) 
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367) 
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) 
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536) 
    ... 3 more 
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.connect.json.JsonDeserializer is not an instance of org.apache.kafka.common.serialization.Serde 
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248) 
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:764) 
    ... 19 more 

I am looking for some pointers to solve this problem.

Update: I created a custom serializer and deserializer per Matthias's suggestion.

package com.kafka.api.utilities; 

import java.util.Properties; 

import org.apache.kafka.streams.KafkaStreams; 
import org.apache.kafka.streams.StreamsConfig; 
import org.apache.kafka.streams.kstream.ForeachAction; 
import org.apache.kafka.streams.kstream.KStream; 
import org.apache.kafka.streams.kstream.KStreamBuilder; 

import com.kafka.api.models.Person; 
import com.kafka.api.serdes.JsonDeserializer; 
import com.kafka.api.serdes.JsonSerializer; 
import org.apache.kafka.common.serialization.Serdes; 
import org.apache.kafka.common.serialization.Serde; 

public class ConsumerUtilities { 

    //private static ObjectMapper om = new ObjectMapper(); 

    public static Properties getProperties() { 

     Properties configs = new Properties(); 
     configs.put(StreamsConfig.APPLICATION_ID_CONFIG, 
       "Kafka test application"); 
     configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
//  configs.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
//    "org.apache.kafka.common.serialization.ByteArraySerializer"); 
//  configs.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
//    "org.apache.kafka.connect.json.JsonDeserializer"); 
     return configs; 
    } 

    public static KStreamBuilder getStreamingConsumer() { 
     KStreamBuilder builder = new KStreamBuilder(); 
     return builder; 
    } 

    public static void printStreamData() { 
     JsonSerializer<Person> personJsonSerializer = new JsonSerializer<>(); 
     JsonDeserializer<Person> personJsonDeserializer = new JsonDeserializer<>(Person.class); 
     Serde<Person> personSerde = Serdes.serdeFrom(personJsonSerializer, personJsonDeserializer); 

     KStreamBuilder builder = getStreamingConsumer(); 
     KStream<String, Person> kStream = builder.stream(Serdes.String(),personSerde , "test"); 
     kStream.foreach(new ForeachAction<String, Person>() { 
      @Override 
      public void apply(String key, Person value) { 
       System.out.println(key + " : " + value.toString()); 
      } 

     }); 

     KafkaStreams kafkaStreams = new KafkaStreams(builder, getProperties()); 
     kafkaStreams.start(); 
    } 

} 

package com.kafka.api.serdes; 

import java.util.Map; 

import org.apache.kafka.common.errors.SerializationException; 
import org.apache.kafka.common.serialization.Deserializer; 

import com.fasterxml.jackson.databind.ObjectMapper; 

public class JsonDeserializer<T> implements Deserializer<T>{ 

    private ObjectMapper om = new ObjectMapper(); 
    private Class<T> type; 

    /* 
    * The target type is passed in explicitly here; configure() below 
    * falls back to reading it from the config map when Kafka creates 
    * the deserializer reflectively. 
    */ 
    public JsonDeserializer(Class<T> type) { 
     this.type = type; 
    } 
    @Override 
    public void close() { 
     // nothing to clean up 
    } 

    @SuppressWarnings("unchecked") 
    @Override 
    public void configure(Map<String, ?> map, boolean arg1) { 
     if(type == null){ 
      type = (Class<T>) map.get("type"); 
     } 

    } 

    @Override 
    public T deserialize(String topic, byte[] bytes) { 
     if(bytes == null || bytes.length == 0){ 
      return null; 
     } 

     try{ 
      return om.readValue(bytes, type); 
     }catch(Exception e){ 
      throw new SerializationException(e); 
     } 
    } 

    protected Class<T> getType(){ 
     return type; 
    } 

} 

package com.kafka.api.serdes; 

import java.util.Map; 

import org.apache.kafka.common.errors.SerializationException; 
import org.apache.kafka.common.serialization.Serializer; 

import com.fasterxml.jackson.core.JsonProcessingException; 
import com.fasterxml.jackson.databind.ObjectMapper; 

public class JsonSerializer<T> implements Serializer<T> { 

    private ObjectMapper om = new ObjectMapper(); 

    @Override 
    public void close() { 
     // nothing to clean up 
    } 

    @Override 
    public void configure(Map<String, ?> config, boolean isKey) { 
     // no configuration needed 
    } 

    @Override 
    public byte[] serialize(String topic, T data) { 
     try { 
      return om.writeValueAsBytes(data); 
     } catch (JsonProcessingException e) { 
      // keep the cause so the underlying Jackson error is visible in logs 
      throw new SerializationException(e); 
     } 
    } 

} 
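
As a quick sanity check, the pair can be exercised in isolation before wiring it into Streams. A sketch, assuming the hypothetical Person model from above:

package com.kafka.api.serdes; 

import com.kafka.api.models.Person; 

// Stand-alone round-trip check of the serializer/deserializer pair. 
public class SerdeRoundTripCheck { 

    public static void main(String[] args) { 
     JsonSerializer<Person> serializer = new JsonSerializer<>(); 
     JsonDeserializer<Person> deserializer = new JsonDeserializer<>(Person.class); 

     // POJO -> JSON bytes -> POJO; the constructor is assumed from the sketch above. 
     Person original = new Person("john", 30); 
     byte[] bytes = serializer.serialize("test", original); 
     Person copy = deserializer.deserialize("test", bytes); 
     System.out.println(copy); // should print the same field values 
    } 

} 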

Exception: After running the streaming application, I am getting the exception below. I am confused.

[Kafka test application-cee84455-78ca-4a2f-881a-89b3c3a00e4b-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [Kafka test application-cee84455-78ca-4a2f-881a-89b3c3a00e4b] State transition from RUNNING to ERROR. 
Exception in thread "Kafka test application-cee84455-78ca-4a2f-881a-89b3c3a00e4b-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value for record. topic=test, partition=0, offset=0 
    at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:46) 
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84) 
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117) 
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:483) 
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:604) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:512) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) 
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null') 
at [Source: [B@...; line: 1, column: 11] 
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null') 
at [Source: [B@...; line: 1, column: 11] 
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702) 
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558) 
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3524) 
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2686) 
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:878) 
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:772) 
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3834) 
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3783) 
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2929) 
    at com.kafka.api.serdes.JsonDeserializer.deserialize(JsonDeserializer.java:43) 
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65) 
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55) 
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:56) 
    at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:44) 
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84) 
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117) 
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:483) 
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:604) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:512) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) 

Answer

The Streams API needs to read and write data, and thus it uses the abstraction of a Serde, which is a wrapper around a serializer and a deserializer at the same time. This is basically what the exception is telling you:

Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.connect.json.JsonDeserializer is not an instance of org.apache.kafka.common.serialization.Serde

Therefore, you need to wrap JsonSerializer and JsonDeserializer in a JsonSerde and use that JsonSerde in your StreamsConfig.

The easiest way to do this is the Serdes.serdeFrom(...) method (note: Serdes, plural). Alternatively, you can implement the Serde interface (note: Serde, singular) directly. The Serdes class contains examples of how to implement the Serde interface.
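
For illustration, a minimal Serde implementation along these lines might look as follows. This is a sketch only, reusing the custom JsonSerializer and JsonDeserializer from the update above:

package com.kafka.api.serdes; 

import java.util.Map; 

import org.apache.kafka.common.serialization.Deserializer; 
import org.apache.kafka.common.serialization.Serde; 
import org.apache.kafka.common.serialization.Serializer; 

// Sketch: a Serde that bundles the custom serializer/deserializer pair. 
public class JsonSerde<T> implements Serde<T> { 

    private final JsonSerializer<T> serializer = new JsonSerializer<>(); 
    private final JsonDeserializer<T> deserializer; 

    public JsonSerde(Class<T> type) { 
     this.deserializer = new JsonDeserializer<>(type); 
    } 

    @Override 
    public void configure(Map<String, ?> configs, boolean isKey) { 
     serializer.configure(configs, isKey); 
     deserializer.configure(configs, isKey); 
    } 

    @Override 
    public void close() { 
     serializer.close(); 
     deserializer.close(); 
    } 

    @Override 
    public Serializer<T> serializer() { 
     return serializer; 
    } 

    @Override 
    public Deserializer<T> deserializer() { 
     return deserializer; 
    } 

} 

With such a wrapper in place, new JsonSerde<>(Person.class) and Serdes.serdeFrom(new JsonSerializer<Person>(), new JsonDeserializer<>(Person.class)) are interchangeable.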

Following your suggestion, I created and wrapped the serializer and deserializer, but I am getting the exception above. Please help. – dataEnthusiast

The exception comes from your deserializer. I think you need to debug this part yourself; it is not a Streams problem ... You can see from the stack trace that your deserializer gets called ... Seems like a schema problem? –
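
For illustration, this failure is reproducible without Kafka at all: any record on the topic whose bytes are not valid JSON (for example, a plain string sent during an earlier kafka-console-producer test) makes the deserializer fail exactly like this. A hypothetical stand-alone check:

package com.kafka.api.serdes; 

import com.kafka.api.models.Person; 

// Hypothetical reproduction: a non-JSON payload makes Jackson throw 
// "Unrecognized token", which JsonDeserializer rethrows as a 
// SerializationException. 
public class NonJsonRecordCheck { 

    public static void main(String[] args) { 
     JsonDeserializer<Person> deserializer = new JsonDeserializer<>(Person.class); 
     deserializer.deserialize("test", "hello world".getBytes()); 
    } 

} 

If old non-JSON test records are sitting at the start of the topic, consuming from a fresh topic (or skipping past those offsets) would be one way to get the application running.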