2017-04-08 1 views
0

Ich brauche einen Tester für Scala Spark-Filter, mit Tester Java-Prädikat-Schnittstelle implementieren und bestimmten Klassennamen durch Argumente erhalten. Ich istSpark Scala Dynamische Erstellung von Serializable-Objekt

val tester = Class.forName(qualifiedName).newInstance().asInstanceOf[Predicate[T]] 
var filtered = rdd.filter(elem => tester.test(elem)) 

Das Problem wie dieses, etwas zu tun, dass ich zur Laufzeit eine Spark „TaskNotSerializable Exception“, weil meine spezifischen Prädikatsklasse haben, ist nicht Serializable.

Wenn ich

val tester = Class.forName(qualifiedName).newInstance() 
      .asInstanceOf[Predicate[T] with Serializable] 
var filtered = rdd.filter(elem => tester.test(elem)) 

Ich bekomme den gleichen Fehler. Wenn ich Tester in rdd.filter erstellen nennen es funktioniert:

var filtered = rdd.filter { elem => 
    val tester = Class.forName(qualifiedName).newInstance() 
      .asInstanceOf[Predicate[T] with Serializable] 
    tester.test(elem) 
} 

Aber ich würde ein einzelnes Objekt (vielleicht auszustrahlen) zum Testen erstellen. Wie kann ich auflösen?

Antwort

0

Sie müssen nur die Klasse implementieren Serializable. Beachten Sie, dass die asInstanceOf[Predicate[T] with Serializable] Umwandlung eine Lüge ist: Sie prüft nicht, ob der Wert Serializable ist, weshalb der zweite Fall nicht sofort während der Umwandlung einen Fehler erzeugt und der letzte "erfolgreich" ist.

Aber ich würde ein einzelnes Objekt (vielleicht zu Broadcast) zum Testen erstellen.

Sie können nicht. Broadcast oder nicht, Deserialisierung erstellt neue Objekte auf Worker-Knoten. Sie können jedoch nur eine einzige Instanz auf jeder Partition zu erstellen:

var filtered = rdd.mapPartitions { iter => 
    val tester = Class.forName(qualifiedName).newInstance() 
      .asInstanceOf[Predicate[T]] 
    iter.filter(tester.test) 
} 

Es wird tatsächlich eine bessere Leistung als die tester Serialisierung, sendet es und Deserialisieren wäre es, da es streng weniger Arbeit.

+0

Oh, danke für den Vorschlag! Aber ich bekomme zwei Fehler: 'Kein ClassTag für T verfügbar Fehler in einer Anwendung mit Standardargumenten aufgetreten. ' nicht genug Argumente für die Methode mapPartitions: (impliziten Beweis $ 6: scala.reflect.ClassTag [T]) org. apache.spark.rdd.RDD [T]. Nicht spezifizierter Wertparameter belegt $ 6. In einer Anwendung, die Standardargumente enthält, ist ein Fehler aufgetreten. Ich habe preservePartitions boolean (wie false) hinzugefügt, aber ich kann nicht verstehen, was gefragt wird (Beweis ist in der API undokumentiert) – Andrean

+0

Einige Spark-Operationen erfordern implizite 'ClassTag'-Parameter. Eine Beschreibung von 'ClassTags' finden Sie unter http://docs.scala-lang.org/overviews/reflection/typetags-manifests.html oder suchen Sie einfach nach weiteren Informationen. Wenn Sie annehmen, dass 'T' ein Typparameter Ihrer Methode/Klasse ist, sollten Sie nur': ClassTag' hinzufügen (dh wenn dieser Code in 'def foo [T] (...)' steht, ersetzen Sie ihn es mit 'def foo [T: ClassTag]'). Kompilieren und aktualisieren Sie alle Anrufer, die ähnlich einen Kompilierungsfehler geben. –

+0

mmh welche Funktion sollte foo [T] ..? Ich benutze test (T) -Methode von java.util.function.Predicate zu filtern, die gefilterten Elemente sind auch eine Java-Klasse, so dass ich nicht weiß, wo der classTag Marker – Andrean