2

My Strom produziert Aufzeichnungen des Typs Tuple2<String,String>Flink Streaming: Unexpected charaters in serialisierte String Nachrichten

.toString() Ausgang (usr12345,{"_key":"usr12345","_temperature":46.6})

wo der Schlüssel usr12345 und Wert ist {"_key":"usr12345","_temperature":46.6}

Die .print() auf den Strom Ausgängen der Wert richtig:

(usr12345,{"_key":"usr12345","_temperature":46.6})

Aber wenn ich schreibe, um den Strom zu Kafka der Schlüssel usr12345 wird (mit einem weißen Raum am Anfang) und den Wert ({"_key":"usr12345","_temperature":46.6}

Beachten Sie den Raum zu Beginn des Schlüssels und die linken Klammer am Anfang der Wert.

Sehr seltsam. Warum könnte das passieren? Hier

ist die Serialisierung Code:

TypeInformation<String> resultType = TypeInformation.of(String.class); 

KeyedSerializationSchema<Tuple2<String, String>> schema = 
     new TypeInformationKeyValueSerializationSchema<>(resultType, resultType, env.getConfig()); 

FlinkKafkaProducer010.FlinkKafkaProducer010Configuration flinkKafkaProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
     stream, 
     "topic",  
     schema, 
     kafkaProducerProperties); 
+0

Was Sie beschrieben haben, ist ein bisschen komisch, haben Sie versucht, eine Kafka-Senke zu erstellen und stream.addsink (kafkaSink) zu tun? Kann das das Problem lösen? –

+0

@BiplobBiswas Nun, ich befolgte die in Flink Kafka Dokumentation beschriebenen Anweisungen. https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-producer Demnach ist dies der richtige Weg, Java, Kafka 0.10+, zu verwenden Ich benutze. – Beckham

Antwort

4

Die TypeInformationKeyValueSerializationSchema serialisiert Daten mit Flink des benutzerdefinierten Serialisierer was bedeutet, dass das Ergebnis als Binärdaten interpretiert werden muss. Der String-Serializer von Flink schreibt die Länge der Zeichenfolge gefolgt von der Codierung aller Zeichen.

Ich würde davon ausgehen, dass Sie das Kafka-Thema mit einem einfachen String Deserializer deserialisieren. Für den Schlüssel wird die serialisierte Länge als Whitespace-Zeichen interpretiert. Für den Wert wird die Länge als '(' interpretiert.

Versuchen Sie, einen anderen Serializer zu verwenden, der Schlüssel und Wert als einfache Zeichenfolgen serialisiert oder einen kompatiblen Deserializer verwendet.

Verwandte Themen