2017-12-05 3 views
0

Ich versuche, lange zu verwenden, wie Art der Nachrichten-Taste, aber ich bekommeNachrichten-Taste wie lange in Kafka Bäche

Exception in thread "kafka_stream_app-f236aaca-3f90-469d-9d32-20ff694806ff-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to deserialize key for record. topic=test, partition=0, offset=0 
    at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:38) 
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84) 
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117) 
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:474) 
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:642) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548) 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519) 
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8 

ich überprüft und die data.length ist 7.

In streamsConfiguration I

streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName()); 

und ich verwende

KStream<Long, GenericRecord> stream = builder.stream(topic); 

Ich habe versucht, die Nachricht über eine einfache Anwendung zu senden und auch mit kafka-avro-console-producer festgelegt haben:

/opt/confluent-3.3.0/bin/kafka-avro-console-producer \ 
--broker-list localhost:9092 \ 
--topic test \ 
--property key.separator=, \ 
--property parse.key=true \ 
--property key.schema='{"type":"long"}' \ 
--property value.schema='{"type":"string"}' \ 
--property schema.registry.url=http://localhost:8081 

mit Nachricht

123,"293" 

die Verwendung kafka-avro-console-consumer ich die Meldung verbrauchen kann und sehen (mit --property print.key=true, dass der Schlüssel gesendet korrekt 123 ist)

Jede Idee, was falsch sein könnte, wenn die Nachricht Decodierung?

Antwort

0

Da Sie kafka-avro-console-producer verwenden, wird der Schlüssel nicht als einfacher Long serialisiert, sondern als Avro-Typ. Daher müssen Sie eine entsprechende Avro Serde mit demselben Schema verwenden, das Sie für den Schreibpfad verwendet haben (z. B. '{"type":"long"}").

Auch Ihr Rückgabetyp wird nicht Long sondern ein Avro-Typ sein.