2015-12-02 12 views
5

Kann ich verschiedene Grad der Parallelität für verschiedene Teile der Aufgabe in unserem Programm in Flink setzen? Wie interpretiert Flink den folgenden Beispielcode? Die zwei benutzerdefinierten Praktiker MyPartitioner1, MyPartitioner2, teilen die Eingabedaten zwei 4 und 2 Partitionen.Grad der Parallelität in Apache Flink

partitionedData1 = inputData1 
    .partitionCustom(new MyPartitioner1(), 1); 
env.setParallelism(4); 
DataSet<Tuple2<Integer, Integer>> output1 = partitionedData1 
    .mapPartition(new calculateFun()); 

partitionedData2 = inputData2 
    .partitionCustom(new MyPartitioner2(), 2); 
env.setParallelism(2); 
DataSet<Tuple2<Integer, Integer>> output2 = partitionedData2 
    .mapPartition(new calculateFun()); 

bekomme ich folgende Fehler für diesen Code:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) 
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36) 
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29) 
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) 
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29) 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465) 
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:221) 
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2 
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:80) 
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65) 
    at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:92) 
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496) 
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362) 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) 
    at java.lang.Thread.run(Unknown Source) 

Antwort

5

ExecutionEnvironment.setParallelism() die Parallelität für das gesamte Programm setzt, das heißt, alle Betreiber des Programms.

Sie können die Parallelität für jeden einzelnen Operator festlegen, indem Sie die Methode setParallelism() für den Operator aufrufen.

Die ArrayIndexOutOfBoundsException wird ausgelöst, weil Ihr benutzerdefinierter Partitionierer wahrscheinlich aufgrund des unerwarteten Grads der Parallelität eine ungültige Partitionsnummer zurückgibt. Der benutzerdefinierte Partitionierer empfängt die tatsächliche Parallelität des Empfängers als einen Parameter in seiner partition(K key, int numPartitions) Methode.

Verwandte Themen