Ich versuche vorbereiten Anwendung für Spark-Streaming lesen (Spark-2.1, Kafka 0,10)Spark-Streaming - json von Kafka und schreiben json zu anderen Kafka Thema
Ich muss die Daten von Kafka Thema "Input" lesen, finden Daten korrigieren und Ergebnis zum Thema "Ausgabe" schreiben
Ich kann Daten von Kafka-Basis auf KafkaUtils.createDirectStream-Methode lesen.
umgewandelt ich die RDD Filter JSon und bereiten:
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
val elementDstream = messages.map(v => v.value).foreachRDD { rdd =>
val PeopleDf=spark.read.schema(schema1).json(rdd)
import spark.implicits._
PeopleDf.show()
val PeopleDfFilter = PeopleDf.filter(($"value1".rlike("1"))||($"value2" === 2))
PeopleDfFilter.show()
}
ich Daten von Kafka laden und schreiben zu Kafka verwenden KafkaProducer "as is":
messages.foreachRDD(rdd => {
rdd.foreachPartition(partition => {
val kafkaTopic = "output"
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
partition.foreach{ record: ConsumerRecord[String, String] => {
System.out.print("########################" + record.value())
val messageResult = new ProducerRecord[String, String](kafkaTopic, record.value())
producer.send(messageResult)
}}
producer.close()
})
})
Aber ich kann nicht integrieren Diese beiden Aktionen> finden Sie in Json richtigen Wert und schreiben Ergebnisse an Kafka: Schreiben Sie PeopleDfFilter im JSON-Format, um Kafka-Thema "auszugeben".
Ich habe viele Eingangsnachrichten in Kafka, das ist der Grund, warum ich foreachPartition für die Erstellung von Kafka Producer verwenden möchte.
Vielen Dank für jede Beratung.
Sie müssen analysieren, eigentlich 'record.value()' als JSON –