2016-04-14 13 views
1

bereitstellen I Flow bereitstellen kann die Installation von Apache Flink (mit einem Jobmanager und mehr TaskManagers) ohne Problem zu Standalone: ​​Kann nicht Fluss zu HA-Cluster von CLI Apache Flink mit Flink

bin/flink run -m example-app-1.stag.local:6123 -d -p 4 my-flow-fat-jar.jar <flow parameters> 

aber wenn ich das laufen gleichen Befehl und implementieren diesen Befehl Raise Fehler Standalone-HA-Cluster:

------------------------------------------------------------ 
The program finished with the following exception: 

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: JobManager did not respond within 60000 milliseconds 
    at org.apache.flink.client.program.Client.runDetached(Client.java:406) 
    at org.apache.flink.client.program.Client.runDetached(Client.java:366) 
    at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:75) 
    at org.apache.flink.client.program.Client.runDetached(Client.java:278) 
    at org.apache.flink.client.CliFrontend.executeProgramDetached(CliFrontend.java:844) 
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:330) 
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189) 
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239) 
Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager did not respond within 60000 milliseconds 
    at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:221) 
    at org.apache.flink.client.program.Client.runDetached(Client.java:403) 
    ... 7 more 
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [60000 milliseconds] 
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) 
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) 
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) 
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) 
    at scala.concurrent.Await$.result(package.scala:190) 
    at scala.concurrent.Await.result(package.scala) 
    at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:218) 
    ... 8 more 

Aktive Job-Manager die folgenden Fehler schreiben zu protokollieren:

2016-04-14 13:54:44,160 WARN akka.remote.ReliableDeliverySupervisor      - Association with remote system [akka.tcp://[email protected]:62784] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. 
2016-04-14 13:54:46,299 WARN org.apache.flink.runtime.jobmanager.JobManager    - Discard message LeaderSessionMessage(null,TriggerSavepoint(5de582462f334caee4733c60c6d69fd7)) because the expected leader session ID Some(72630119-fd0a-40e7-8372-45c93781e99f) did not equal the received leader session ID None. 

Also, ich verstehe nicht, was einen solchen Fehler verursachen kann?

Lassen Sie mich wissen, falls erforderlich, zusätzliche Informationen.

P.S.

Die Bereitstellung über Flink Dashboard funktioniert gut für den eigenständigen HA-Cluster. Dieses Problem tritt auf, wenn ich nur über Flink CLI bereitstellen.

aktualisieren

ich klar Zookeeper, klare Verzeichnisse von Flink auf der Festplatte verwendet und erneut bereitstellen Cluster Flink Standalone-HA. Dann versuche ich den Fluss bin/flink run Befehl auszuführen. Wie Sie sehen, schreibt JobManager nur eine Zeile über das Problem (siehe flink - jobmanager-0-example-app-1.stag.local.log).

Alle JobManagers und TaskManagers verwenden die gleiche flink-conf.yaml:

jobmanager.heap.mb: 1024 
jobmanager.web.port: 8081 

taskmanager.data.port: 6121 
taskmanager.heap.mb: 2048 
taskmanager.numberOfTaskSlots: 4 
taskmanager.memory.preallocate: false 
taskmanager.tmp.dirs: /flink/data/task_manager 

blob.server.port: 6130 
blob.storage.directory: /flink/data/blob_storage 

parallelism.default: 4 

state.backend: filesystem 
state.backend.fs.checkpointdir: s3a://example-flink/checkpoints 

restart-strategy: none 
restart-strategy.fixed-delay.attempts: 2 
restart-strategy.fixed-delay.delay: 60s 

recovery.mode: zookeeper 
recovery.zookeeper.quorum: zookeeper-1.stag.local:2181,zookeeper-2.stag.local:2181,zookeeper-3.stag.local:2181 
recovery.zookeeper.path.root: /example/flink 
recovery.zookeeper.storageDir: s3a://example-flink/recovery 
recovery.jobmanager.port: 6123 

fs.hdfs.hadoopconf: /flink/conf 

So, scheint wie Standalone HA-Cluster korrekt konfiguriert.

aktualisieren 2

FYI: Ich möchte Standalone-HA-Cluster, wie here installieren. Nicht YARN HA-Cluster.

aktualisieren 3

Hier log von bin/flink CLI erstellt: flink-username-client-hostname.local.log.

+0

Können Sie das komplette Jobmanagerprotokoll einfügen? –

+0

Haben Sie die richtigen HA-Einstellungen in der 'flink-conf.yaml' festgelegt, die vom CLI verwendet wird (also in' FLINK_HOME/conf/flink-conf.yaml')? –

+0

@TillRohrmann Ich habe Protokolle und 'flink-conf.yaml' hinzugefügt. Siehst du irgendwelche Probleme? –

Antwort

2

Beim Starten eines Flink-Clusters im HA-Modus werden die JobManager-Adresse und ihre Leader-ID in den angegebenen ZooKeeper-Cluster geschrieben. Um mit der JobManager zu kommunizieren, müssen Sie nicht nur die Adresse, sondern auch die Adresse des Anführers kennen. Daher müssen Sie die folgenden Parameter in Ihrer 'flink-conf.yaml' angeben, die von der CLI gelesen wird.

recovery.mode: zookeeper 
recovery.zookeeper.quorum: address of your cluster 
recovery.zookeeper.path.root: ZK path you've started your cluster with 

Mit dieser Information weiß der Kunde, wo er die ZooKeeper Cluster finden kann und wo sie zu finden, die JobManager Adresse und ihren Führer-ID.

+0

Sie möchten sagen, dass "bin/flink" CLI Einstellungen von 'flink-conf.yaml' verwenden. Recht? –

+1

Ja, der CLI liest den 'flink-conf.yaml'. –

+0

Ich habe 'flink-conf.yaml' von meinem Standalone-HA-Cluster in meine Entwicklungsumgebung kopiert und führe' bin/flink run' aus. Danach begann mein Flow im HA-Cluster. Ich denke, das ist eine schlechte Idee, einen Teil der Einstellungen von 'flink-conf.yaml' zu lesen, da dies absolut nicht klar für das Verständnis ist und' bin/flink' sollte Argumente haben, um eine Verbindung zum Standalone-HA-Cluster herzustellen. –

Verwandte Themen