2016-05-31 3 views
0

Wir verwenden einen benutzerdefinierten Funke-Empfänger, der gestreamte Daten von einem bereitgestellten http-Link liest. Wenn die angegebene http-Verbindung nicht korrekt ist, schlägt der Empfänger fehl. Das Problem besteht darin, dass Spark den Empfänger ständig neu startet und die Anwendung niemals beendet wird. Die Frage ist, wie Spark angewiesen wird, die Anwendung zu beenden, wenn der Empfänger ausfällt. Spark Streaming: Wie man den Empfänger nach einem Fehler des Empfängers nicht neu startet

Dies ist ein Auszug unserer kundenspezifischen Empfänger:

def onStart() { 
    // Start the thread that receives data over a connection 
    new Thread("Receiver") { 
     override def run() { receive() } 
    }.start() 
    } 

    private def receive(): Unit = { 
    .... 
    val response: CloseableHttpResponse = httpclient.execute(req) 
    try { 
     val sl = response.getStatusLine() 
     if (sl.getStatusCode != 200){ 
     val errorMsg = "Error: " + sl.getStatusCode 
     val thrw = new RuntimeException(errorMsg) 
     stop(errorMsg, thrw) 
     } else { 
     ... 
     store(doc) 
     } 

Wir haben eine Funken Anwendung Streaming, der diesen Receiver verwendet:

val ssc = new StreamingContext(sparkConf, duration) 
val changes = ssc.receiverStream(new CustomReceiver(... 
... 
ssc.start() 
ssc.awaitTermination() 

Alles funktioniert wie erwartet, wenn der Empfänger keine Fehler hat . Wenn der Empfänger ausfällt (z. B. mit einer falschen http-Verbindung), wird Spark ständig neu gestartet und die Anwendung wird niemals beendet.

16/05/31 17:03:38 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job 
16/05/31 17:03:38 ERROR ReceiverTracker: Receiver has been stopped. Try to restart it. 

Wir wollen nur die gesamte Anwendung beenden, wenn ein Empfänger ausfällt.

+0

traurig, jetzt veraltet – Jake

Antwort

0

Es scheint, dass die Planung in Spark Streaming so funktioniert, dass ReceiverTracker einen ausgefallenen Empfänger so lange neu startet, bis ReceiverTracker nicht selbst gestoppt wird.

https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala#L618

ReceiverTracker zu stoppen, müssen wir die gesamte Anwendung stoppen. Daher scheint es keinen Weg zu geben, diesen Prozess von einem Empfänger selbst zu kontrollieren.

2

Es gibt eine Möglichkeit, den Lebenszyklus benutzerdefinierter empfängerbasierter Spark-Streaming-Anwendungen zu steuern. Definieren Sie den Jobfortschrittslistener für Ihre Anwendung und verfolgen Sie, was passiert.

class CustomReceiverListener extends StreamingJobProgressListener { 
    private boolean receiverStopped = false; 

    public CustomReceiverListener(StreamingContext ssc) { super(ssc);} 

    public boolean isReceiverStopped() { 
     return receiverStopped; 
    } 
    @Override 
    public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) { 
     LOG.info("Update the flag field"); 
     this.receiverStopped = true; 
    } 
} 

Und in Ihrem Treiber initialisieren einen Thread den Zustand der receiverStopped Flagge zu überwachen. Der Treiber stoppt die Stream-App, wenn dieser Thread beendet ist. (Besserer Ansatz ist die Definition einer vom Treiber definierten Callback-Methode, die die Streaming-Anwendung stoppt).

CustomReceiverListener listener = new CustomReceiverListener(ssc); 
ssc.addStreamingListener(listener); 
ssc.start(); 
Thread thread = new Thread(() -> { 
    while (!listener.isReceiverStopped()) { 
     LOG.info("Sleepy head..."); 
     try { 
      Thread.sleep(2 * 1000); /*check after 2 seconds*/ 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 
}); 
thread.start(); 
thread.join(); 
LOG.info("Listener asked to die! Going to commit suicide :("); 
ssc.stop(true, false); 

Hinweis: Bei mehreren Instanzen Ihres Empfängers, die Umsetzung des CustomReceiverListener ändern, um sicherzustellen, dass alle Empfänger Instanzen gestoppt.

+0

Ich denke, das ist veraltet – Jake

1

Ich habe einen Weg gefunden, benutzerdefinierte Empfänger nach Belieben von überall in der JVM zu starten. Es ist Prototyp, aber nicht stressgetestet. Die Hierarchie der Empfänger-Supervisor usw. wird nicht heruntergefahren, aber sie werden im Wesentlichen inaktiv, bis der benutzerdefinierte Empfänger neu gestartet wird.

Es scheint, die Absichten des Spark-API

  • erstellen ein Objekt, das eine Singleton hält HashMap zu respektieren. Halten Sie den benutzerdefinierten Empfänger und seinen gewünschten Status, aktiviert oder deaktiviert, in der Karte.

;;;

case class IKodaTextSocketReceiverStatus(receiver:IKodaTextSocketReceiver,enabled:Boolean) 
{ 

} 

object IKodaTextSocketReceiver extends Logging 
{ 
    val receiverMap:mutable.HashMap[String,IKodaTextSocketReceiverStatus]= mutable.HashMap[String,IKodaTextSocketReceiverStatus]() 


    def restartReceiver(recId:String):Boolean= 
    { 
    if(receiverMap.get(recId).isDefined) 
    { 
     logger.info("Found existing receiver") 

     receiverMap.put(recId,new IKodaTextSocketReceiverStatus(receiverMap.get(recId).get.receiver,true)) 
     receiverMap.get(recId).get.receiver.onStart() 
     true 

    } 
    else 
    { 
     false 
    } 
    } 

} 

Der aktivierte boolesche Wert kann von überall eingestellt werden. Entweder von innerhalb oder außerhalb des Empfängers. Ich setze die receive Methode.Die Methode onStart prüft den aktivierten Status des Empfängers. Wenn es falsch ist, nicht neu starten, es nicht die Methode in einem neuen Thread erhalten ..... und es gibt Stille :)

class IKodaTextSocketReceiver(host: String, port: Int, receiverId:String) 
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging { 

    IKodaTextSocketReceiver.receiverMap.put(receiverId,new IKodaTextSocketReceiverStatus(this,true)) 




    def isEnabled(): Boolean = 
    { 
    val bo = IKodaTextSocketReceiver.receiverMap.get(receiverId) 
    if(bo.isDefined) 
     { 
     bo.get.enabled 
     } 
    else 
     { 
     false 
     } 
    } 

    def onStart() { 
    // Start the thread that receives data over a connection 
    if(isEnabled()) 
    { 
     logger.info("Starting IKodaTextSocketReceiver") 
     if(!super.isStopped()) { 
     new Thread("IKodaTextSocketReceiver") { 
      override def run() { 
      receive() 
      } 
     }.start() 
     } 
     else 
     { 
      logger.info("Receiver disabled") 
     } 
    } 
    else 
     { 
     logger.warn("Restarting after stop set") 
     } 
    } 

    def onStop() { 
    // There is nothing much to do as the thread calling receive() 
    // is designed to stop by itself isStopped() returns false 

    } 



    /** Create a socket connection and receive data until receiver is stopped */ 
    private def receive() { 
    var socket: Socket = null 
    var userInput: String = null 
    var keepReceiving=true; 
    try { 
     logger.info(s"Connecting to $host : $port") 
     socket = new Socket(host, port) 
     logger.info(s"Connected to $host : $port") 
     val reader = new BufferedReader(
     new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)) 
     userInput = reader.readLine() 
     while(!isStopped && userInput != null) { 
     store(userInput) 
     userInput = reader.readLine() 
     if(userInput.contains(StreamingConstants.IKODA_END_STREAM)) 
      { 
      logger.info(s"${StreamingConstants.IKODA_END_STREAM}: Calling disabling receiver") 
      IKodaTextSocketReceiver.receiverMap.put(receiverId,new IKodaTextSocketReceiverStatus(this,false)) 
      //stop("Exiting, hopefully with elegance and dignity.") 
      } 
     } 
     reader.close() 
     socket.close() 
     logger.info("Stopped receiving") 
     restart("Trying to connect again",keepReceiving) 
    } catch { 
     case e: java.net.ConnectException => 
     restart(s"Error connecting to $host : $port"+e.getMessage,keepReceiving) 
     case t: Throwable => 
     restart("Error receiving data"+t.getMessage,keepReceiving) 
    } 
    } 

einen benutzerdefinierten Empfänger starten, indem Sie einfach überprüfen, ob es in den HashMap registriert ist. Wenn dies nicht der Fall ist, erstellen Sie eine neue. Wenn dies der Fall ist, setzen Sie einfach auf aktiviert.

 def doStream(ip: String, port: Int): Unit = { 

    try { 
     if(!IKodaTextSocketReceiver.restartReceiver("MyFirstReceiver")) 
     { 
     val ssc = getSparkStreamingContext(fieldVariables) 
     ssc.checkpoint("./ikoda/cp") 
     logger.info("open stream to " + ip + " " + port) 


     val ikReceiver = new IKodaTextSocketReceiver(ip, port, "MyFirstReceiver") 
     val lines: DStream[String] = ssc.receiverStream(ikReceiver) 


     etc etc etc