2017-05-20 3 views
3

Ich möchte eine Jupyter/IPython Erweiterung erstellen, um Apache Spark Jobs zu überwachen.Wie füge ich einen SparkListener aus pySpark in Python hinzu?

Spark stellt eine REST-API bereit.

Anstatt jedoch den Server abzurufen, möchte ich die Ereignisupdates durch Rückrufe gesendet werden.

Ich versuche eine SparkListener mit der SparkContext.addSparkListener() zu registrieren. Diese Funktion ist im PySpark-Objekt SparkContext in Python nicht verfügbar. Also, wie kann ich einen Python-Listener zu Scala/Java-Version des Kontexts von Python registrieren. Ist es möglich, dies durch py4j zu tun? Ich möchte, dass Python-Funktionen aufgerufen werden, wenn die Ereignisse im Listener ausgelöst werden.

Antwort

10

Es ist möglich, obwohl es ein bisschen beteiligt ist. Wir können Py4j callback mechanism verwenden, um Nachricht von einem SparkListener zu übergeben. Zuerst erstellen wir ein Scala-Paket mit allen benötigten Klassen.Verzeichnisstruktur:

. 
├── build.sbt 
└── src 
    └── main 
     └── scala 
      └── net 
       └── zero323 
        └── spark 
         └── examples 
          └── listener 
           ├── Listener.scala 
           ├── Manager.scala 
           └── TaskListener.scala 

build.sbt:

name := "listener" 

organization := "net.zero323" 

scalaVersion := "2.11.7" 

val sparkVersion = "2.1.0" 

libraryDependencies ++= List(
    "org.apache.spark" %% "spark-core" % sparkVersion, 
    "net.sf.py4j" % "py4j" % "0.10.4" // Just for the record 
) 

Listener.scala definiert eine Python-Schnittstelle wir später

package net.zero323.spark.examples.listener 

/* You can add arbitrary methods here, 
* as long as these match corresponding Python interface 
*/ 
trait Listener { 
    /* This will be implemented by a Python class. 
    * You can of course use more specific types, 
    * for example here String => Unit */ 
    def notify(x: Any): Any 
} 

Manager.scala wird verwendet, um implementieren wollen Nachrichten an Python Zuhörer:

package net.zero323.spark.examples.listener 

object Manager { 
    var listeners: Map[String, Listener] = Map() 

    def register(listener: Listener): String = { 
    this.synchronized { 
     val uuid = java.util.UUID.randomUUID().toString 
     listeners = listeners + (uuid -> listener) 
     uuid 
    } 
    } 

    def unregister(uuid: String) = { 
    this.synchronized { 
     listeners = listeners - uuid 
    } 
    } 

    def notifyAll(message: String): Unit = { 
    for { (_, listener) <- listeners } listener.notify(message) 
    } 

} 

Endlich ein einfaches SparkListener:

package net.zero323.spark.examples.listener 

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} 
import org.json4s._ 
import org.json4s.JsonDSL._ 
import org.json4s.jackson.JsonMethods._ 

/* A simple listener which captures SparkListenerTaskEnd, 
* extracts numbers of records written by the task 
* and converts to JSON. You can of course add handlers 
* for other events as well. 
*/ 
class PythonNotifyListener extends SparkListener { 
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { 
    val recordsWritten = taskEnd.taskMetrics.outputMetrics.recordsWritten 
    val message = compact(render(
     ("recordsWritten" -> recordsWritten) 
    )) 
    Manager.notifyAll(message) 
    } 
} 

Lets' Paket unserer Erweiterung:

sbt package 

und PySpark Sitzung startet einen generierte jar zum Klassenpfad hinzufügen und Registrierung Zuhörer:

$SPARK_HOME/bin/pyspark \ 
    --driver-class-path target/scala-2.11/listener_2.11-0.1-SNAPSHOT.jar \ 
    --conf spark.extraListeners=net.zero323.spark.examples.listener.PythonNotifyListener 

Als nächstes müssen wir ein Python-Objekt definieren, dasimplementiertSchnittstelle:

class PythonListener(object): 
    package = "net.zero323.spark.examples.listener" 

    @staticmethod 
    def get_manager(): 
     jvm = SparkContext.getOrCreate()._jvm 
     manager = getattr(jvm, "{}.{}".format(PythonListener.package, "Manager")) 
     return manager 

    def __init__(self): 
     self.uuid = None 

    def notify(self, obj): 
     """This method is required by Scala Listener interface 
     we defined above. 
     """ 
     print(obj) 

    def register(self): 
     manager = PythonListener.get_manager() 
     self.uuid = manager.register(self) 
     return self.uuid 

    def unregister(self): 
     manager = PythonListener.get_manager() 
     manager.unregister(self.uuid) 
     self.uuid = None 

    class Java: 
     implements = ["net.zero323.spark.examples.listener.Listener"] 

Start Rückruf-Server:

sc._gateway.start_callback_server() 

erstellen und Zuhörer registrieren:

listener = PythonListener() 

registrieren sie:

listener.register() 

und Test:

>>> sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_test") 
{"recordsWritten":33} 
{"recordsWritten":34} 
{"recordsWritten":33} 

Beim Verlassen Sie sollten Abschaltung der Rückruf-Server:

sc._gateway.shutdown_callback_server() 

Hinweis:

Dies sollte mit Vorsicht verwendet werden, wenn sie mit Spark-Streaming arbeiten, die intern Callback-Server verwendet.

bearbeiten:

Wenn dies zu viel Aufwand ist, könnte man nur org.apache.spark.scheduler.SparkListenerInterface definieren:

class SparkListener(object): 
    def onApplicationEnd(self, applicationEnd): 
     pass 
    def onApplicationStart(self, applicationStart): 
     pass 
    def onBlockManagerRemoved(self, blockManagerRemoved): 
     pass 
    def onBlockUpdated(self, blockUpdated): 
     pass 
    def onEnvironmentUpdate(self, environmentUpdate): 
     pass 
    def onExecutorAdded(self, executorAdded): 
     pass 
    def onExecutorMetricsUpdate(self, executorMetricsUpdate): 
     pass 
    def onExecutorRemoved(self, executorRemoved): 
     pass 
    def onJobEnd(self, jobEnd): 
     pass 
    def onJobStart(self, jobStart): 
     pass 
    def onOtherEvent(self, event): 
     pass 
    def onStageCompleted(self, stageCompleted): 
     pass 
    def onStageSubmitted(self, stageSubmitted): 
     pass 
    def onTaskEnd(self, taskEnd): 
     pass 
    def onTaskGettingResult(self, taskGettingResult): 
     pass 
    def onTaskStart(self, taskStart): 
     pass 
    def onUnpersistRDD(self, unpersistRDD): 
     pass 
    class Java: 
     implements = ["org.apache.spark.scheduler.SparkListenerInterface"] 

es erweitern:

class TaskEndListener(SparkListener): 
    def onTaskEnd(self, taskEnd): 
     print(taskEnd.toString()) 

und direkt verwenden:

>>> sc._gateway.start_callback_server() 
True 
>>> listener = TaskEndListener() 
>>> sc._jsc.sc().addSparkListener(listener) 
>>> sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_test_simple") 
SparkListenerTaskEnd(0,0,ResultTask,Success,[email protected],[email protected]) 
SparkListenerTaskEnd(0,0,ResultTask,Success,[email protected],[email protected]) 
SparkListenerTaskEnd(0,0,ResultTask,Success,[email protected]) 

Während diese Methode nicht einfach ist (mehr Verkehr zwischen JVM und Python), erfordert die Behandlung von Java-Objekten in Python-Sitzung.

Verwandte Themen