2016-12-19 2 views
0

Ich muss eine verschlüsselte Nachricht von einem Kafka-Thema lesen. Mein aktueller Code, die Strings aus dem Thema liest sieht wie folgt aus:Binäre Daten von Kafka lesen Thema in Spark Job

JavaInputDStream<ConsumerRecord<String, String>> istream1 = KafkaUtils.createDirectStream(
        jssc, 
        LocationStrategies.PreferConsistent(), 
        ConsumerStrategies.<String, String>Subscribe(Arrays.asList(topics), kafkaParams) 
       ); 

was ich ändern soll ich mit diesem Code tue Byte-Arrays aus der kafka Warteschlange zu lesen, um sicherzustellen, werden die verschlüsselten Daten nicht in dem Prozess beschädigt werden der Typumwandlung. Während ich den obigen Code aus dem Spark-Programmführer nahm, im nicht der Lage, diese API in der KafkaUtils zu finden: http://spark.apache.org/docs/2.0.1/api/java/org/apache/spark/streaming/kafka/KafkaUtils.html

Antwort

0

Sie ein gutes Beispiel für Kafka Verbindung here sehen können.

Aber was Sie wollen, ist eine ByteArray von Ihrem Kafka Topic zu nehmen.

Diese Verbindung benötigen Sie einen JavaPairInputDStream wie diese zu erstellen:

import org.apache.kafka.common.serialization.ByteArrayDeserializer 
JavaPairInputDStream<String, Array[Byte]> messages = KafkaUtils.createDirectStream(
    jssc, 
    String.class, 
    String.class, 
    StringDecoder.class, 
    ByteArrayDeserializer.class, 
    kafkaParams, 
    topicsSet 
); 
+0

Vielen Dank für die Antwort. Ich werde das ausprobieren. Aber können Sie mir sagen, warum wir eine ganz andere API dafür brauchen und warum wir einen JavaPairInputStream erstellen müssen. Nur so verstehe ich, was ich mache. –

+0

Ich bin nicht in der Lage, den Code mit dem obigen Snippet zu kompilieren. Eclipse erkennt die Version von createDirectStream nicht, die Sie aufgelistet haben. Es sieht nur die Version, die ich benutzt habe. Ich verwende Kafka010 mit Spark 2.0.1. Ich habe das Spark-Kafka-Streaming-Jar aus dem Maven-Repository heruntergeladen. –

+0

Ich denke, vielleicht könnte das anders sein. Aber Sie müssen die Byte-Deserialisierung setzen –

Verwandte Themen