2017-11-30 3 views
0

ich mit der folgenden Bit-Code ein Problem habe, wenn eine Anwendung für Spark mit Java zu schreiben:Zünd-/Java serializable Ausgabe - org.apache.spark.SparkException: Aufgabe nicht serializable

public class BatchLayerDefaultJob implements Serializable { 

private static Function <BatchLayerProcessor, Future> batchFunction = new Function<BatchLayerProcessor, Future>() { 
    @Override 
    public Future call(BatchLayerProcessor s) { 
     return executor.submit(s); 
    } 
}; 
public void applicationRunner(BatchParameters batchParameters) { 


SparkConf sparkConf = new SparkConf().setAppName("Platform Engine - Batch Job"); 
sparkConf.set("spark.driver.allowMultipleContexts", "true"); 
sparkConf.set("spark.cores.max", "1"); 
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf); 
List<BatchLayerProcessor> batchListforRDD = new ArrayList<BatchLayerProcessor>(); 

// populate List here.... Then attempt to process below 

JavaRDD<BatchLayerProcessor> distData = sparkContext.parallelize(batchListforRDD, batchListforRDD.size()); 
JavaRDD<Future> result = distData.map(batchFunction); 
result.collect(); // <-- Produces an object not serializable exception here 

Also ich habe versuchte eine Reihe von Dingen ohne Erfolg einschließlich das Extrahieren der BatchFunction als eine separate Klasse außerhalb des Einflusses der Hauptklasse und ich habe auch versucht, mapPartitions statt map zu verwenden. Ich habe mehr oder weniger keine Ideen. Jede Hilfe wird geschätzt.

-Stack-Trace:

17/11/30 17:11:28 INFO DAGScheduler: Job 0 failed: collect at 
BatchLayerDefaultJob.java:122, took 23.406561 s 
Exception in thread "Thread-8" org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task 0, not attempting to retry it. Exception during serialization: 
java.io.NotSerializableException: xxxx.BatchLayerProcessor 
Serialization stack: 
- object not serializable (class: xxxx.BatchLayerProcessor, value: [email protected]) 
- element of array (index: 0) 
- array (class [Ljava.lang.Object;, size 1) 
- field (class: scala.collection.mutable.WrappedArray$ofRef, name: array, type: class [Ljava.lang.Object;) 
- object (class scala.collection.mutable.WrappedArray$ofRef, WrappedArray([email protected])) 
- writeObject data (class: org.apache.spark.rdd.ParallelCollectionPartition) 
- object (class org.apache.spark.rdd.ParallelCollectionPartition, [email protected]) 
- field (class: org.apache.spark.scheduler.ResultTask, name: partition, type: interface org.apache.spark.Partition) 
- object (class org.apache.spark.scheduler.ResultTask, ResultTask(0, 0)) 
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499) 

Beifall.

EDIT :: Added BatchLayerProcessor wie gewünscht - Leicht gekürzt:

public class BatchLayerProcessor implements Runnable, Serializable { 
private int interval, backMinutes; 
private String scoreVal, batchjobid; 
private static CountDownLatch countDownLatch; 
public void run() { 
    /* Get a reference to the ApplicationContextReader, a singleton*/ 
    ApplicationContextReader applicationContextReaderCopy = ApplicationContextReader.getInstance(); 

    synchronized (BatchLayerProcessor.class) /* Protect singleton member variable from multithreaded access. */ { 
     if (applicationContextReader == null) /* If local reference is null...*/ 
      applicationContextReader = applicationContextReaderCopy; /* ...set it to the singleton */ 
    } 

    if (getxScoreVal().equals("")) { 
     applicationContextReader.getScoreService().calculateScores(applicationContextReader.getFunctions(), getInterval(), getBackMinutes(), getScoreVal(), true, getTimeInterval(), getIncludes(), getExcludes()); 
    } 
    else { 
     applicationContextReader.getScoreService().calculateScores(applicationContextReader.getFunctions(), getInterval(), getBackMinutes(), getScoreVal(), true, getTimeInterval(), getIncludes(), getExcludes()); 
    } 

    countDownLatch.countDown(); 
} 
+0

Bitte geben Sie den Quellcode von BatchLayerProcessor – Setop

+0

Jetzt hinzugefügt - danke – MrAndy

Antwort

0

Beschlossen BatchLayerProcessor zu ändern, so dass es nicht runnable ist und sich stattdessen auf Funken verlassen, um die Arbeit für mich zu tun gibt.

Verwandte Themen