2

Ich habe einen Streaming-Datensatz, aus kafka lesen und zu versuchen, CSVWie definiert man das Schema des Streaming-Datasets dynamisch, um in CSV zu schreiben?

case class Event(map: Map[String,String]) 
def decodeEvent(arrByte: Array[Byte]): Event = ...//some implementation 
val eventDataset: Dataset[Event] = spark 
    .readStream 
    .format("kafka") 
    .load() 
    .select("value") 
    .as[Array[Byte]] 
    .map(decodeEvent) 

Event hält Map[String,String] innen zu schreiben und zu CSV schreiben Ich werde einige Schemata benötigen.

Sagen wir alle Felder des Typs sind String und so habe ich versucht, das Beispiel von spark repo

val columns = List("year","month","date","topic","field1","field2") 
val schema = new StructType() //Prepare schema programmatically 
columns.foreach { field => schema.add(field, "string") } 
val rowRdd = eventDataset.rdd.map { event => Row.fromSeq(
    columns.map(c => event.getOrElse(c, "") 
)} 
val df = spark.sqlContext.createDataFrame(rowRdd, schema) 

Dies gibt Fehler zur Laufzeit auf der Leitung „eventDataset.rdd“:

Verursacht durch: org.apache.spark.sql.AnalysisException: Abfragen mit Streamingquellen müssen mit writeStream.start() ;;

Below funktioniert nicht, weil ‚.map‘ eine Liste [Zeichenfolge] TUPLE nicht

eventDataset.map(event => columns.map(c => event.getOrElse(c,"")) 
.toDF(columns:_*) 

Gibt es eine Möglichkeit, dies mit programmatischem Schema und strukturierter Streaming-Datensätze zu erreichen?

Antwort

1

ich viel einfacheren Ansatz verwenden würde:

import org.apache.spark.sql.functions._ 

eventDataset.select(columns.map(
    c => coalesce($"map".getItem(c), lit("")).alias(c) 
): _*).writeStream.format("csv").start(path) 

aber wenn Sie etwas näher an der aktuellen Lösung RDD Umwandlung überspringen

import org.apache.spark.sql.catalyst.encoders.RowEncoder 

eventDataset.rdd.map(event => 
    Row.fromSeq(columns.map(c => event.getOrElse(c,""))) 
)(RowEncoder(schema)).writeStream.format("csv").start(path) 
Verwandte Themen