2017-06-16 4 views
0

Ich versuche, Join-Methode zwischen 2 RDD zu verwenden und es in Cassandra speichern, aber mein Code nicht funktionieren. Am Anfang bekomme ich eine große Main-Methode und alles funktioniert gut, aber wenn ich Funktion und Klasse verwende, funktioniert das nicht. Ich bin neu zu scala und FunkenScala/Spark serializable Fehler - Join nicht funktionieren

Code ist:

class Migration extends Serializable { 

    case class userId(offerFamily: String, bp: String, pdl: String) extends Serializable 
    case class siteExternalId(site_external_id: Option[String]) extends Serializable 
    case class profileData(begin_ts: Option[Long], Source: Option[String]) extends Serializable 

    def SparkMigrationProfile(sc: SparkContext) = { 

    val test = sc.cassandraTable[siteExternalId](KEYSPACE,TABLE) 
    .keyBy[userId] 
    .filter(x => x._2.site_external_id != None) 

    val profileRDD = sc.cassandraTable[profileData](KEYSPACE,TABLE) 
    .keyBy[userId] 

    //dont work 
    test.join(profileRDD) 
    .foreach(println) 

    // don't work 
    test.join(profileRDD) 
    .saveToCassandra(keyspace, table) 

    } 

Am beginig i die berühmten erhalten: Exception in thread "main" org.apache.spark.SparkException: Aufgabe nicht serializable an. . . also erweitere ich meine Hauptklasse und auch die Fallklasse aber trotzdem funktioniert das nicht.

Antwort

0

Ich denke, Sie sollten Ihre Fallklassen von Migration Klasse in dedizierte Datei und/oder Objekt verschieben. Dies sollte dein Problem lösen. Darüber hinaus sind Scala-Fallklassen standardmäßig serialisierbar.

+0

es funktioniert! Ich bin jetzt so dumm geworden. . . Kannst du mir erklären warum? – user3394825

+0

hi @ user3394825, es ist schwer zu sagen, weil ich Spark mit Cassandra nicht verwende. Basierend auf meiner Erfahrung hatte ich ähnliche Probleme bei der Verwendung von Fallklassen in anderen Klassen definiert. In Ihrer Situation könnte ein Problem mit der Erstellung von impliziten Parametern für die Funktion "cassandraTable" auftreten (https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra- connector/src/main/scala/com/datastax/spark/connector/SparkContextFunctions.scala) z 'rrf: RowReaderFactory [T], ev: ValidRDDType [T]', aber ich rate nur. Ich weiß, dass bei Verwendung von Spark SQL Encoder gibt es auch eine ähnliche Ausnahme. –

+0

Die Fallklassen sind technisch innere Klassen, die Zugriff auf eine umschließende Instanz von Migration haben. Wenn sie serialisiert werden, wird das zugehörige Migrationsobjekt ebenfalls serialisiert. Und obwohl es als serialisierbar gekennzeichnet ist, gibt es wahrscheinlich irgendwo eine Instanzvariable, die es nicht ist. Oft ist der Schuldige ein SparkContext-Objekt. –

Verwandte Themen