Ich versuche, den folgenden Code von IntelliJ Idee, um Nachrichten von Kafka auf Konsole zu drucken. Aber es führt den folgenden Fehler -Spark Structured Streaming -
Exception in thread "main" org.apache.spark.sql.AnalysisException: Abfragen mit Streaming-Quellen müssen mit writeStream.start ausgeführt werden() ;; kafka
Stacktrace begann von Dataset.checkpoint und Weg nach oben. Wenn ich .checkPoint() entferne, erhalte ich einen anderen Fehler im Zusammenhang mit der Berechtigung 17/08/02 12:10:52 FEHLER StreamMetadata: Fehler beim Schreiben von Stream-Metadaten StreamMetadata (4e612f22-efff-4c9a-a47a-a36eb533e9d6) an C:/Benutzer/rp/AppData/Lokal/Temp/temporär-2f570b97-ad16-4f00-8356-d43ccb7660db/metadata java.io.IOException: (null) Eintrag in Befehlszeichenfolge: null chmod 0644 C: \ Users \ rp \ AppData \ Local \ Temp \ temporary-2f570b97-ad16-4f00-8356-d43ccb7660db \ Metadaten
def main(args : Array[String]) = {
val spark = SparkSession.builder().appName("SparkStreaming").master("local[*]").getOrCreate()
val canonicalSchema = new StructType()
.add("cid",StringType)
.add("uid",StringType)
.add("sourceSystem",
new StructType().add("id",StringType)
.add("name",StringType))
.add("name", new StructType()
.add("firstname",StringType)
.add("lastname",StringType))
val messages = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092")
.option("subscribe","c_canonical")
.option("startingOffset","earliest")
.load()
.checkpoint()
.select(from_json(col("value").cast("string"),canonicalSchema))
.writeStream.outputMode("append").format("console").start.awaitTermination
}
kann mir jemand bitte helfen, zu verstehen, wo ich falsch mache?
Versuchen Sie, IntelliJ als Administrator auszuführen. –
Vielen Dank für die Antwort, aber das hat nicht funktioniert. –