2017-06-22 2 views
1

Wir verwenden Spark 2.x mit Scala für ein System mit 13 verschiedenen ETL-Operationen. 7 von ihnen sind relativ einfach und werden jeweils von einer einzigen Domänenklasse gesteuert. Sie unterscheiden sich hauptsächlich durch diese Klasse und einige Nuancen in der Handhabung der Last.Polymorphismus mit Spark/Scala, Datasets und Fallklassen

Eine vereinfachte Version der Belastungsklasse wird wie folgt, für die Zwecke dieses Beispiels sagen, dass es 7 Pizzabeläge, hier ist geladen Pepperoni:

object LoadPepperoni { 
    def apply(inputFile: Dataset[Row], 
      historicalData: Dataset[Pepperoni], 
      mergeFun: (Pepperoni, PepperoniRaw) => Pepperoni): Dataset[Pepperoni] = { 
    val sparkSession = SparkSession.builder().getOrCreate() 
    import sparkSession.implicits._ 

    val rawData: Dataset[PepperoniRaw] = inputFile.rdd.map{ case row : Row => 
     PepperoniRaw(
      weight = row.getAs[String]("weight"), 
      cost = row.getAs[String]("cost") 
     ) 
    }.toDS() 

    val validatedData: Dataset[PepperoniRaw] = ??? // validate the data 

    val dedupedRawData: Dataset[PepperoniRaw] = ??? // deduplicate the data 

    val dedupedData: Dataset[Pepperoni] = dedupedRawData.rdd.map{ case datum : PepperoniRaw => 
     Pepperoni(value = ???, key1 = ???, key2 = ???) 
    }.toDS() 

    val joinedData = dedupedData.joinWith(historicalData, 
     historicalData.col("key1") === dedupedData.col("key1") && 
     historicalData.col("key2") === dedupedData.col("key2"), 
     "right_outer" 
    ) 

    joinedData.map { case (hist, delta) => 
     if(/* some condition */) { 
     hist.copy(value = /* some transformation */) 
     } 
    }.flatMap(list => list).toDS() 
    } 
} 

Mit anderen Worten: die Klasse eine Reihe von durchführt Operationen auf den Daten, die Operationen sind meist die gleichen und immer in der gleichen Reihenfolge, aber können leicht variieren je nach Belag, wie würde die Zuordnung von "Raw" zu "Domain" und die Merge-Funktion.

Um dies für 7 Beläge (d. H. Pilz, Käse, etc), würde ich lieber nicht einfach die Klasse kopieren und einfügen und ändern Sie alle Namen, weil die Struktur und Logik für alle Lasten gemeinsam ist. Stattdessen würde ich eher eine generische „Load“ Klasse mit generischen Typen definieren, wie folgt aus:

object Load { 
    def apply[R,D](inputFile: Dataset[Row], 
      historicalData: Dataset[D], 
      mergeFun: (D, R) => D): Dataset[D] = { 
    val sparkSession = SparkSession.builder().getOrCreate() 
    import sparkSession.implicits._ 

    val rawData: Dataset[R] = inputFile.rdd.map{ case row : Row => 
... 

Und für jede Klasse spezifische Betrieb wie Abbildung von „raw“ auf „Domäne“ oder Verschmelzung, haben ein Merkmal oder abstrakte Klasse, die die Besonderheiten implementiert. Dies wäre ein typisches Abhängigkeits-Injektions-/Polymorphismus-Muster.

Aber ich habe ein paar Probleme. Ab Spark 2.x werden Encoder nur für native Typen und Fallklassen bereitgestellt, und es gibt keine Möglichkeit, eine Klasse als Fallklasse generisch zu identifizieren. Daher ist die abgeleitete toDS() - und andere implizite Funktionalität nicht verfügbar, wenn generische Typen verwendet werden.

Auch wie in this related question of mine erwähnt, ist die Fallklasse copy nicht verfügbar, wenn Sie Generics verwenden.

Ich habe andere Entwurfsmuster untersucht, die mit Scala und Haskell wie Typklassen oder Ad-hoc-Polymorphie gemeinsam sind, aber das Hindernis ist das Spark-Dataset, das grundsätzlich nur an Fallklassen arbeitet, die nicht abstrakt definiert werden können.

Es scheint, dass dies ein häufiges Problem in Spark-Systemen wäre, aber ich kann keine Lösung finden. Jede Hilfe wird geschätzt.

Antwort

2

Die implizite Konvertierung, die .toDS ermöglicht ist:

implicit def rddToDatasetHolder[T](rdd: RDD[T])(implicit arg0: Encoder[T]): DatasetHolder[T] 

(von https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.SQLImplicits)

Sie sind genau in korrigieren, dass es in Spielraum für Encoder[T] jetzt keine implizite Wert ist, dass Sie Ihre Anwendung Methode gemacht haben generisch, so dass diese Konvertierung nicht passieren kann. Aber Sie können einfach einen als impliziten Parameter akzeptieren!

object Load { 
    def apply[R,D](inputFile: Dataset[Row], 
      historicalData: Dataset[D], 
      mergeFun: (D, R) => D)(implicit enc: Encoder[D]): Dataset[D] = { 
... 

Dann wird bei Zeit Sie Anruf die Last, mit einem bestimmten Typ, sollte es in der Lage sein, einen Encoder für diesen Typen zu finden. Beachten Sie, dass Sie auch im aufrufenden Kontext import sparkSession.implicits._ haben müssen.

Edit: ein ähnlicher Ansatz würde die impliziten newProductEncoder[T <: Product](implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[T]): Encoder[T] zu ermöglichen, wird durch begrenzende Ihre Art zu arbeiten (apply[R, D <: Product]) und eine implizites JavaUniverse.TypeTag[D] als Parameter akzeptieren.

+0

Vielen Dank, dass ist eine Art Geistgebläse lass mich das versuchen.Ich bekomme ähnliche Fehler mit aggregateByKey() und auch mit copy() (siehe den entsprechenden Frage-Link in meinem Beitrag), gibt es eine ähnliche Magie, die dafür die erforderliche Implementierung in den Geltungsbereich bringen kann? –

+0

Allgemeiner würde der Ansatz sein, das spezifische implizite, das fehlt, aufzuspüren und es zu einem Parameter zu machen, der bis zur Aufrufstelle reicht, wo der Typ vollständig spezifiziert ist. Im Fall von aggregateByKey sieht es so aus, als ob Sie einen 'ClassTag [D]' (https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions benötigen). Ich werde mir deine Frage zum 'copy' separat ansehen und dort antworten, wenn ich gute Ideen habe –