2016-06-10 13 views
0

Mein Code verwendet readTextFile, um Protokolldateien zu lesen, und wenn ich den jar in Flink (/opt/flink-1.0.3/bin/flink run -m yarn-cluster -yn 2 /home/flink/flink-json-0.1.jar) ausführe, verarbeitet es erfolgreich die Zeilen darin und stoppt meine Anwendung, anstatt auf neue Zeilen zu warten. Brauche ich einen param dafür?Warum flink stoppt meine Stream-Anwendung?

val env = StreamExecutionEnvironment.getExecutionEnvironment 
val stream = env.readTextFile("hdfs:///test/ignicion.io") 

Vielen Dank im Voraus

Antwort

2

Sie sind für

StreamExecutionEnvironment.readFileStream(String filePath, long intervalMillis, WatchType watchType) 

Für die WatchType suchen, haben Sie folgende Optionen

  • ONLY_NEW_FILES,
  • REPROCESS_WITH_APPENDED,
  • PROCESS_ONLY_APPENDED;

Der Strom von

StreamExecutionEnvironment.readTextFile(String filePath, String charsetName) 

wird nach dem Lesen aller Dateien abgeschlossen sein. Ich denke, es ist hauptsächlich für lokale Tests während der Entwicklung.

+0

Gleiches Ergebnis: 'env.readFileStream (" hdfs: ///test/ignicion.io ", 100, FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)' – jag

+0

Meinst du, dass es auch sofort stoppt? – snntrable

+0

Entschuldigung, es funktioniert wie du geschrieben hast ... – jag

Verwandte Themen