I have a problem with the following bit of code when writing a Spark application in Java: Spark/Java serialization issue - org.apache.spark.SparkException: Task not serializable
public class BatchLayerDefaultJob implements Serializable {

    private static Function<BatchLayerProcessor, Future> batchFunction = new Function<BatchLayerProcessor, Future>() {
        @Override
        public Future call(BatchLayerProcessor s) {
            return executor.submit(s);
        }
    };

    public void applicationRunner(BatchParameters batchParameters) {
        SparkConf sparkConf = new SparkConf().setAppName("Platform Engine - Batch Job");
        sparkConf.set("spark.driver.allowMultipleContexts", "true");
        sparkConf.set("spark.cores.max", "1");

        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
        List<BatchLayerProcessor> batchListforRDD = new ArrayList<BatchLayerProcessor>();
        // populate List here.... Then attempt to process below

        JavaRDD<BatchLayerProcessor> distData = sparkContext.parallelize(batchListforRDD, batchListforRDD.size());
        JavaRDD<Future> result = distData.map(batchFunction);
        result.collect(); // <-- Produces an object not serializable exception here
    }
}
I have tried a number of things without success, including extracting the batchFunction into a separate class outside the main class, and I have also tried using mapPartitions instead of map. I am more or less out of ideas. Any help is appreciated.
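The stack trace below points at the elements of the RDD, not the function: implementing Serializable is not enough, because every non-transient, non-static field of the object must itself be serializable or writeObject fails with exactly this NotSerializableException. A minimal, Spark-free sketch of that rule (class and method names here are hypothetical; CountDownLatch is used only as a convenient example of a non-serializable type — in the posted code it is static and would be skipped anyway, so the culprit is presumably one of the trimmed fields):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.CountDownLatch;

public class SerializationSketch {

    static class BadTask implements Serializable {
        // CountDownLatch is not Serializable, so serializing BadTask fails
        // even though BadTask itself implements Serializable.
        private final CountDownLatch latch = new CountDownLatch(1);
    }

    static class GoodTask implements Serializable {
        // transient fields are skipped during serialization, so this works.
        private transient CountDownLatch latch = new CountDownLatch(1);
    }

    // Round-trip an object through ObjectOutputStream, the same mechanism
    // Spark's default JavaSerializer uses to ship tasks to executors.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("BadTask serializable:  " + canSerialize(new BadTask()));  // false
        System.out.println("GoodTask serializable: " + canSerialize(new GoodTask())); // true
    }
}
```

Running canSerialize against an instance of the real task class on the driver is a quick way to reproduce the failure locally without submitting a Spark job.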
Stack trace:
17/11/30 17:11:28 INFO DAGScheduler: Job 0 failed: collect at
BatchLayerDefaultJob.java:122, took 23.406561 s
Exception in thread "Thread-8" org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task 0, not attempting to retry it. Exception during serialization:
java.io.NotSerializableException: xxxx.BatchLayerProcessor
Serialization stack:
- object not serializable (class: xxxx.BatchLayerProcessor, value: xxxx.BatchLayerProcessor@...)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: scala.collection.mutable.WrappedArray$ofRef, name: array, type: class [Ljava.lang.Object;)
- object (class scala.collection.mutable.WrappedArray$ofRef, WrappedArray(xxxx.BatchLayerProcessor@...))
- writeObject data (class: org.apache.spark.rdd.ParallelCollectionPartition)
- object (class org.apache.spark.rdd.ParallelCollectionPartition, org.apache.spark.rdd.ParallelCollectionPartition@...)
- field (class: org.apache.spark.scheduler.ResultTask, name: partition, type: interface org.apache.spark.Partition)
- object (class org.apache.spark.scheduler.ResultTask, ResultTask(0, 0))
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
Cheers.
EDIT: Added BatchLayerProcessor as requested - slightly trimmed:
public class BatchLayerProcessor implements Runnable, Serializable {

    private int interval, backMinutes;
    private String scoreVal, batchjobid;
    private static CountDownLatch countDownLatch;
    private static ApplicationContextReader applicationContextReader; /* referenced in run() */

    public void run() {
        /* Get a reference to the ApplicationContextReader, a singleton */
        ApplicationContextReader applicationContextReaderCopy = ApplicationContextReader.getInstance();
        synchronized (BatchLayerProcessor.class) { /* Protect singleton member variable from multithreaded access. */
            if (applicationContextReader == null) /* If local reference is null... */
                applicationContextReader = applicationContextReaderCopy; /* ...set it to the singleton */
        }
        if (getxScoreVal().equals("")) {
            applicationContextReader.getScoreService().calculateScores(applicationContextReader.getFunctions(), getInterval(), getBackMinutes(), getScoreVal(), true, getTimeInterval(), getIncludes(), getExcludes());
        } else {
            applicationContextReader.getScoreService().calculateScores(applicationContextReader.getFunctions(), getInterval(), getBackMinutes(), getScoreVal(), true, getTimeInterval(), getIncludes(), getExcludes());
        }
        countDownLatch.countDown();
    }
}
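Independent of the field problem, a `JavaRDD<Future>` is itself suspect: `java.util.concurrent.Future` is not serializable either, so `collect()` could not ship the results back to the driver even if the tasks serialized. One common restructuring (a sketch only, not necessarily the fix for this job) is to run the work inside the map function and return a serializable status value, letting Spark provide the parallelism instead of an ExecutorService. Plain `java.util.stream` stands in for the RDD here so the sketch runs without a cluster; with Spark the lambda shape would be the same, e.g. `distData.map(p -> { p.run(); return p.getId() + ":done"; })`. All names below (`MapDirectlySketch`, `Processor`, `process`) are hypothetical:

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MapDirectlySketch {

    // Stand-in for BatchLayerProcessor: serializable, does its work in run().
    static class Processor implements Serializable {
        private final String id;

        Processor(String id) { this.id = id; }

        public void run() { /* the actual work would execute here, on the executor */ }

        public String getId() { return id; }
    }

    // Run each task inside the map and return a serializable result,
    // rather than mapping each task to a (non-serializable) Future.
    static List<String> process(List<Processor> tasks) {
        return tasks.stream()
                .map(p -> { p.run(); return p.getId() + ":done"; })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(process(Arrays.asList(new Processor("a"), new Processor("b"))));
        // prints [a:done, b:done]
    }
}
```

This also removes the need for the CountDownLatch, since `collect()` already blocks until every partition has finished.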
Please post the source code of BatchLayerProcessor – Setop
Now added - thanks – MrAndy