2009-11-27 13 views
5

Ich habe FutureTask von java.util.concurrent erweitert, um Callbacks bereitzustellen, um die Ausführung von Aufgaben zu verfolgen, die an eine ExecutorService übergeben werden.Erweitern von FutureTask, wie mit cancel umgehen

public class StatusTask<V> extends FutureTask<V> { 

    private final ITaskStatusHandler<V> statusHandler; 

    public StatusTask(Callable<V> callable, ITaskStatusHandler<V> statusHandler){ 
     super(callable); 
     if (statusHandler == null) 
      throw new NullPointerException("statusHandler cannot be null"); 
     this.statusHandler = statusHandler; 
     statusHandler.TaskCreated(this); 
    } 

    @Override 
    public void run() { 
     statusHandler.TaskRunning(this); 
     super.run(); 
    } 

    @Override 
    protected void done() { 
     super.done(); 
     statusHandler.TaskCompleted(this); 
    } 

} 

Nun, was ich sehe, ist, wenn die Aufgabe gestellt, aber am Ende der Warteschlange und i cancel(true); die Aufgabe - die run() Methode wird noch genannt - und die FutureTask.run() (wahrscheinlich) prüft, ob die Aufgabe abgebrochen und doesn nenne nicht das umhüllte Callable.

Sollte ich z.B.

@Override 
public void run() { 
    if(!isCancelled()) { 
    statusHandler.TaskRunning(this); 
    super.run(); 
    } 
} 

Oder soll ich noch anrufen super.run()? Beide Ansätze scheinen anfällig zu sein für Race Conditions zwischen dem Überprüfen der Annullierung und dem Tun von etwas darüber.

Antwort

3

Du hast Recht, dass ein Rennen gibt es dort. FutureTask#done() heißt höchstens einmal, wenn also die Aufgabe bereits abgebrochen wurde, bevor sie jemals über RunnableFuture#run() ausgeführt wurde, haben Sie den Anruf an FutureTask#done() verpasst.

Haben Sie einen einfacheren Ansatz in Betracht gezogen, der immer einen symmetrischen Satz gepaarter Aufrufe an ITaskStatusHandler#taskRunning() und ITaskStatusHandler#taskCompleted() ausgibt?

@Override 
public void run() { 
    statusHandler.TaskRunning(this); 
    try { 
    super.run(); 
    finally { 
    statusHandler.TaskCompleted(this); 
    } 
} 

Sobald RunnableFuture#run() heißt, es ist wahr, dass Ihre Aufgabe in Betrieb oder zumindest versuchen zu laufen. Sobald FutureTask#run() fertig ist, wird Ihre Aufgabe nicht mehr ausgeführt. Es kommt vor, dass im Falle einer Stornierung der Übergang (fast) sofort erfolgt.

zu vermeiden Versuch ITaskStatusHandler#taskRunning() anrufen, wenn die inneren Callable oder Runnable nie von FutureTask#run() aufgerufen wird, muß einige gemeinsam genutzte Struktur zwischen den Callable oder Runnable und dem FutureTask abgeleiteten Typ selbst, zu etablieren, so dass, wenn Ihre innere Funktion zuerst aufgerufen wird Sie setzen ein Flag, das der äußere FutureTask-abgeleitete Typ als Latch beobachten kann, was anzeigt, dass ja die Funktion gestartet wurde, bevor abgebrochen wurde. Bis dahin mussten Sie sich jedoch verpflichtet haben, ITaskStatusHandler#taskRunning() aufzurufen, daher ist die Unterscheidung nicht so nützlich.

Ich habe in letzter Zeit mit einem ähnlichen Design-Problem zu kämpfen, und vor auf den symmetrischen Absetzen Liquidation und nach Operationen in Methode meiner außer Kraft gesetzt FutureTask#run().

+0

Ich nehme an, der Aufruf in der finally-Klausel sollte statusHandler.TaskCompleted sein (this); Gibt es einen Fall, in dem die Methode run nicht aufgerufen würde, aber die Methode done()? – nos

+0

Danke für den Fehler. Ich habe den Anruf im finally-Block behoben. Ja, es ist möglich, dass done() aufgerufen werden kann, ohne dass run() aufgerufen wurde. Siehe FutureTask # SynC# innerCancel (boolesch). Dort, vorausgesetzt, dass die Aufgabe nicht beendet wurde, was beinhaltet, dass sie nie gestartet wurde, können Sie sehen, dass done() aufgerufen wird. Beachten Sie, dass done() für jede FutureTask-Instanz null oder einmal aufgerufen wird: null, wenn weder run() noch cancel() einmal aufgerufen werden. – seh

+0

Scheint jedoch, wenn Sie es an einen Executor übergeben done() wird aufgerufen, wenn die Aufgabe abgebrochen wird, bevor es ausgeführt wird - obwohl der Executor nicht darüber weiß. Der Executor kennt nur Runnables/Callables. So wird run() aufgerufen, wenn es schließlich runnable wird in diesem Fall FutureTask # run() tut im Wesentlichen nichts. – nos

2

Ihr Problem ist, dass Ihre zukünftigen Aufgaben immer noch ausgeführt werden, nachdem Sie auf sie abgebrochen haben, richtig?

Nachdem eine Aufgabe an den Executor-Service übergeben wurde, sollte sie vom Executor verwaltet werden. (Sie können eine einzelne Aufgabe dennoch abbrechen, wenn Sie möchten.) Sie sollten die Ausführung mit der executor shutdownNow method abbrechen. (Dadurch wird die Cancel-Methode aller übergebenen Tasks aufgerufen.) Ein Shutdown würde trotzdem alle übergebenen Tasks ausführen.

Der Executor weiß ansonsten nicht, dass eine Aufgabe abgebrochen wurde. Es wird die Methode unabhängig vom internen Zustand der zukünftigen Aufgabe aufrufen.

Der einfachste Ansatz wäre, das Executor-Framework so zu verwenden, wie es ist, und einen Callable Decorator zu schreiben.

class CallableDecorator{ 

    CallableDecorator(Decorated decorated){ 
    ... 
    } 

    setTask(FutureTask task){ 
    statusHandler.taskCreated(task); 
    } 

    void call(){ 
    try{ 
     statusHandler.taskRunning(task); 
     decorated.call(); 
    }finally{ 
     statusHandler.taskCompleted(task); 
    } 
    } 
} 

Das einzige Problem ist, dass Aufgabe nicht im Konstruktor des Dekorators sein kann. (Dies ist ein Parameter des Konstruktors für zukünftige Aufgaben.) Um diesen Zyklus zu unterbrechen, müssen Sie einen Setter oder einen Proxy verwenden, der mit der Konstruktorinjektion arbeitet. Vielleicht wird dies für den Rückruf überhaupt nicht benötigt und Sie können sagen: statusHandler.callableStarted(decorated).

Abhängig von Ihren Anforderungen müssen Sie möglicherweise Ausnahmen und Interrupts signalisieren.

Grundausführung:

class CallableDecorator<T> implements Callable<T> { 

    private final Callable<T> decorated; 
    CallableDecorator(Callable<T> decorated){ 
     this.decorated = decorated; 
    } 

    @Override public T call() throws Exception { 
     out.println("before " + currentThread()); 
     try { 
      return decorated.call(); 
     }catch(InterruptedException e){ 
      out.println("interupted " + currentThread()); 
      throw e; 
     } 
     finally { 
      out.println("after " + currentThread()); 
     } 
    } 
} 

ExecutorService executor = newFixedThreadPool(1); 
Future<Long> normal = executor.submit(new CallableDecorator<Long>(
     new Callable<Long>() { 
      @Override 
      public Long call() throws Exception { 
       return System.currentTimeMillis(); 
      } 
     })); 
out.println(normal.get()); 

Future<Long> blocking = executor.submit(new CallableDecorator<Long>(
     new Callable<Long>() { 
      @Override 
      public Long call() throws Exception { 
       sleep(MINUTES.toMillis(2)); // blocking call 
       return null; 
      } 
     })); 

sleep(SECONDS.toMillis(1)); 
blocking.cancel(true); // or executor.shutdownNow(); 

Ausgang:

before Thread[pool-1-thread-1,5,main] 
after Thread[pool-1-thread-1,5,main] 
1259347519104 
before Thread[pool-1-thread-1,5,main] 
interupted Thread[pool-1-thread-1,5,main] 
after Thread[pool-1-thread-1,5,main] 
+2

ernsthaft, möchten Sie eine einzelne Callable steuern, indem Sie den gesamten Executor töten? das würde großartig funktionieren, wenn Sie einen Executor pro Callable hätten. In diesem Fall, warum sogar Executoren verwenden? In der Regel möchten Sie Jobs selektiv abbrechen, daher die Stornomethode bei Future. – james

+0

Nein. Der Beispielcode zeigt, wie der Executor oder eine einzelne Task beendet wird. –