2014-12-30 6 views
7

Ich benutze Funken mit Cassandra, und ich have a JavaRDD<String> von Clients. Und für jeden Client, möchte ich wie folgt aus cassandra seine Wechselwirkungen wählen:JavaSparkContext nicht serialisierbar

avaPairRDD<String, List<InteractionByMonthAndCustomer>> a = client.mapToPair(new PairFunction<String, String, List<InteractionByMonthAndCustomer>>() { 
     @Override 
     public Tuple2<String, List<InteractionByMonthAndCustomer>> call(String s) throws Exception {    
      List<InteractionByMonthAndCustomer> b = javaFunctions(sc) 
        .cassandraTable(CASSANDRA_SCHEMA, "interaction_by_month_customer") 
        .where("ctid =?", s) 
        .map(new Function<CassandraRow, InteractionByMonthAndCustomer>() { 
         @Override 
         public InteractionByMonthAndCustomer call(CassandraRow cassandraRow) throws Exception { 
          return new InteractionByMonthAndCustomer(cassandraRow.getString("channel"), 
            cassandraRow.getString("motif"), 
            cassandraRow.getDate("start"), 
            cassandraRow.getDate("end"), 
            cassandraRow.getString("ctid"), 
            cassandraRow.getString("month") 
          ); 
         } 
        }).collect(); 
      return new Tuple2<String, List<InteractionByMonthAndCustomer>>(s, b); 
     } 
    }); 

Dafür ich verwende ein JavaSparkContext sc. Aber ich habe diesen Fehler:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) 
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) 
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242) 
at org.apache.spark.rdd.RDD.map(RDD.scala:270) 
at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:99) 
at org.apache.spark.api.java.JavaRDD.mapToPair(JavaRDD.scala:32) 
at fr.aid.cim.spark.dao.GenrateCustumorJourney.AllCleintInteractions(GenrateCustumorJourney.java:91) 
at fr.aid.cim.spark.dao.GenrateCustumorJourney.main(GenrateCustumorJourney.java:75) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:483) 
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) 
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) 
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext 
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) 
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) 
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) 
... 14 more 

Ich denke, dass der JavaSparkContext serialisierbar sein muss. Aber wie kann ich es bitte serialisierbar machen?

Vielen Dank.

Antwort

12

Nein, JavaSparkContext ist nicht serialisierbar und soll es nicht sein. Es kann nicht in einer Funktion verwendet werden, die an Remote-Mitarbeiter gesendet wird. Hier referenzieren Sie nicht explizit, aber eine Referenz wird trotzdem serialisiert, weil Ihre anonyme innere Klassenfunktion nicht static ist und daher einen Verweis auf die umschließende Klasse hat.

Versuchen Sie, Ihren Code mit dieser Funktion als Standalone-Objekt static neu zu schreiben.

0

Sie können SparkContext nicht verwenden und andere RDDs in einem Executor erstellen (Kartenfunktion einer RDD).

Sie müssen die Cassandra RDD (sc.cassandraTable) im Treiber erstellen und dann eine Verbindung zwischen diesen beiden RDDs (Client RDD und Cassandra Tabelle RDD).

+0

Es stimmt, Code sollte in jedem Falle nicht (Spark-Transformation innerhalb Transformation verbietet etc ..) –

0

erklären es mit transient Stichwort:

private transient JavaSparkContext sparkContext; 
Verwandte Themen