2016-04-15 3 views
1

Ich habe einen eigenständigen HA Flink Cluster, der jede Minute Checkpoints für meinen Flow erstellt, aber ich sehe sie nicht im Verzeichnis state.backend.fs.checkpointdir.Warum der Standalone HA Flink-Cluster keine Checkpoints in das Verzeichnis `state.backend.fs.checkpointdir` speichert?

flink-conf.yaml

jobmanager.heap.mb: 1024 
jobmanager.web.port: 8081 

taskmanager.data.port: 6121 
taskmanager.heap.mb: 2048 
taskmanager.numberOfTaskSlots: 4 
taskmanager.memory.preallocate: false 
taskmanager.tmp.dirs: /flink/data/task_manager 

blob.server.port: 6130 
blob.storage.directory: /flink/data/blob_storage 

parallelism.default: 4 

state.backend: filesystem 
state.backend.fs.checkpointdir: s3a://example-staging-flink/checkpoints 

restart-strategy: none 
restart-strategy.fixed-delay.attempts: 2 
restart-strategy.fixed-delay.delay: 60s 

recovery.mode: zookeeper 
recovery.zookeeper.quorum: zookeeper-1.stag.local:2181,zookeeper-2.stag.local:2181,zookeeper-3.stag.local:2181 
recovery.zookeeper.path.root: /example_staging/flink 
recovery.zookeeper.storageDir: s3a://example-staging-flink/recovery 
recovery.jobmanager.port: 6123 

fs.hdfs.hadoopconf: /flink/conf 

Wie Sie sehen können die Checkpoints zu s3a://example-staging-flink/checkpoints Verzeichnis gespeichert werden soll, aber ich sehe sie nicht:

~ s3cmd ls s3://example-staging-flink/ 
         DIR s3://example-staging-flink/recovery/ 
~ s3cmd ls s3://example-staging-flink/recovery/ 
         DIR s3://example-staging-flink/recovery/blob/ 
2016-04-15 10:33 1137280 s3://example-staging-flink/recovery/completedCheckpoint6eab84c79b02 
2016-04-15 01:23 506961 s3://example-staging-flink/recovery/completedCheckpoint9e8f3d1254aa 
2016-04-15 09:39 149987 s3://example-staging-flink/recovery/submittedJobGraph0bf82ada1dc6 
~ s3cmd ls s3://example-staging-flink/recovery/blob/ 
         DIR s3://example-staging-flink/recovery/blob/cache/ 
~ s3cmd ls s3://example-staging-flink/recovery/blob/cache/ 
2016-04-14 13:00 3023995 s3://example-staging-flink/recovery/blob/cache/blob_0b6e57360c05128b3c91d75341785df64b91217b 
2016-04-15 09:39 3066784 s3://example-staging-flink/recovery/blob/cache/blob_3ef7422ce7b5e5cbf1f031b0de1561159109d7f9 
2016-04-14 12:54 3023898 s3://example-staging-flink/recovery/blob/cache/blob_5062028a8cab14daaeb19e51f01a02da3a8e515a 
2016-04-14 12:29 3025864 s3://example-staging-flink/recovery/blob/cache/blob_7809e559953291cab482e9cf3324457ad07d6d05 

Jobmanager Protokoll hat folgende Protokolle:

2016-04-21 12:34:55,684 INFO org.apache.flink.runtime.checkpoint.SavepointStoreFactory  - Using job manager savepoint state backend. 
2016-04-25 01:13:14,569 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Initialized in '/checkpoints/a5f89242c729190e46baf409768960fb'. 
2016-04-25 01:13:14,581 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator - Create CheckpointCoordinatorDeActivator 
2016-04-25 01:13:14,583 INFO org.apache.flink.runtime.checkpoint.SavepointCoordinatorDeActivator - Create SavepointCoordinatorDeActivator 
2016-04-25 01:13:14,583 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper. 
2016-04-25 01:13:14,594 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 1 checkpoints in ZooKeeper. 
2016-04-25 01:13:14,875 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Initialized with Checkpoint 1015 @ 1461546663803 for a5f89242c729190e46baf409768960fb. Removing all older checkpoints. 
2016-04-25 01:18:15,247 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 1016 @ 1461547095238 
2016-04-25 01:18:18,955 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed checkpoint 1016 (in 153 ms) 
2016-04-25 01:23:15,242 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 1017 @ 1461547395238 
2016-04-25 01:23:17,357 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed checkpoint 1017 (in 138 ms) 
2016-04-25 01:28:15,244 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 1018 @ 1461547695239 
2016-04-25 01:28:18,300 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed checkpoint 1018 (in 101 ms) 

So kann anyb ody erkläre mir, warum der Standalone HA-Cluster von Apache Flink keine Checkpoints speichert?

Antwort

1

Ich habe die folgende Protokollmeldung im Task-Manager-Protokoll gefunden:

2016-05-06 10:08:30,591 INFO org.apache.flink.streaming.runtime.tasks.StreamTask   - Using user-defined state backend: MemoryStateBackend (data in heap memory/checkpoints to JobManager) 

es here erstellt wurde.

Also, ich nur vergessen, die folgende Zeile aus meinem Code zu entfernen:

env.setStateBackend(new MemoryStateBackend()); 

Wenn ich es entfernen und wieder einsetzen mein Flow der Flink starten Schreib Checkpoints Systemverzeichnis in state.backend.fs.checkpointdir Parameter angegeben Datei.

2

Flink speichert den tatsächlichen Zustand nicht in Dateien, wenn er kleiner als ein vorgegebener Schwellenwert ist. Der Standardschwellwert (einstellbar über state.backend.fs.memory-threshold) beträgt 1024 Bytes. Unterhalb dieses Schwellenwerts wird der Status zusammen mit den Prüfpunktmetadaten gespeichert.

Die Idee hinter dieser Schwelle ist, dass das Schreiben von kleinen Zustand auf Festplatte bei Verwendung von verteilten Dateisystemen relativ teuer ist. Die Metadaten müssen trotzdem geschrieben werden und speichern nur ein wenig mehr Daten.

Einstellung state.backend.fs.memory-threshold: 0 sollte immer den Status in Ihrem Prüfpunktverzeichnis unabhängig von seiner Größe schreiben.

+0

Ich sehe 'State Size' gleich etwa 8MB in Checkpoint History Liste von Flink Dashboard und ich verwende den Standardwert von' state.backend.fs.memory-threshold'. Es ist wichtig, dass dieser Zustand im Verzeichnis 'state.backend.fs.checkpointdir' gespeichert wird. Recht? –

+0

Ja, sollte es. Hast du die Config auf alle Knoten kopiert? Es wird nicht automatisch im Standalone-Modus übertragen. Alle Knoten müssen dieselbe Konfiguration haben. – mxm

+0

Ja, ich provisioniere Konfigurationen verwenden Ansible. Ich glaube, ich habe einen Grund für mein Problem gefunden. Sehen Sie sich den aktualisierten Job Manager in meiner Frage an. Es hat die Zeile 'Job-Manager-Savepoint-Status-Backend verwenden 'geschrieben [hier] (https://github.com/apache/flink/blob/b8299bf92d8e3dbe140dd89602699394019b783d/flink-runtime/src/main/java/org/apache/flink/runtime /checkpoint/SavepointStoreFactory.java#L62). Wie Sie 'savepointBackend' ==' jobmanager' sehen können, da Flink die Savepoint-Backend-Einstellungen vom 'savepoints.state.backend'-Schlüssel liest, aber ich verwende' state.backend' aus der offiziellen Dokumentation. Ich überprüfe es morgen. –

Verwandte Themen