2017-04-19 3 views
0

Wir haben einen Datenstrom durch ein Kafka-Thema. Ich habe das mit Spark Streaming gelesen.Spark-Streaming - Kafka- createStream - RDD zu Datenrahmen

val ssc = new StreamingContext(l_sparkcontext, Seconds(30)) 
    val kafkaStream = KafkaUtils.createStream(ssc, "xxxx.xx.xx.com:2181", "new-spark-streaming-group", Map("event_log" -> 10)) 

Das funktioniert gut. Was ich möchte, ist dies eine Parkett-Datei zu schreiben, indem Sie den Stream-Daten Spalten zuweisen. So kann ich die folgende

kafkaStream.foreachRDD(rdd => { 
    if (rdd.count() == 0) { 
    println("No new SKU's received in this time interval " + Calendar.getInstance().getTime()) 
    } 
    else { 
    println("No of SKUs received " + rdd.count()) 
    rdd.map(record => { 
     record._2 
    }).toDF("customer_id","sku","type","event","control_group","event_date").write.mode(SaveMode.Append).format("parquet").save(outputPath) 

jedoch dies auf einen Fehler gibt

java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match. 
Old column names (1): _1 
New column names (6): customer_id, sku, type, event, control_group, event_date 
    at scala.Predef$.require(Predef.scala:233) 
    at org.apache.spark.sql.DataFrame.toDF(DataFrame.scala:224) 
    at org.apache.spark.sql.DataFrameHolder.toDF(DataFrameHolder.scala:36) 
    at kafka_receive_messages$$anonfun$main$1.apply(kafka_receive_messages.scala:77) 
    at kafka_receive_messages$$anonfun$main$1.apply(kafka_receive_messages.scala:69) 

Was der Fehler ist, dass ich bitte mache. Sollten wir uns in der Karte aufteilen? Wenn wir das tun, bekommen wir es nicht in toDF konvertieren (".. Spalten ..")

Vielen Dank für Ihre Hilfe.

Grüße

Bala

Antwort

0

Dank für das Kurz aufhalten bei. Ich habe das aussortiert. Es war ein Coding-Problem. Für diejenigen, die dies in Zukunft tun wollen, bitte auf den anderen Teil ändern, wie unten

kafkaStream.foreachRDD(rdd => { 
    if (rdd.count() == 0) { 
    println("No new SKU's received in this time interval " + Calendar.getInstance().getTime()) 
    } 
    else { 
    println("No of SKUs received " + rdd.count()) 
    rdd.map(record => (record._2).split(",")) 
    }.map(r => (r(0).replace(Quote,"").toInt,r(1).replace(Quote,"").toInt,r(2),r(3),r(4),r(5))).toDF("customer_id","sku","type","event","control_group","event_date").write.mode(SaveMode.Append).format("parquet").save(outputPath) 
    }) 

Thanks again

Bala

Verwandte Themen