2017-11-23 1 views
2

Ich versuche vorbereiten Anwendung für Spark-Streaming lesen (Spark-2.1, Kafka 0,10)Spark-Streaming - json von Kafka und schreiben json zu anderen Kafka Thema

Ich muss die Daten von Kafka Thema "Input" lesen, finden Daten korrigieren und Ergebnis zum Thema "Ausgabe" schreiben

Ich kann Daten von Kafka-Basis auf KafkaUtils.createDirectStream-Methode lesen.

umgewandelt ich die RDD Filter JSon und bereiten:

val messages = KafkaUtils.createDirectStream[String, String](
    ssc, 
    PreferConsistent, 
    Subscribe[String, String](topics, kafkaParams) 
) 

val elementDstream = messages.map(v => v.value).foreachRDD { rdd => 

    val PeopleDf=spark.read.schema(schema1).json(rdd) 
    import spark.implicits._ 
    PeopleDf.show() 
    val PeopleDfFilter = PeopleDf.filter(($"value1".rlike("1"))||($"value2" === 2)) 
    PeopleDfFilter.show() 
} 

ich Daten von Kafka laden und schreiben zu Kafka verwenden KafkaProducer "as is":

messages.foreachRDD(rdd => { 
     rdd.foreachPartition(partition => { 
     val kafkaTopic = "output" 
     val props = new HashMap[String, Object]() 
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") 
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
      "org.apache.kafka.common.serialization.StringSerializer") 
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
      "org.apache.kafka.common.serialization.StringSerializer") 

     val producer = new KafkaProducer[String, String](props) 
     partition.foreach{ record: ConsumerRecord[String, String] => { 
     System.out.print("########################" + record.value()) 
     val messageResult = new ProducerRecord[String, String](kafkaTopic, record.value()) 
     producer.send(messageResult) 
     }} 
     producer.close() 
     }) 

    }) 

Aber ich kann nicht integrieren Diese beiden Aktionen> finden Sie in Json richtigen Wert und schreiben Ergebnisse an Kafka: Schreiben Sie PeopleDfFilter im JSON-Format, um Kafka-Thema "auszugeben".

Ich habe viele Eingangsnachrichten in Kafka, das ist der Grund, warum ich foreachPartition für die Erstellung von Kafka Producer verwenden möchte.

Vielen Dank für jede Beratung.

+0

Sie müssen analysieren, eigentlich 'record.value()' als JSON –

Antwort

2

Der Prozess ist sehr einfach, warum also nicht strukturierte Streaming verwenden?

import org.apache.spark.sql.functions.from_json 

spark 
    // Read the data 
    .readStream 
    .format("kafka") 
    .option("kafka.bootstrap.servers", inservers) 
    .option("subscribe", intopic) 
    .load() 
    // Transform/filter 
    .select(from_json($"value".cast("string"), schema).alias("value")) 
    .filter(...) // Add the condition 
    .select(to_json($"value").alias("value") 
    // Write back 
    .writeStream 
    .format("kafka") 
    .option("kafka.bootstrap.servers", outservers) 
    .option("subscribe", outtopic) 
    .start() 
+0

Wenn ich Verwendung strukturiertes Streaming auf Spark-2.1 ​​mit Writestream zu Kafka versuche, erhalte ich: 'java.lang.UnsupportedOperationException: Datenquelle Kafka unterstützt nicht das Streaming-Schreiben. wie ich weiß, funktioniert es von Spark 2.2.x – Tomtom

Verwandte Themen