2017-11-12 4 views
0

Ich bin wirklich verzweifelt hier.Stream zu RDD zu DataFrame zu CSV

Was ich versuche zu tun ist, einen Strom, aggregierte Stream-Daten für einige Sekunden und nach dem Speichern als CSV-Datei zu erfassen.

val sparkSession : SparkSession = SparkSession.builder() 
    .master("local[*]") 
    .appName("Streaming") 
    .config(conf) 
    //.enableHiveSupport() 
    .getOrCreate() 

Also, ich bin der Erfassung der Strom

val lines = streamingContext.socketTextStream(HOST, PORT) 
val linesMessage = lines.map(_.split(DELIMITER)(1)) 

und zählen die Vorfälle

val counts = linesMessage.map(tag => (tag, 1)) 
     .reduceByKeyAndWindow({ (x, y) => x + y }, { (x, y) => x - y }, Seconds(EVENT_PERIOD_SECONDS*4), Seconds(EVENT_PERIOD_SECONDS)) 

die bisher funktioniert.

Nun würde Ich mag so jede windowLength in einer CSV-Datei speichern, und stucking dort:

val schema = new StructType() 
    .add(StructField("text", StringType, true)) 
    .add(StructField("count", IntegerType, true)) 

    counts.foreachRDD(rdd => 
    { 
    //rdd.saveAsTextFile(CHECKPOINT_DIR + "/output_" + sdf.format(System.currentTimeMillis())) 

    val df = sparkSession.createDataFrame(rdd.map(attributes => Row(attributes._1, attributes._2)), schema) 
    df.write.format("csv").save(CHECKPOINT_DIR + "/output_" + sdf.format(System.currentTimeMillis())) 
    }) 

Kann jemand mir helfen mit, dass bitte?

Sorry, für den Fehler:

Als ich rdd.saveAsTextFile nur führen Sie es verschiedene einfache Textdatei erzeugt, die zusammengeführt werden mussten.

Durch die createDataFrame schaffen, ist erhalten Sie diesen Fehler

17/11/12 23:06:04 ERROR JobScheduler: Error running job streaming job 1510490490000 ms.1 

java.lang.NullPointerException 
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:128) 
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:126) 
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:65) 
    at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:578) 
    at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:335) 
    at main.scala.Main$$anonfun$main$scala$Main$$functionToCreateContext$1$1.apply(Main.scala:152) 
    at main.scala.Main$$anonfun$main$scala$Main$$functionToCreateContext$1$1.apply(Main.scala:146) 

Linie 146 ist die sparkSession.createDataFrame Linie

+0

Was ist das Problem, das Sie stoßen? –

+0

Warum brauchen Sie den Dataframe? Das wird das kleine Dateiproblem nicht beheben –

+0

Sie hatten Recht. Es ist eher das Problem des fehlenden Wissens;) – Stephan

Antwort

0

Mögliche Ursache und Abhilfe

Das Problem aus der sparkSession kommen könnte, die kann mit der streamingContext abgekoppelt werden.

Könnte sich lohnen, zu versuchen, die Funken Sitzung vom streamingContext zu erhalten, um sicherzustellen, beide die gleiche Konfiguration verwenden:

counts.foreachRDD(rdd => { 
    val spark = sparkSession.builder.config(streamingContext.sparkContext.getConf).getOrCreate() 
    val df = spark.createDataFrame(rdd.map(attributes => Row(attributes._1, attributes._2)), schema) 
    ... 
}) 

Partioning

Wie in den Kommentaren erwähnt, wenn Ihr Ziel ist es, Reduzieren Sie die Anzahl der von Spark erstellten Dateien. Sie können einfach repartition auf dem direkt für Ihre Funktion verwenden:

counts.foreachRDD(rdd => { 
    rdd.repartition(10) 
     .saveAsTextFile(CHECKPOINT_DIR + "/output_" + sdf 
     .format(System.currentTimeMillis())) 
}) 

repartition sollte jedoch sehr sorgfältig verwendet werden, da Sie eine gute Schätzung der Größe der resultierenden Partitionen benötigen, um zu vermeiden, dass sie zu klein oder zu groß werden.

0

I "gelöst" durch meinen Code wie folgt zu ändern:

linesFilter.window(Seconds(EVENT_PERIOD_SECONDS*WRITE_EVERY_N_SECONDS), Seconds(EVENT_PERIOD_SECONDS*WRITE_EVERY_N_SECONDS)).foreachRDD { (rdd, time) => 
    if (rdd.count() > 0) { 
     rdd 
     .coalesce(1,true) 
     .map(_.replace(DELIMITER_STREAM, DELIMITER_OUTPUT)) 
     //.map{_.mkString(DELIMITER_OUTPUT) } 
     .saveAsTextFile(CHECKPOINT_DIR + "/output/o_" + sdf.format(System.currentTimeMillis())) 
    } 
    } 
Verwandte Themen