My Strom produziert Aufzeichnungen des Typs Tuple2<String,String>
Flink Streaming: Unexpected charaters in serialisierte String Nachrichten
.toString()
Ausgang (usr12345,{"_key":"usr12345","_temperature":46.6})
wo der Schlüssel usr12345
und Wert ist {"_key":"usr12345","_temperature":46.6}
Die .print()
auf den Strom Ausgängen der Wert richtig:
(usr12345,{"_key":"usr12345","_temperature":46.6})
Aber wenn ich schreibe, um den Strom zu Kafka der Schlüssel usr12345
wird (mit einem weißen Raum am Anfang) und den Wert ({"_key":"usr12345","_temperature":46.6}
Beachten Sie den Raum zu Beginn des Schlüssels und die linken Klammer am Anfang der Wert.
Sehr seltsam. Warum könnte das passieren? Hier
ist die Serialisierung Code:
TypeInformation<String> resultType = TypeInformation.of(String.class);
KeyedSerializationSchema<Tuple2<String, String>> schema =
new TypeInformationKeyValueSerializationSchema<>(resultType, resultType, env.getConfig());
FlinkKafkaProducer010.FlinkKafkaProducer010Configuration flinkKafkaProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
stream,
"topic",
schema,
kafkaProducerProperties);
Was Sie beschrieben haben, ist ein bisschen komisch, haben Sie versucht, eine Kafka-Senke zu erstellen und stream.addsink (kafkaSink) zu tun? Kann das das Problem lösen? –
@BiplobBiswas Nun, ich befolgte die in Flink Kafka Dokumentation beschriebenen Anweisungen. https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-producer Demnach ist dies der richtige Weg, Java, Kafka 0.10+, zu verwenden Ich benutze. – Beckham