0

Wie speichere ich Kafka-Spark-Streaming-Nachrichten: Datenrahmen in einzelne DateiWie Kafka-Spark-Streaming-Nachrichten: Datenrahmen in einzelne Datei

ich eine Anwendung speichern entwickelt haben, die die Nachrichten mit Kafka- Spark-Streaming-Prozess verbraucht wird.

Sobald die Daten empfangen werden, wird es in Datenrahmen umgewandelt.

Dann Streaming-Datenrahmen wird als Textdatei gespeichert, hier ist der Datenrahmen in jeder Datei für jede Kafka-Stream-Nachricht gespeichert, unten ist der Code, den ich verwendet habe, um Datenrahmen in Textdatei zu speichern, dies speichert Daten um die Textdatei für jede Nachricht zu multiplizieren.

DF.coalesce(1).write.format("com.databricks.spark.csv").mode("append") 
           .save("path") 

Hier ist die Anforderung Ich mag würde Datum Rahmen erreichen, ist das Streaming müssen als eine einzige Datei für jede kafka Nachricht gespeichert werden, wenn möglich, bitte mich mit der Lösung zu helfen.

Vielen Dank im Voraus

Antwort

0

Unten Code könnte Ihnen helfen. Erzeugen Sie einfach die Liste der RDD und verbinden Sie sie dann.

var dStreamRDDList = new ListBuffer[RDD[String]] 
dStream.foreachRDD(rdd => 
    { 
     dStreamRDDList += rdd 
    }) 
val joinRDD = ssc.sparkContext.union(dStreamRDDList) 
//then convert joinRDD to DataFrame (DF) 
DF.coalesce(1).write.format("com.databricks.spark.csv").mode("append") 
          .save("path") 
Verwandte Themen