2013-10-17 16 views
5

Ich versuche, einen einfachen Stream Verarbeitung Spark-Job, die eine Liste von Nachrichten (JSON-formatiert), die jeweils zu einem Benutzer gehören, zählen die Nachrichten jedes Benutzers zählen und die Top Ten drucken Benutzer.NotSerializableException beim Sortieren in Spark

Wenn ich jedoch den Vergleicher> zum Sortieren der reduzierten Zählerwerte defi- niere, schlägt das Ganze fehl, wenn java.io.NotSerializableException ausgelöst wird.

Meine Maven Abhängigkeit für Spark:

<groupId>org.apache.spark</groupId> 
<artifactId>spark-core_2.9.3</artifactId> 
<version>0.8.0-incubating</version> 

Der Java-Code ich benutze:

public static void main(String[] args) { 

    JavaSparkContext sc = new JavaSparkContext("local", "spark"); 

    JavaRDD<String> lines = sc.textFile("stream.sample.txt").cache(); 

    JavaPairRDD<String, Long> words = lines 
     .map(new Function<String, JsonElement>() { 
      // parse line into JSON 
      @Override 
      public JsonElement call(String t) throws Exception { 
       return (new JsonParser()).parse(t); 
      } 

     }).map(new Function<JsonElement, String>() { 
      // read User ID from JSON 
      @Override 
      public String call(JsonElement json) throws Exception { 
       return json.getAsJsonObject().get("userId").toString(); 
      } 

     }).map(new PairFunction<String, String, Long>() { 
      // count each line 
      @Override 
      public Tuple2<String, Long> call(String arg0) throws Exception { 
       return new Tuple2(arg0, 1L); 
      } 

     }).reduceByKey(new Function2<Long, Long, Long>() { 
      // count messages for every user 
      @Override 
      public Long call(Long arg0, Long arg1) throws Exception { 
       return arg0 + arg1; 
      } 

     }); 

    // sort result in a descending order and take 10 users with highest message count 
    // This causes the exception 
    List<Tuple2<String, Long>> sorted = words.takeOrdered(10, new Comparator<Tuple2<String, Long>>(){ 

     @Override 
     public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) { 
      return -1 * o1._2().compareTo(o2._2()); 
     } 

    }); 

    // print result 
    for (Tuple2<String, Long> tuple : sorted) { 
     System.out.println(tuple._1() + ": " + tuple._2()); 
    } 

} 

Der resultierende Stack-Trace:

java.lang.reflect.InvocationTargetException 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:601) 
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:297) 
    at java.lang.Thread.run(Thread.java:722) 
Caused by: org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: net.imagini.spark.test.App$5 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:670) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:668) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:668) 
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:376) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441) 
    at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149) 

ich durch die Spark-API ging Dokumentation aber konnte nichts finden, was mir die richtige Richtung zeigen würde. Mache ich etwas falsch oder ist das ein Fehler in Spark? Jede Hilfe würde gerne geschätzt werden.

+0

UPDATE: Anscheinend alles kocht auf das Comparator-Objekt, das als zweites Argument an * takeOrdered() * übergeben wird. Da der Komparator-Schnittstelle, um nicht Serializable erweitern diese Arbeit, die Sie einen ‚serializable‘ Komparator erstellen müssen zu machen: 'öffentliche Schnittstelle SerializableComparator erweitert Vergleicher , Serializable {}' Anschließend verläuft ein Objekt, das implementiert diese Schnittstelle wie der Vergleicher die ursprüngliche Ausnahme verhindert. Zugegeben, das ist wahrscheinlich nicht die eleganteste Lösung für dieses Problem und ich würde definitiv einige Vorschläge begrüßen :) –

Antwort

2

Als @ vanco.anton zu anspielte, Sie so etwas wie die folgenden mit Hilfe von Java 8 funktionalen Schnittstellen können:

public interface SerializableComparator<T> extends Comparator<T>, Serializable { 

    static <T> SerializableComparator<T> serialize(SerializableComparator<T> comparator) { 
    return comparator; 
    } 

} 

Und dann in Ihrem Code:

import static SerializableComparator.serialize; 
... 
rdd.top(10, serialize((a, b) -> -a._2.compareTo(b._2))); 
Verwandte Themen