2017-05-09 3 views
2

Ich habe Fall Klasse wie folgt:Spark-Dataset Union setzt Klassenvariablen

case class Ais(NotImportant) 
    extends Serializable { 


    var flag = Ais.Flag.NotFlagged 
    var cluster = Ais.Unknown 
    var visited = false 

    override def toString(): String = { 
    s"$cluster,$flag,$visited" 
    } 
} 

Nach meinem Algorithmus läuft ich endend mit zwei Datasets vom Typ bis (Int,Ais) wo Variablen in Ais Objekte Informationen enthalten. Ich muss sie verbinden. Die wichtigsten sind für mich die Werte var cluster und var visited. Nach werden sie jedoch auf die Standardwerte zurückgesetzt.

labeledInner.foreach(println(_)) // This is fine 
println("==========") 
labeledOuter.foreach(println(_)) // This is also fine 
println("==========") 
labeledOuter.union(labeledInner).foreach(println(_)) // Here 
               // everything set to default 

Ich benutze Spark 2.1 und Scala 2.11.8.

Antwort

3

Sie sollten nicht wandelbar vars bei Klassen verwenden, wenn Funken mit - dieser nicht „überleben“ Spark Codierung, so dass jede nicht-triviale Nutzung des Dataset (wie mit union), der Codierung und Decodierung löst nicht erhalten diese Felder.

Warum? Spark verfügt über integrierte Encoder, die Objekte effizient in Byte-Arrays (und zurück) codieren. Für Fallklassen (eigentlich für alle Product s, was meistens Fallklassen und Tupel bedeutet) kodieren die Encoder nur die Case-Class Felder, die als Klassenparameter definiert sind (in Ihrem Fall nur NotImportant). Sie können sehen, indem Sie den entsprechenden Encoder für Ihren Fall-Klasse erstellen und die Überprüfung seiner Schema:

case class A(s: String) { 
    var a: Int = 0 
} 

Encoders.product[A].schema.printTreeString() 
// root 
// |-- s: string (nullable = true) 

Wie Sie sehen können - nur s überlebt, a nicht Teil des Schemas ist.

Was ist die Alternative? Wenn Sie Spark (und wirklich, Scala im Allgemeinen) verwenden, sollten Sie von veränderbaren Feldern Abstand nehmen. Versuchen Sie, Ihre Daten modellieren, um alle Felder als unveränderbar Felder enthalten, zB:

case class Ais(flag: Flag, cluster: Cluster, visited: Boolean) 

Dann zu „mutieren“, diese Objekte können Sie die copy Methode verwenden, die eine neue Instanz mit einigen schafft (oder keine) der Felder veränderte, zB:

val a = Ais(Ais.Flag.NotFlagged, Ais.Unknown, false) 
val b = a.copy(visited = true) 

werden diese Objekte sicher mit Funken zu verwenden (sie „überleben“ Serialisierung und sind unveränderlich).

+0

Danke, das bestätigt meinen Verdacht. Gibt es eine Möglichkeit, den veränderbaren Variablenstatus mit einem anderen Encoder, wie Kryo, zu erhalten? –