Ich bin wirklich verzweifelt hier.Stream zu RDD zu DataFrame zu CSV
Was ich versuche zu tun ist, einen Strom, aggregierte Stream-Daten für einige Sekunden und nach dem Speichern als CSV-Datei zu erfassen.
val sparkSession : SparkSession = SparkSession.builder()
.master("local[*]")
.appName("Streaming")
.config(conf)
//.enableHiveSupport()
.getOrCreate()
Also, ich bin der Erfassung der Strom
val lines = streamingContext.socketTextStream(HOST, PORT)
val linesMessage = lines.map(_.split(DELIMITER)(1))
und zählen die Vorfälle
val counts = linesMessage.map(tag => (tag, 1))
.reduceByKeyAndWindow({ (x, y) => x + y }, { (x, y) => x - y }, Seconds(EVENT_PERIOD_SECONDS*4), Seconds(EVENT_PERIOD_SECONDS))
die bisher funktioniert.
Nun würde Ich mag so jede windowLength in einer CSV-Datei speichern, und stucking dort:
val schema = new StructType()
.add(StructField("text", StringType, true))
.add(StructField("count", IntegerType, true))
counts.foreachRDD(rdd =>
{
//rdd.saveAsTextFile(CHECKPOINT_DIR + "/output_" + sdf.format(System.currentTimeMillis()))
val df = sparkSession.createDataFrame(rdd.map(attributes => Row(attributes._1, attributes._2)), schema)
df.write.format("csv").save(CHECKPOINT_DIR + "/output_" + sdf.format(System.currentTimeMillis()))
})
Kann jemand mir helfen mit, dass bitte?
Sorry, für den Fehler:
Als ich rdd.saveAsTextFile nur führen Sie es verschiedene einfache Textdatei erzeugt, die zusammengeführt werden mussten.
Durch die createDataFrame schaffen, ist erhalten Sie diesen Fehler
17/11/12 23:06:04 ERROR JobScheduler: Error running job streaming job 1510490490000 ms.1
java.lang.NullPointerException
at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:128)
at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:126)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:65)
at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:578)
at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:335)
at main.scala.Main$$anonfun$main$scala$Main$$functionToCreateContext$1$1.apply(Main.scala:152)
at main.scala.Main$$anonfun$main$scala$Main$$functionToCreateContext$1$1.apply(Main.scala:146)
Linie 146 ist die sparkSession.createDataFrame Linie
Was ist das Problem, das Sie stoßen? –
Warum brauchen Sie den Dataframe? Das wird das kleine Dateiproblem nicht beheben –
Sie hatten Recht. Es ist eher das Problem des fehlenden Wissens;) – Stephan