Ich versuche, Join-Methode zwischen 2 RDD zu verwenden und es in Cassandra speichern, aber mein Code nicht funktionieren. Am Anfang bekomme ich eine große Main-Methode und alles funktioniert gut, aber wenn ich Funktion und Klasse verwende, funktioniert das nicht. Ich bin neu zu scala und FunkenScala/Spark serializable Fehler - Join nicht funktionieren
Code ist:
class Migration extends Serializable {
case class userId(offerFamily: String, bp: String, pdl: String) extends Serializable
case class siteExternalId(site_external_id: Option[String]) extends Serializable
case class profileData(begin_ts: Option[Long], Source: Option[String]) extends Serializable
def SparkMigrationProfile(sc: SparkContext) = {
val test = sc.cassandraTable[siteExternalId](KEYSPACE,TABLE)
.keyBy[userId]
.filter(x => x._2.site_external_id != None)
val profileRDD = sc.cassandraTable[profileData](KEYSPACE,TABLE)
.keyBy[userId]
//dont work
test.join(profileRDD)
.foreach(println)
// don't work
test.join(profileRDD)
.saveToCassandra(keyspace, table)
}
Am beginig i die berühmten erhalten: Exception in thread "main" org.apache.spark.SparkException: Aufgabe nicht serializable an. . . also erweitere ich meine Hauptklasse und auch die Fallklasse aber trotzdem funktioniert das nicht.
es funktioniert! Ich bin jetzt so dumm geworden. . . Kannst du mir erklären warum? – user3394825
hi @ user3394825, es ist schwer zu sagen, weil ich Spark mit Cassandra nicht verwende. Basierend auf meiner Erfahrung hatte ich ähnliche Probleme bei der Verwendung von Fallklassen in anderen Klassen definiert. In Ihrer Situation könnte ein Problem mit der Erstellung von impliziten Parametern für die Funktion "cassandraTable" auftreten (https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra- connector/src/main/scala/com/datastax/spark/connector/SparkContextFunctions.scala) z 'rrf: RowReaderFactory [T], ev: ValidRDDType [T]', aber ich rate nur. Ich weiß, dass bei Verwendung von Spark SQL Encoder gibt es auch eine ähnliche Ausnahme. –
Die Fallklassen sind technisch innere Klassen, die Zugriff auf eine umschließende Instanz von Migration haben. Wenn sie serialisiert werden, wird das zugehörige Migrationsobjekt ebenfalls serialisiert. Und obwohl es als serialisierbar gekennzeichnet ist, gibt es wahrscheinlich irgendwo eine Instanzvariable, die es nicht ist. Oft ist der Schuldige ein SparkContext-Objekt. –