Wir haben einen Datenstrom durch ein Kafka-Thema. Ich habe das mit Spark Streaming gelesen.Spark-Streaming - Kafka- createStream - RDD zu Datenrahmen
val ssc = new StreamingContext(l_sparkcontext, Seconds(30))
val kafkaStream = KafkaUtils.createStream(ssc, "xxxx.xx.xx.com:2181", "new-spark-streaming-group", Map("event_log" -> 10))
Das funktioniert gut. Was ich möchte, ist dies eine Parkett-Datei zu schreiben, indem Sie den Stream-Daten Spalten zuweisen. So kann ich die folgende
kafkaStream.foreachRDD(rdd => {
if (rdd.count() == 0) {
println("No new SKU's received in this time interval " + Calendar.getInstance().getTime())
}
else {
println("No of SKUs received " + rdd.count())
rdd.map(record => {
record._2
}).toDF("customer_id","sku","type","event","control_group","event_date").write.mode(SaveMode.Append).format("parquet").save(outputPath)
jedoch dies auf einen Fehler gibt
java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.
Old column names (1): _1
New column names (6): customer_id, sku, type, event, control_group, event_date
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.sql.DataFrame.toDF(DataFrame.scala:224)
at org.apache.spark.sql.DataFrameHolder.toDF(DataFrameHolder.scala:36)
at kafka_receive_messages$$anonfun$main$1.apply(kafka_receive_messages.scala:77)
at kafka_receive_messages$$anonfun$main$1.apply(kafka_receive_messages.scala:69)
Was der Fehler ist, dass ich bitte mache. Sollten wir uns in der Karte aufteilen? Wenn wir das tun, bekommen wir es nicht in toDF konvertieren (".. Spalten ..")
Vielen Dank für Ihre Hilfe.
Grüße
Bala