Ich bin neu zu Funken Streaming. Ich versuche strukturierte Funken Streaming mit lokalen CSV-Dateien. Ich bekomme die unten angegebene Ausnahme während der Verarbeitung.Warum schlägt meine Abfrage mit AnalysisException fehl?
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[file:///home/Teju/Desktop/SparkInputFiles/*.csv]
Dies ist mein Code.
val df = spark
.readStream
.format("csv")
.option("header", "false") // Use first line of all files as header
.option("delimiter", ":") // Specifying the delimiter of the input file
.schema(inputdata_schema) // Specifying the schema for the input file
.load("file:///home/Teju/Desktop/SparkInputFiles/*.csv")
val filterop = spark.sql("select tagShortID,Timestamp,ListenerShortID,rootOrgID,subOrgID,first(rssi_weightage(RSSI)) as RSSI_Weight from my_table where RSSI > -127 group by tagShortID,Timestamp,ListenerShortID,rootOrgID,subOrgID order by Timestamp ASC")
val outStream = filterop.writeStream.outputMode("complete").format("console").start()
Ich habe Cron-Job so alle 5 Minuten ich einen Eingang bekommen csv file.I Uhr versucht, durch Funken streaming.Any Hilfe geschätzt wird zu analysieren.
Wie ist die Beziehung zwischen 'df' und den anderen Datensätzen' filterop' und 'outStream'? Sie verwenden 'df' nicht im Code eingefügt. Ist das absichtlich? Ich würde sagen, dass der Code nicht so ausgeführt werden kann wie er ist. Da fehlt etwas Wichtiges. –