2017-07-28 2 views
0

Ich bin neu zu Funken Streaming. Ich versuche strukturierte Funken Streaming mit lokalen CSV-Dateien. Ich bekomme die unten angegebene Ausnahme während der Verarbeitung.Warum schlägt meine Abfrage mit AnalysisException fehl?

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();; 
FileSource[file:///home/Teju/Desktop/SparkInputFiles/*.csv] 

Dies ist mein Code.

val df = spark 
    .readStream 
    .format("csv") 
    .option("header", "false") // Use first line of all files as header 
    .option("delimiter", ":") // Specifying the delimiter of the input file 
    .schema(inputdata_schema) // Specifying the schema for the input file 
    .load("file:///home/Teju/Desktop/SparkInputFiles/*.csv") 

val filterop = spark.sql("select tagShortID,Timestamp,ListenerShortID,rootOrgID,subOrgID,first(rssi_weightage(RSSI)) as RSSI_Weight from my_table where RSSI > -127 group by tagShortID,Timestamp,ListenerShortID,rootOrgID,subOrgID order by Timestamp ASC") 
val outStream = filterop.writeStream.outputMode("complete").format("console").start() 

Ich habe Cron-Job so alle 5 Minuten ich einen Eingang bekommen csv file.I Uhr versucht, durch Funken streaming.Any Hilfe geschätzt wird zu analysieren.

+1

Wie ist die Beziehung zwischen 'df' und den anderen Datensätzen' filterop' und 'outStream'? Sie verwenden 'df' nicht im Code eingefügt. Ist das absichtlich? Ich würde sagen, dass der Code nicht so ausgeführt werden kann wie er ist. Da fehlt etwas Wichtiges. –

Antwort

-1

Fügen Sie .writeStream.start zu Ihrem df hinzu, wie die Ausnahme Ihnen sagt.

Lesen Sie die docs für weitere Details.

+0

Wo sollte '.writeStream.start' im Code eingefügt werden? Ich sehe nicht, wie es das Problem lösen könnte. Pflege um zu erarbeiten? –

+0

@JacekLaskowski Das Datenframe '' 'df''' wird nie explizit' '' writeStream.start''ed. Ich vermute, dass die Abfrage "vollständig" sein muss, ohne dass implizite/Out-of-Band-Abfragen die Schleife schließen. Ich vermute, dass das gestreamte Laden des CSV nie ausgelöst wird, da es keine Verbindung zwischen Eingabe und Ausgabe gibt. Ich vermute, dass das Ersetzen von '' 'spark.sql''' mit' '' df.select'' 'dieses Problem behebt. –

0

(Dies ist keine Lösung, aber mehr ein Kommentar, aber angesichts seiner Länge endete es hier. Ich werde es eine Antwort machen, nachdem ich genügend Informationen für die Untersuchung gesammelt habe).


Meine Vermutung ist, dass Sie etwas falsch auf df tun, die Sie nicht in Ihrer Frage aufgenommen haben.

Da die Fehlermeldung über FileSource mit dem Pfad wie folgt ist und es ist ein Streaming-Dataset, das df sein muss, die im Spiel ist.

filesource [file: ///home/Teju/Desktop/SparkInputFiles/*.csv]

die anderen Linien I erraten , dass Sie das Streaming-Daten-Set als eine temporäre Tabelle registrieren (dh my_table), die Sie dann in spark.sql verwenden, um SQL und writeStream auf der Konsole auszuführen.

df.createOrReplaceTempView("my_table") 

Wenn das stimmt, der Code, den Sie in der Frage enthalten haben, ist unvollständig und tut nicht den Grund für den Fehler zeigen.

Verwandte Themen