2016-02-09 12 views
10

Warum funktioniert die Mustererkennung in Spark nicht wie in Scala? Siehe Beispiel unten ... Die Funktion f() versucht, eine Übereinstimmung für die Klasse zu finden, die in der Scala REPL funktioniert, aber in Spark fehlschlägt und alle "???" ergibt. f2() ist eine Problemumgehung, die das gewünschte Ergebnis in Spark mit .isInstanceOf() erhält, aber ich verstehe, dass schlechte Form in Scala.Fallklassengleichheit in Apache Spark

Jede Hilfe auf Muster, die den richtigen Weg in diesem Szenario in Spark übereinstimmen, würde sehr geschätzt werden.

abstract class a extends Serializable {val a: Int} 
case class b(a: Int) extends a 
case class bNull(a: Int=0) extends a 

val x: List[a] = List(b(0), b(1), bNull()) 
val xRdd = sc.parallelize(x) 

Versuch Pattern-Matching, die in Scala REPL funktioniert aber nicht in Funken

def f(x: a) = x match { 
    case b(n) => "b" 
    case bNull(n) => "bnull" 
    case _ => "???" 
} 

Abhilfe, die Funktionen in Spark, aber schlechte Form ist (glaube ich)

def f2(x: a) = { 
    if (x.isInstanceOf[b]) { 
     "b" 
    } else if (x.isInstanceOf[bNull]) { 
     "bnull" 
    } else { 
     "???" 
    } 
} 

Ergebnisse anzeigen

xRdd.map(f).collect     //does not work in Spark 
             // result: Array("???", "???", "???") 
xRdd.map(f2).collect     // works in Spark 
             // resut: Array("b", "b", "bnull") 
x.map(f(_))       // works in Scala REPL  
             // result: List("b", "b", "bnull") 

Versionen verwendet ... Funken Ergebnisse laufen in Funkenschale (Spark-1.6 auf AWS EMR-4.3) Scala REPL in SBT 0.13.9 (Scala 2.10.5)

Antwort

15

Dies ist ein bekanntes Problem mit Funken REPL. Sie können weitere Details in SPARK-2620 finden. Es betrifft mehrere Operationen in Spark REPL einschließlich der meisten Transformationen auf PairwiseRDDs. Zum Beispiel:

case class Foo(x: Int) 

val foos = Seq(Foo(1), Foo(1), Foo(2), Foo(2)) 
foos.distinct.size 
// Int = 2 

val foosRdd = sc.parallelize(foos, 4) 
foosRdd.distinct.count 
// Long = 4 

foosRdd.map((_, 1)).reduceByKey(_ + _).collect 
// Array[(Foo, Int)] = Array((Foo(1),1), (Foo(1),1), (Foo(2),1), (Foo(2),1)) 

foosRdd.first == foos.head 
// Boolean = false 

Foo.unapply(foosRdd.first) == Foo.unapply(foos.head) 
// Boolean = true 

Was macht es noch schlimmer ist, dass die Ergebnisse auf der Datenverteilung abhängig:

sc.parallelize(foos, 1).distinct.count 
// Long = 2 

sc.parallelize(foos, 1).map((_, 1)).reduceByKey(_ + _).collect 
// Array[(Foo, Int)] = Array((Foo(2),2), (Foo(1),2)) 

Die einfachste Sache, die Sie tun können, definieren und Paket erforderlich Fallklassen außerhalb REPL. Jeder Code, der direkt mit spark-submit eingereicht wurde, sollte ebenfalls funktionieren. In Scala 2.11+ können Sie ein Paket direkt in der REPL mit paste -raw erstellen.

scala> :paste -raw 
// Entering paste mode (ctrl-D to finish) 

package bar 

case class Bar(x: Int) 


// Exiting paste mode, now interpreting. 

scala> import bar.Bar 
import bar.Bar 

scala> sc.parallelize(Seq(Bar(1), Bar(1), Bar(2), Bar(2))).distinct.collect 
res1: Array[bar.Bar] = Array(Bar(1), Bar(2)) 
+0

Danke zero323! Ich sehe die Erwähnung des Musterabgleichs, der dort in der Spark-Shell nicht funktioniert, aber keine Details ... Sie sagen, wenn ich meine Fall-Klassen in einem Jar definiere, kann ich ihnen in der REPL ein Muster zuordnen? Danke noch einmal! – kmh

+1

Genau. Außerhalb definieren, jar erstellen, in den 'CLASSPATH' aufnehmen und importieren. – zero323

+0

Perfekt! Danke noch einmal! – kmh