2017-08-09 7 views
0

Ich verwende Apache Beam 2.0.0 und den FlinkRunner (scala 2.10) derselben Version. Ich teste gerade gegen einen In-Process-Flink-Master (Standardkonfiguration) mit der FlinkRunner-Abhängigkeit, die zur Laufzeit Flink 1.2.1 bringt (Blick auf den MVN-Dependency-Tree).Apache Beam/Flink ExceptionInChainedStubException

Was ist der beste Weg, herauszufinden, was eigentlich schief gelaufen ist, wenn es "Benutzerausnahmen" gibt? Das ist keine Frage darüber, was ich diesmal falsch gemacht habe; sondern wie man im Allgemeinen sagt, wie man mehr Informationen aus Beam oder Flink bekommt. Hier ist Stacktrace:

Exception in thread "main" java.lang.RuntimeException: Pipeline execution failed 
at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:122) 
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295) 
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281) 
at com.mapfit.flow.data.environment.MFEnvironment.run(MFEnvironment.java:70) 
at com.mapfit.flow.main.Scratch.main(Scratch.java:35) 
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:910) 
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:853) 
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:853) 
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) 
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) 
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) 
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: org.apache.beam.sdk.util.UserCodeException: org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException 
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36) 
at org.apache.beam.sdk.transforms.MapElements$1$auxiliary$PCieS8xh.invokeProcessElement(Unknown Source) 
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:197) 
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:158) 
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65) 
at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:118) 
at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103) 
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490) 
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) 
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException 
at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:82) 
at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) 
at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction$MultiDoFnOutputManager.output(FlinkDoFnFunction.java:165) 
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnContext.outputWindowedValue(SimpleDoFnRunner.java:355) 
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:629) 
at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:122) 

Hinweis das völlige Fehlen von etwas Code im Zusammenhang schrieb ich (außer meinem Ruf pipeline.run()).

Ich habe den Quellcode für jedes meiner verbundenen Gläser heruntergeladen, und ich trat in die ChainedFlatMapDriver, die eine Ausnahme in Zeile 82 warf, und endete schließlich mit Blick auf eine EOFException durch einen Aufruf in Java-Objekt Serialisierung generiert (meine Werte verwenden Standard-Coder). Ich dachte, ich würde auf etwas stoßen, aber es scheint, dass die Ursache der EOFException in SimpleCollectingOutputView Zeile 79 liegt, die oft geworfen wird und oft als Routine-Ausführung für Flink geschluckt wird.

Irgendwelche Hinweise von jedem, der weiß, wie man Flink dazu bringt, Fehlerinformationen zu offenbaren?

gefunden Weitere Informationen nach Debugging:

Just found more info after walking through more Flink code in the debugger: java.lang.InterruptedException 
at java.lang.Object.wait(Native Method) 
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:168) 
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:138) 
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131) 
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88) 
at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65) 
at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) 
at org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction.flatMap(FlinkMultiOutputPruningFunction.java:46) 
at org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction.flatMap(FlinkMultiOutputPruningFunction.java:30) 
at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80) 
at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) 
at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction$MultiDoFnOutputManager.output(FlinkDoFnFunction.java:165) 
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnContext.outputWindowedValue(SimpleDoFnRunner.java:355) 
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:629) 
at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:122) 
at org.apache.beam.sdk.transforms.MapElements$1$auxiliary$vuuNRtio.invokeProcessElement(Unknown Source) 
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:197) 
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:158) 
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65) 
at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:118) 
at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103) 
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490) 
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) 
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665) 
at java.lang.Thread.run(Thread.java:745) 

Antwort

1

Werfen Sie einen Blick auf diese beiden Links: EOFException related to memory segments during run of Beam pipeline on Flink

https://issues.apache.org/jira/browse/BEAM-2831

habe ich ähnliche Ausnahmen zu sehen, während der Strahl auf Garn auf flinkrunner läuft. Vorgeschlagene Coder auf der Seite der Ausgabe geholfen.

Abgesehen davon empfehle ich, Logger ausgiebig zu verwenden, bis Ihre Pipeline reibungslos läuft. In Garn-Logs kann mit dem Befehl "yarn logs" abgerufen werden. Ich weiß nicht über Ihren Fall (in-Prozess Flink Master), aber Sie sollten in der Lage sein, einige Logs zu schreiben, nehme ich an ...

+0

Ich werde dies als die akzeptierte Antwort markieren, seit ich diesen Fall (einen halben Monat zuvor die von dir erwähnten Links existierten), wie üblich konnte niemand helfen; und dann um das Problem im Grunde auf die gleiche Weise wie im Link erwähnt gearbeitet, und dann dieses Ticket vernachlässigt :) – mephicide