Ich versuche, einen einfachen Stream Verarbeitung Spark-Job, die eine Liste von Nachrichten (JSON-formatiert), die jeweils zu einem Benutzer gehören, zählen die Nachrichten jedes Benutzers zählen und die Top Ten drucken Benutzer.NotSerializableException beim Sortieren in Spark
Wenn ich jedoch den Vergleicher> zum Sortieren der reduzierten Zählerwerte defi- niere, schlägt das Ganze fehl, wenn java.io.NotSerializableException ausgelöst wird.
Meine Maven Abhängigkeit für Spark:
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.9.3</artifactId>
<version>0.8.0-incubating</version>
Der Java-Code ich benutze:
public static void main(String[] args) {
JavaSparkContext sc = new JavaSparkContext("local", "spark");
JavaRDD<String> lines = sc.textFile("stream.sample.txt").cache();
JavaPairRDD<String, Long> words = lines
.map(new Function<String, JsonElement>() {
// parse line into JSON
@Override
public JsonElement call(String t) throws Exception {
return (new JsonParser()).parse(t);
}
}).map(new Function<JsonElement, String>() {
// read User ID from JSON
@Override
public String call(JsonElement json) throws Exception {
return json.getAsJsonObject().get("userId").toString();
}
}).map(new PairFunction<String, String, Long>() {
// count each line
@Override
public Tuple2<String, Long> call(String arg0) throws Exception {
return new Tuple2(arg0, 1L);
}
}).reduceByKey(new Function2<Long, Long, Long>() {
// count messages for every user
@Override
public Long call(Long arg0, Long arg1) throws Exception {
return arg0 + arg1;
}
});
// sort result in a descending order and take 10 users with highest message count
// This causes the exception
List<Tuple2<String, Long>> sorted = words.takeOrdered(10, new Comparator<Tuple2<String, Long>>(){
@Override
public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) {
return -1 * o1._2().compareTo(o2._2());
}
});
// print result
for (Tuple2<String, Long> tuple : sorted) {
System.out.println(tuple._1() + ": " + tuple._2());
}
}
Der resultierende Stack-Trace:
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:297)
at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: net.imagini.spark.test.App$5
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:670)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:668)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:668)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:376)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
ich durch die Spark-API ging Dokumentation aber konnte nichts finden, was mir die richtige Richtung zeigen würde. Mache ich etwas falsch oder ist das ein Fehler in Spark? Jede Hilfe würde gerne geschätzt werden.
UPDATE: Anscheinend alles kocht auf das Comparator-Objekt, das als zweites Argument an * takeOrdered() * übergeben wird. Da der Komparator-Schnittstelle, um nicht Serializable erweitern diese Arbeit, die Sie einen ‚serializable‘ Komparator erstellen müssen zu machen: 'öffentliche Schnittstelle SerializableComparator erweitert Vergleicher , Serializable {}' Anschließend verläuft ein Objekt, das implementiert diese Schnittstelle wie der Vergleicher die ursprüngliche Ausnahme verhindert. Zugegeben, das ist wahrscheinlich nicht die eleganteste Lösung für dieses Problem und ich würde definitiv einige Vorschläge begrüßen :) –