2014-10-24 17 views
5

Ich betreibe einen Funken Streaming-24X7 und Funktion updateStateByKey, um die berechneten historischen Daten wie im Fall von NetworkWordCount Beispiel zu speichern ..Spark-Streaming UpdateStateByKey

Ich habe versucht, mich eine Datei mit 3lac Datensätze mit 1 sec Schlaf zu streamen für jeweils 1500 Datensätze. Ich bin mit 3 Arbeiter

  1. Über einen Zeitraum updateStateByKey wächst, dann wirft das Programm folgende Ausnahme

ERROR Executor: Ausnahme in Task-ID 1635 java.lang.ArrayIndexOutOfBoundsException: 3

14/10/23 21:20:43 ERROR TaskSetManager: Task 29170.0:2 failed 1 times; aborting job 
14/10/23 21:20:43 ERROR DiskBlockManager: Exception while deleting local spark dir: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141023204346-b232 
java.io.IOException: Failed to delete: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141023204346-b232/24 

14/10/23 21:20:43 ERROR Executor: Exception in task ID 8037 
java.io.FileNotFoundException: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141023204346-b232/22/shuffle_81_0_1 (No such file or directory) 
    at java.io.FileOutputStream.open(Native Method) 

Wie geht das? Ich denke, updateStateByKey sollte regelmäßig zurückgesetzt werden, da es in einer schnellen Rate wächst, bitte teilen Sie einige Beispiele, wann und wie Sie den updateStateByKey zurücksetzen .. oder gibt es ein anderes Problem? etwas Licht werfen.

Jede Hilfe wird sehr geschätzt. Danke für Ihre Zeit

Antwort

0

Haben Sie den CheckPoint ssc.checkpoint ("Pfad zum Prüfpunkt")

gesetzt