Der Funke-Streaming-Website unter https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams erwähnt den folgenden Code ein:Funken Streaming und Verbindungspool Implementierung
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
Ich habe versucht, dies mit org.apache.commons.pool2 zu implementieren, aber die Anwendung läuft nicht mit der erwartet java.io.NotSerializableException:
15/05/26 08:06:21 ERROR OneForOneStrategy: org.apache.commons.pool2.impl.GenericObjectPool
java.io.NotSerializableException: org.apache.commons.pool2.impl.GenericObjectPool
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
...
ich frage mich, wie realistisch es ist, einen Verbindungspool zu implementieren, die serialisierbar ist. Ist es jemandem gelungen?
Vielen Dank.
Dieser Ansatz scheint zu funktionieren. Vielen Dank. Ich bezeichne es als die bevorzugte Antwort.Für eine konkrete Implementierung dieser Antwort siehe https://gist.github.com/koen-dejonghe/39c10357607c698c0b04 – botkop
Es ist besser, ein ThreadLocal in Java anstelle von Singleton zu verwenden. Mit ThreadLocal haben Sie eine Instanz pro Thread, so dass Sie Nebenläufigkeitsprobleme vermeiden. –
@CarlosVerdes hängt von Ihren Bedürfnissen ab. Wenn Sie einen Cache implementieren, bedeutet das Verwenden einer ThreadLocal-Instanz das Duplizieren von Daten mit der Anzahl der möglichen Threads, die diesen Codepfad verwenden. Wenn Sie sich nur einen kleinen Datensatz oder eine Ressource merken müssen (in diesem Fall eine Verbindung), könnte dies eine Option sein. – maasg