1

Ich versuche, den folgenden Code von IntelliJ Idee, um Nachrichten von Kafka auf Konsole zu drucken. Aber es führt den folgenden Fehler -Spark Structured Streaming -

Exception in thread "main" org.apache.spark.sql.AnalysisException: Abfragen mit Streaming-Quellen müssen mit writeStream.start ausgeführt werden() ;; kafka

Stacktrace begann von Dataset.checkpoint und Weg nach oben. Wenn ich .checkPoint() entferne, erhalte ich einen anderen Fehler im Zusammenhang mit der Berechtigung 17/08/02 12:10:52 FEHLER StreamMetadata: Fehler beim Schreiben von Stream-Metadaten StreamMetadata (4e612f22-efff-4c9a-a47a-a36eb533e9d6) an C:/Benutzer/rp/AppData/Lokal/Temp/temporär-2f570b97-ad16-4f00-8356-d43ccb7660db/metadata java.io.IOException: (null) Eintrag in Befehlszeichenfolge: null chmod 0644 C: \ Users \ rp \ AppData \ Local \ Temp \ temporary-2f570b97-ad16-4f00-8356-d43ccb7660db \ Metadaten

def main(args : Array[String]) = { 
val spark = SparkSession.builder().appName("SparkStreaming").master("local[*]").getOrCreate() 
    val canonicalSchema = new StructType() 
          .add("cid",StringType) 
          .add("uid",StringType) 
          .add("sourceSystem", 
           new StructType().add("id",StringType) 
               .add("name",StringType)) 
          .add("name", new StructType() 
             .add("firstname",StringType) 
             .add("lastname",StringType)) 


    val messages = spark 
        .readStream 
        .format("kafka") 
        .option("kafka.bootstrap.servers","localhost:9092") 
        .option("subscribe","c_canonical") 
        .option("startingOffset","earliest") 
        .load() 
        .checkpoint() 
.select(from_json(col("value").cast("string"),canonicalSchema)) 
.writeStream.outputMode("append").format("console").start.awaitTermination 

} 

kann mir jemand bitte helfen, zu verstehen, wo ich falsch mache?

+0

Versuchen Sie, IntelliJ als Administrator auszuführen. –

+0

Vielen Dank für die Antwort, aber das hat nicht funktioniert. –

Antwort

1
  1. Strukturiertes Streaming unterstützt Dataset.checkpoint() nicht. Es gibt ein offenes Ticket, um eine bessere Nachricht zu liefern oder es einfach zu ignorieren: https://issues.apache.org/jira/browse/SPARK-20927

  2. IOException ist wahrscheinlich, weil Sie Cygwin unter Windows nicht installieren.

+0

Während das strukturierte Streaming 'checkpoint()' nicht unterstützt, unterstützt es 'option (" checkpointLocation ","/path/to/store ")'. Was würdest du sagen, das heißt? –

+0

Die Antwort wurde aktualisiert. Leider benutzen sie das gleiche Wort, aber sie sind völlig verschiedene Dinge. – zsxwing

+0

Gibt es irgendwo die eigentliche Bedeutung von 'checkpointLocation' und wie unterscheidet es sich? –

Verwandte Themen