Ich habe einen Weg gefunden, benutzerdefinierte Empfänger nach Belieben von überall in der JVM zu starten. Es ist Prototyp, aber nicht stressgetestet. Die Hierarchie der Empfänger-Supervisor usw. wird nicht heruntergefahren, aber sie werden im Wesentlichen inaktiv, bis der benutzerdefinierte Empfänger neu gestartet wird.
Es scheint, die Absichten des Spark-API
- erstellen ein Objekt, das eine Singleton hält
HashMap
zu respektieren. Halten Sie den benutzerdefinierten Empfänger und seinen gewünschten Status, aktiviert oder deaktiviert, in der Karte.
;;;
case class IKodaTextSocketReceiverStatus(receiver:IKodaTextSocketReceiver,enabled:Boolean)
{
}
object IKodaTextSocketReceiver extends Logging
{
val receiverMap:mutable.HashMap[String,IKodaTextSocketReceiverStatus]= mutable.HashMap[String,IKodaTextSocketReceiverStatus]()
def restartReceiver(recId:String):Boolean=
{
if(receiverMap.get(recId).isDefined)
{
logger.info("Found existing receiver")
receiverMap.put(recId,new IKodaTextSocketReceiverStatus(receiverMap.get(recId).get.receiver,true))
receiverMap.get(recId).get.receiver.onStart()
true
}
else
{
false
}
}
}
Der aktivierte boolesche Wert kann von überall eingestellt werden. Entweder von innerhalb oder außerhalb des Empfängers. Ich setze die receive
Methode.Die Methode onStart
prüft den aktivierten Status des Empfängers. Wenn es falsch ist, nicht neu starten, es nicht die Methode in einem neuen Thread erhalten ..... und es gibt Stille :)
class IKodaTextSocketReceiver(host: String, port: Int, receiverId:String)
extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
IKodaTextSocketReceiver.receiverMap.put(receiverId,new IKodaTextSocketReceiverStatus(this,true))
def isEnabled(): Boolean =
{
val bo = IKodaTextSocketReceiver.receiverMap.get(receiverId)
if(bo.isDefined)
{
bo.get.enabled
}
else
{
false
}
}
def onStart() {
// Start the thread that receives data over a connection
if(isEnabled())
{
logger.info("Starting IKodaTextSocketReceiver")
if(!super.isStopped()) {
new Thread("IKodaTextSocketReceiver") {
override def run() {
receive()
}
}.start()
}
else
{
logger.info("Receiver disabled")
}
}
else
{
logger.warn("Restarting after stop set")
}
}
def onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself isStopped() returns false
}
/** Create a socket connection and receive data until receiver is stopped */
private def receive() {
var socket: Socket = null
var userInput: String = null
var keepReceiving=true;
try {
logger.info(s"Connecting to $host : $port")
socket = new Socket(host, port)
logger.info(s"Connected to $host : $port")
val reader = new BufferedReader(
new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
userInput = reader.readLine()
while(!isStopped && userInput != null) {
store(userInput)
userInput = reader.readLine()
if(userInput.contains(StreamingConstants.IKODA_END_STREAM))
{
logger.info(s"${StreamingConstants.IKODA_END_STREAM}: Calling disabling receiver")
IKodaTextSocketReceiver.receiverMap.put(receiverId,new IKodaTextSocketReceiverStatus(this,false))
//stop("Exiting, hopefully with elegance and dignity.")
}
}
reader.close()
socket.close()
logger.info("Stopped receiving")
restart("Trying to connect again",keepReceiving)
} catch {
case e: java.net.ConnectException =>
restart(s"Error connecting to $host : $port"+e.getMessage,keepReceiving)
case t: Throwable =>
restart("Error receiving data"+t.getMessage,keepReceiving)
}
}
einen benutzerdefinierten Empfänger starten, indem Sie einfach überprüfen, ob es in den HashMap
registriert ist. Wenn dies nicht der Fall ist, erstellen Sie eine neue. Wenn dies der Fall ist, setzen Sie einfach auf aktiviert.
def doStream(ip: String, port: Int): Unit = {
try {
if(!IKodaTextSocketReceiver.restartReceiver("MyFirstReceiver"))
{
val ssc = getSparkStreamingContext(fieldVariables)
ssc.checkpoint("./ikoda/cp")
logger.info("open stream to " + ip + " " + port)
val ikReceiver = new IKodaTextSocketReceiver(ip, port, "MyFirstReceiver")
val lines: DStream[String] = ssc.receiverStream(ikReceiver)
etc etc etc
traurig, jetzt veraltet – Jake