2016-03-28 6 views
5

Ich versuche zu vermeiden "while (true)" -Lösung, wenn ich warte, bis meine Spark Apache Job erledigt ist, aber ohne Erfolg.Wie man während des Startens von einer anderen Anwendung auf Apache Funke Launcher Job ordnungsgemäß wartet?

Ich habe Spark-Anwendung, die einige Daten verarbeiten und ein Ergebnis in die Datenbank eingeben soll, ich rufe es aus meinem Frühling Service und möchte warten, bis der Job erledigt ist.

Beispiel:

Launcher mit Methode:

@Override 
public void run(UUID docId, String query) throws Exception { 
    launcher.addAppArgs(docId.toString(), query); 

    SparkAppHandle sparkAppHandle = launcher.startApplication(); 

    sparkAppHandle.addListener(new SparkAppHandle.Listener() { 
     @Override 
     public void stateChanged(SparkAppHandle handle) { 
      System.out.println(handle.getState() + " new state"); 
     } 

     @Override 
     public void infoChanged(SparkAppHandle handle) { 
      System.out.println(handle.getState() + " new state"); 
     } 
    }); 

    System.out.println(sparkAppHandle.getState().toString()); 
} 

Wie man richtig warten, bis Zustand handler "Finished" wird.

+0

konnten Sie das lösen? – gaurav5430

Antwort

2

Ich verwende auch SparkLauncher aus einer Spring-Anwendung. Hier ist eine Zusammenfassung des Ansatzes, den ich gewählt habe (anhand von Beispielen im JavaDoc).

Der @Service, der zum Starten des Jobs verwendet wird, implementiert auch SparkHandle.Listener und übergibt eine Referenz an sich selbst über .startApplication, z.

... 
... 
@Service 
public class JobLauncher implements SparkAppHandle.Listener { 
... 
... 
... 
private SparkAppHandle launchJob(String mainClass, String[] args) throws Exception { 

    String appResource = getAppResourceName(); 

    SparkAppHandle handle = new SparkLauncher() 
     .setAppResource(appResource).addAppArgs(args) 
     .setMainClass(mainClass) 
     .setMaster(sparkMaster) 
     .setDeployMode(sparkDeployMode) 
     .setSparkHome(sparkHome) 
     .setConf(SparkLauncher.DRIVER_MEMORY, "2g") 
     .startApplication(this); 

    LOG.info("Launched [" + mainClass + "] from [" + appResource + "] State [" + handle.getState() + "]"); 

    return handle; 
} 

/** 
* Callback method for changes to the Spark Job 
*/ 
@Override 
public void infoChanged(SparkAppHandle handle) { 

    LOG.info("Spark App Id [" + handle.getAppId() + "] Info Changed. State [" + handle.getState() + "]"); 

} 

/** 
* Callback method for changes to the Spark Job's state 
*/ 
@Override 
public void stateChanged(SparkAppHandle handle) { 

    LOG.info("Spark App Id [" + handle.getAppId() + "] State Changed. State [" + handle.getState() + "]"); 

} 

Mit diesem Ansatz kann man Maßnahmen ergreifen, wenn der Status auf „nicht bestanden“, „FERTIG“ oder „getötet“.

Ich hoffe, dass diese Informationen für Sie hilfreich sind.

+0

Ich bin auch mit dem gleichen Problem konfrontiert. Ich habe es mit der Art und Weise versucht, wie man OP benutzt (neues anonymen Listener-Objekt erstellen) und wie man es beschreibt. In beiden Fällen wurden Listener-Methoden nicht aufgerufen. – Reddy

+0

@Reddy: Konnten Sie das funktionieren? Ich habe auch das gleiche Problem. Die Anwendung wird sofort mit appID und state beendet. Funktioniert, wenn ich Thread.sleep() explizit aufruft. Vielen Dank! – Tariq

1

Ich implementiert mit CountDownLatch, und es funktioniert wie erwartet.

... 
final CountDownLatch countDownLatch = new CountDownLatch(1); 
SparkAppListener sparkAppListener = new SparkAppListener(countDownLatch); 
SparkAppHandle appHandle = sparkLauncher.startApplication(sparkAppListener); 
Thread sparkAppListenerThread = new Thread(sparkAppListener); 
sparkAppListenerThread.start(); 
long timeout = 120; 
countDownLatch.await(timeout, TimeUnit.SECONDS);  
    ... 

private static class SparkAppListener implements SparkAppHandle.Listener, Runnable { 
    private static final Log log = LogFactory.getLog(SparkAppListener.class); 
    private final CountDownLatch countDownLatch; 
    public SparkAppListener(CountDownLatch countDownLatch) { 
     this.countDownLatch = countDownLatch; 
    } 
    @Override 
    public void stateChanged(SparkAppHandle handle) { 
     String sparkAppId = handle.getAppId(); 
     State appState = handle.getState(); 
     if (sparkAppId != null) { 
      log.info("Spark job with app id: " + sparkAppId + ",\t State changed to: " + appState + " - " 
        + SPARK_STATE_MSG.get(appState)); 
     } else { 
      log.info("Spark job's state changed to: " + appState + " - " + SPARK_STATE_MSG.get(appState)); 
     } 
     if (appState != null && appState.isFinal()) { 
      countDownLatch.countDown(); 
     } 
    } 
    @Override 
    public void infoChanged(SparkAppHandle handle) {} 
    @Override 
    public void run() {} 
} 
Verwandte Themen