2015-05-26 12 views
6

Der Funke-Streaming-Website unter https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams erwähnt den folgenden Code ein:Funken Streaming und Verbindungspool Implementierung

dstream.foreachRDD { rdd => 
    rdd.foreachPartition { partitionOfRecords => 
    // ConnectionPool is a static, lazily initialized pool of connections 
    val connection = ConnectionPool.getConnection() 
    partitionOfRecords.foreach(record => connection.send(record)) 
    ConnectionPool.returnConnection(connection) // return to the pool for future reuse 
    } 
} 

Ich habe versucht, dies mit org.apache.commons.pool2 zu implementieren, aber die Anwendung läuft nicht mit der erwartet java.io.NotSerializableException:

15/05/26 08:06:21 ERROR OneForOneStrategy: org.apache.commons.pool2.impl.GenericObjectPool 
java.io.NotSerializableException: org.apache.commons.pool2.impl.GenericObjectPool 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) 
... 

ich frage mich, wie realistisch es ist, einen Verbindungspool zu implementieren, die serialisierbar ist. Ist es jemandem gelungen?

Vielen Dank.

Antwort

10

dieses „lokale Ressource“ Problem zu begegnen, was benötigt wird ist ein Singleton-Objekt - das heißt ein Objekt, das einmal instanziiert werden gerechtfertigt ist und nur einmal in der JVM. Zum Glück, Scala object bietet diese Funktionalität aus der Box.

Die zweite Sache zu berücksichtigen ist, dass dieser Singleton einen Service für alle Aufgaben auf der gleichen JVM, wo es gehostet, bietet, also muss kümmern sich um Nebenläufigkeit und Ressourcen-Management.

Lassen Sie uns versuchen zu skizzieren (*) einen solchen Dienst:

class ManagedSocket(private val pool: ObjectPool, val socket:Socket) { 
    def release() = pool.returnObject(socket) 
} 

// singleton object 
object SocketPool { 
    var hostPortPool:Map[(String, Int),ObjectPool] = Map() 
    sys.addShutdownHook{ 
     hostPortPool.values.foreach{ // terminate each pool } 
    } 

    // factory method 
    def apply(host:String, port:String): ManagedSocket = { 
     val pool = hostPortPool.getOrElse{(host,port), { 
      val p = ??? // create new pool for (host, port) 
      hostPortPool += (host,port) -> p 
      p 
     } 
     new ManagedSocket(pool, pool.borrowObject) 
    } 
} 

Dann Nutzung wird:

val host = ??? 
val port = ??? 
stream.foreachRDD { rdd => 
    rdd.foreachPartition { partition => 
     val mSocket = SocketPool(host, port) 
     partition.foreach{elem => 
      val os = mSocket.socket.getOutputStream() 
      // do stuff with os + elem 
     } 
     mSocket.release() 
    } 
} 

Ich gehe davon aus, dass die GenericObjectPool in der Frage verwendet, der Gleichzeitigkeit kümmert. Andernfalls muss der Zugriff auf jede Instanz mit einer Art von Synchronisation geschützt werden.

(*) Code zur Verfügung gestellt, um die Idee zu veranschaulichen, wie ein solches Objekt zu entwerfen - braucht zusätzlichen Aufwand, um in eine funktionierende Version umgewandelt werden.

+2

Dieser Ansatz scheint zu funktionieren. Vielen Dank. Ich bezeichne es als die bevorzugte Antwort.Für eine konkrete Implementierung dieser Antwort siehe https://gist.github.com/koen-dejonghe/39c10357607c698c0b04 – botkop

+0

Es ist besser, ein ThreadLocal in Java anstelle von Singleton zu verwenden. Mit ThreadLocal haben Sie eine Instanz pro Thread, so dass Sie Nebenläufigkeitsprobleme vermeiden. –

+0

@CarlosVerdes hängt von Ihren Bedürfnissen ab. Wenn Sie einen Cache implementieren, bedeutet das Verwenden einer ThreadLocal-Instanz das Duplizieren von Daten mit der Anzahl der möglichen Threads, die diesen Codepfad verwenden. Wenn Sie sich nur einen kleinen Datensatz oder eine Ressource merken müssen (in diesem Fall eine Verbindung), könnte dies eine Option sein. – maasg

1

Unten ist die Antwort falsch! Ich lasse die Antwort hier als Referenz, aber die Antwort ist falsch aus dem folgenden Grund. socketPool wird als lazy val deklariert, so dass es bei jeder ersten Anfrage für den Zugriff instanziiert wird. Da die SocketPool-Fallklasse nicht Serializable ist, bedeutet dies, dass sie in jeder Partition instanziiert wird. Dies macht den Verbindungspool nutzlos, da wir Verbindungen zwischen Partitionen und RDDs beibehalten möchten. Es spielt keine Rolle, ob dies als Begleitobjekt oder als Fallklasse implementiert ist. Die untere Zeile ist: Der Verbindungspool muss Serializable sein, und Apache Commons Pool ist nicht.

import java.io.PrintStream 
import java.net.Socket 

import org.apache.commons.pool2.{PooledObject, BasePooledObjectFactory} 
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool} 
import org.apache.spark.streaming.dstream.DStream 

/** 
* Publish a Spark stream to a socket. 
*/ 
class PooledSocketStreamPublisher[T](host: String, port: Int) 
    extends Serializable { 

    lazy val socketPool = SocketPool(host, port) 

    /** 
    * Publish the stream to a socket. 
    */ 
    def publishStream(stream: DStream[T], callback: (T) => String) = { 
     stream.foreachRDD { rdd => 

      rdd.foreachPartition { partition => 

       val socket = socketPool.getSocket 
       val out = new PrintStream(socket.getOutputStream) 

       partition.foreach { event => 
        val text : String = callback(event) 
        out.println(text) 
        out.flush() 
       } 

       out.close() 
       socketPool.returnSocket(socket) 

      } 
     } 
    } 

} 

class SocketFactory(host: String, port: Int) extends BasePooledObjectFactory[Socket] { 

    def create(): Socket = { 
     new Socket(host, port) 
    } 

    def wrap(socket: Socket): PooledObject[Socket] = { 
     new DefaultPooledObject[Socket](socket) 
    } 

} 

case class SocketPool(host: String, port: Int) { 

    val socketPool = new GenericObjectPool[Socket](new SocketFactory(host, port)) 

    def getSocket: Socket = { 
     socketPool.borrowObject 
    } 

    def returnSocket(socket: Socket) = { 
     socketPool.returnObject(socket) 
    } 

} 

was können Sie wie folgt aufrufen:

val socketStreamPublisher = new PooledSocketStreamPublisher[MyEvent](host = "10.10.30.101", port = 29009) 
socketStreamPublisher.publishStream(myEventStream, (e: MyEvent) => Json.stringify(Json.toJson(e))) 
+0

Eigentlich möchten Sie Ihren Pool zu einem "Objekt" machen, das ein Singleton für jede JVM ist, auf der Executoren laufen. Auf diese Weise wird die Initialisierung nur einmal ausgeführt und kann den Status (wie Sockets) zwischen den Spark-Tasks beibehalten. – maasg

+0

Wie kann ich Eigenschaften wie Host und Port in einem Objekt konfigurieren? Weil ich genau deshalb eine Fallklasse anstelle eines Objekts verwende. – botkop

+1

Ihre Anmerkung ist korrekt - diese Version wird den Pool für jede Interaktion der mapPartitions auf jedem Executor wiederherstellen - Ich habe einige Minuten gefunden, um eine Lösung zu skizzieren, die funktionieren würde; siehe Antwort. – maasg

Verwandte Themen