167

Ich versuche Java ThreadPoolExecutor Klasse zu verwenden, um eine große Anzahl schwerer Aufgaben mit einer festen Anzahl von Threads auszuführen. Jede der Aufgaben hat viele Stellen, an denen sie aufgrund von Ausnahmen fehlschlagen kann.Behandeln von Ausnahmen von Java-ExecutorService-Tasks

Ich habe ThreadPoolExecutor unterklassifiziert und ich habe die afterExecute-Methode überschrieben, die alle nicht abgefangenen Ausnahmen beim Ausführen einer Aufgabe bereitstellen soll. Aber ich kann es nicht funktionieren lassen.

Zum Beispiel:

public class ThreadPoolErrors extends ThreadPoolExecutor { 
    public ThreadPoolErrors() { 
     super( 1, // core threads 
       1, // max threads 
       1, // timeout 
       TimeUnit.MINUTES, // timeout units 
       new LinkedBlockingQueue<Runnable>() // work queue 
     ); 
    } 

    protected void afterExecute(Runnable r, Throwable t) { 
     super.afterExecute(r, t); 
     if(t != null) { 
      System.out.println("Got an error: " + t); 
     } else { 
      System.out.println("Everything's fine--situation normal!"); 
     } 
    } 

    public static void main(String [] args) { 
     ThreadPoolErrors threadPool = new ThreadPoolErrors(); 
     threadPool.submit( 
       new Runnable() { 
        public void run() { 
         throw new RuntimeException("Ouch! Got an error."); 
        } 
       } 
     ); 
     threadPool.shutdown(); 
    } 
} 

Die Ausgabe dieses Programms ist "Alles ist in Ordnung - Situation normal!" obwohl das einzige Runnable, das an den Thread-Pool übergeben wurde, eine Ausnahme auslöst. Irgendwelche Hinweise darauf, was hier vor sich geht?

Danke!

Antwort

121

Vom docs:

Hinweis: Bei Aktionen in Aufgaben eingeschlossen sind (wie FutureTask) entweder explizit oder über Methoden wie reichen, fangen diese Task-Objekte und Rechenausnahmen pflegen, und so tun sie n ot Ursache abrupt Terminierung, und die internen Ausnahmen werden nicht an diese Methode übergeben.

Wenn Sie ein Runnable senden, wird es in eine Zukunft verpackt.

Ihre AfterExecute sollte wie folgt sein: für dieses Verhalten

protected void afterExecute(Runnable r, Throwable t) { 
     super.afterExecute(r, t); 
     if (t == null && r instanceof Future<?>) { 
     try { 
      Future<?> future = (Future<?>) r; 
      if (future.isDone()) { 
      future.get(); 
      } 
     } catch (CancellationException ce) { 
      t = ce; 
     } catch (ExecutionException ee) { 
      t = ee.getCause(); 
     } catch (InterruptedException ie) { 
      Thread.currentThread().interrupt(); // ignore/reset 
     } 
     } 
     if (t != null) { 
      System.out.println(t); 
     } 
} 
+4

Danke, ich habe diese Lösung verwendet. Zusätzlich, falls jemand interessiert ist: Andere haben vorgeschlagen, den ExecutorService nicht abzuleiten, aber ich habe es trotzdem getan, weil ich Tasks überwachen wollte, anstatt sie alle zu beenden und dann get() auf allen zu starten Futures zurückgegeben. – Tom

+1

Eine andere Herangehensweise zum Unterklassifizieren des Executors besteht in der Unterklasse von FutureTask und dem Überschreiben der 'done'-Methode – nos

+1

Tom >> Können Sie bitte Ihren Beispiel-Snippet-Code veröffentlichen, in dem Sie ExecutorService zur Überwachung von Aufgaben nach Abschluss subclasseten ... – jagamot

194

WARNUNG: Es sollte beachtet werden, dass diese Lösung den aufrufenden Thread blockiert.


Wenn Sie durch die Aufgabe geworfen Ausnahmen zu verarbeiten, dann ist es im Allgemeinen besser Callable zu verwenden, anstatt Runnable.

Callable.call() erlaubt geprüfte Ausnahmen zu werfen, und diese gehen zurück an den aufrufenden Thread propagiert:

Callable task = ... 
Future future = executor.submit(task); 
try { 
    future.get(); 
} catch (ExecutionException ex) { 
    ex.getCause().printStackTrace(); 
} 

Wenn Callable.call() eine Ausnahme auslöst, wird dies durch Future.get() in einem ExecutionException und geworfen gewickelt werden.

Dies ist wahrscheinlich der Unterklasse ThreadPoolExecutor vorzuziehen. Sie haben auch die Möglichkeit, die Aufgabe erneut einzureichen, wenn es sich um eine wiederherstellbare Ausnahme handelt.

+19

Dies sollte die akzeptierte Antwort IMHO sein. – Timo

+2

Einverstanden. Dies sollte die akzeptierte Antwort sein. – Joeblackdev

+3

_> Callable.call() darf checked exceptions auslösen, und diese werden an den aufrufenden Thread zurückgegeben: _ Beachten Sie, dass die ausgelöste Exception nur dann an den aufrufenden Thread weitergegeben wird, wenn 'future.get()' oder sein überladene Version wird aufgerufen. – nhylated

-5

Statt Subklassifizieren ThreadPoolExecutor, würde ich es mit einem ThreadFactory Instanz schaffen, die neue Themen erstellt und bietet ihnen eine UncaughtExceptionHandler

+3

Ich habe das auch versucht, aber die Methode uncaughtException scheint nie aufgerufen zu werden. Ich glaube, das liegt daran, dass ein Worker-Thread in der ThreadPoolExecutor-Klasse die Ausnahmen abfängt. – Tom

+5

Die Methode uncaughtException wird nicht aufgerufen, da die Methode submit des ExecutorService die Callable/Runnable in einer Zukunft umschließt. Die Ausnahme wird dort erfasst. –

+0

Bitte geben Sie einen Code und nicht nur Links! –

13

Die Erklärung liegt im javadoc for afterExecute:

Hinweis: Bei Aktionen werden in Aufgaben (wie FutureTask) eingeschlossen entweder explizit oder über Methoden wie reichen, fangen diese Task-Objekte und Rechenausnahmen pflegen, und so tun sie n ot Ursache abrupt Terminierung, und die internen Ausnahmen werden nicht an diese Methode übergeben.

6

Ich verwende VerboseRunnable Klasse von jcabi-log, die alle Ausnahmen schluckt und protokolliert sie. Sehr praktisch, zum Beispiel:

import com.jcabi.log.VerboseRunnable; 
scheduler.scheduleWithFixedDelay(
    new VerboseRunnable(
    Runnable() { 
     public void run() { 
     // the code, which may throw 
     } 
    }, 
    true // it means that all exceptions will be swallowed and logged 
), 
    1, 1, TimeUnit.MILLISECONDS 
); 
7

Ich habe um, indem sie es mit dem mitgelieferten runnable zum Vollstrecker vorgelegt wickeln.

CompletableFuture.runAsync(

     () -> { 
       try { 
         runnable.run(); 
       } catch (Throwable e) { 
         Log.info(Concurrency.class, "runAsync", e); 
       } 
     }, 

     executorService 
); 
1

Wenn Ihr ExecutorService von einer externen Quelle kommt (dh es ist nicht möglich ThreadPoolExecutor Unterklasse und überschreiben afterExecute()), können Sie einen dynamischen Proxy verwenden das gewünschte Verhalten zu erreichen:

public static ExecutorService errorAware(final ExecutorService executor) { 
    return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), 
      new Class[] {ExecutorService.class}, 
      (proxy, method, args) -> { 
       if (method.getName().equals("submit")) { 
        final Object arg0 = args[0]; 
        if (arg0 instanceof Runnable) { 
         args[0] = new Runnable() { 
          @Override 
          public void run() { 
           final Runnable task = (Runnable) arg0; 
           try { 
            task.run(); 
            if (task instanceof Future<?>) { 
             final Future<?> future = (Future<?>) task; 

             if (future.isDone()) { 
              try { 
               future.get(); 
              } catch (final CancellationException ce) { 
               // Your error-handling code here 
               ce.printStackTrace(); 
              } catch (final ExecutionException ee) { 
               // Your error-handling code here 
               ee.getCause().printStackTrace(); 
              } catch (final InterruptedException ie) { 
               Thread.currentThread().interrupt(); 
              } 
             } 
            } 
           } catch (final RuntimeException re) { 
            // Your error-handling code here 
            re.printStackTrace(); 
            throw re; 
           } catch (final Error e) { 
            // Your error-handling code here 
            e.printStackTrace(); 
            throw e; 
           } 
          } 
         }; 
        } else if (arg0 instanceof Callable<?>) { 
         args[0] = new Callable<Object>() { 
          @Override 
          public Object call() throws Exception { 
           final Callable<?> task = (Callable<?>) arg0; 
           try { 
            return task.call(); 
           } catch (final Exception e) { 
            // Your error-handling code here 
            e.printStackTrace(); 
            throw e; 
           } catch (final Error e) { 
            // Your error-handling code here 
            e.printStackTrace(); 
            throw e; 
           } 
          } 
         }; 
        } 
       } 
       return method.invoke(executor, args); 
      }); 
} 
3

Eine andere Lösung wäre Verwenden Sie die ManagedTask und ManagedTaskListener.

Sie benötigen ein aufrufbare oder Runnable, die die Schnittstelle ManagedTask implementiert.

Die Methode getManagedTaskListener gibt die gewünschte Instanz zurück.

public ManagedTaskListener getManagedTaskListener() { 

Und Sie implementieren in ManagedTaskListener die taskDone Methode:

@Override 
public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) { 
    if (exception != null) { 
     LOGGER.log(Level.SEVERE, exception.getMessage()); 
    } 
} 

Mehr Details über managed task lifecycle and listener.

0

Dies ist wegen der AbstractExecutorService :: submit wickelt wie Ihr runnable in RunnableFuture (nichts anderes als FutureTask) unter

AbstractExecutorService.java 

public Future<?> submit(Runnable task) { 
    if (task == null) throw new NullPointerException(); 
    RunnableFuture<Void> ftask = newTaskFor(task, null); /////////HERE//////// 
    execute(ftask); 
    return ftask; 
} 

Dann execute es Worker passieren und Worker.run() die folgenden nennen.

ThreadPoolExecutor.java 

final void runWorker(Worker w) { 
    Thread wt = Thread.currentThread(); 
    Runnable task = w.firstTask; 
    w.firstTask = null; 
    w.unlock(); // allow interrupts 
    boolean completedAbruptly = true; 
    try { 
     while (task != null || (task = getTask()) != null) { 
      w.lock(); 
      // If pool is stopping, ensure thread is interrupted; 
      // if not, ensure thread is not interrupted. This 
      // requires a recheck in second case to deal with 
      // shutdownNow race while clearing interrupt 
      if ((runStateAtLeast(ctl.get(), STOP) || 
       (Thread.interrupted() && 
        runStateAtLeast(ctl.get(), STOP))) && 
       !wt.isInterrupted()) 
       wt.interrupt(); 
      try { 
       beforeExecute(wt, task); 
       Throwable thrown = null; 
       try { 
        task.run();   /////////HERE//////// 
       } catch (RuntimeException x) { 
        thrown = x; throw x; 
       } catch (Error x) { 
        thrown = x; throw x; 
       } catch (Throwable x) { 
        thrown = x; throw new Error(x); 
       } finally { 
        afterExecute(task, thrown); 
       } 
      } finally { 
       task = null; 
       w.completedTasks++; 
       w.unlock(); 
      } 
     } 
     completedAbruptly = false; 
    } finally { 
     processWorkerExit(w, completedAbruptly); 
    } 
} 

Schließlich task.run(); im Aufruf obigen Code wird FutureTask.run() nennen.Hier ist der Exception-Handler-Code, wegen erhalten Sie NICHT die erwartete Ausnahme.

class FutureTask<V> implements RunnableFuture<V> 

public void run() { 
    if (state != NEW || 
     !UNSAFE.compareAndSwapObject(this, runnerOffset, 
            null, Thread.currentThread())) 
     return; 
    try { 
     Callable<V> c = callable; 
     if (c != null && state == NEW) { 
      V result; 
      boolean ran; 
      try { 
       result = c.call(); 
       ran = true; 
      } catch (Throwable ex) { /////////HERE//////// 
       result = null; 
       ran = false; 
       setException(ex); 
      } 
      if (ran) 
       set(result); 
     } 
    } finally { 
     // runner must be non-null until state is settled to 
     // prevent concurrent calls to run() 
     runner = null; 
     // state must be re-read after nulling runner to prevent 
     // leaked interrupts 
     int s = state; 
     if (s >= INTERRUPTING) 
      handlePossibleCancellationInterrupt(s); 
    } 
} 
0

Wenn Sie die Ausführung von Aufgabe überwachen möchten, können Sie spinnen 1 oder 2 Fäden (vielleicht mehr abhängig von der Last) und nutzen sie Aufgaben von einem ExecutionCompletionService Wrapper zu nehmen.

0

Dies funktioniert

  • Es ist aus SingleThreadExecutor abgeleitet, aber man kann es leicht
  • Java 8 lamdas Code anpassen, aber leicht

Es wird erstellen Executor mit einem einzigen zu beheben Thread, der viele Aufgaben bekommen kann; und wird für die aktuelle zu Ende Ausführung warten mit dem nächsten

Bei uncaugth Fehler oder eine Ausnahme beginnen die uncaughtExceptionHandler wird fangen sie

 
public final class SingleThreadExecutorWithExceptions { 

    public static ExecutorService newSingleThreadExecutorWithExceptions(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { 

     ThreadFactory factory = (Runnable runnable) -> { 
      final Thread newThread = new Thread(runnable, "SingleThreadExecutorWithExceptions"); 
      newThread.setUncaughtExceptionHandler((final Thread caugthThread,final Throwable throwable) -> { 
       uncaughtExceptionHandler.uncaughtException(caugthThread, throwable); 
      }); 
      return newThread; 
     }; 
     return new FinalizableDelegatedExecutorService 
       (new ThreadPoolExecutor(1, 1, 
         0L, TimeUnit.MILLISECONDS, 
         new LinkedBlockingQueue(), 
         factory){ 


        protected void afterExecute(Runnable runnable, Throwable throwable) { 
         super.afterExecute(runnable, throwable); 
         if (throwable == null && runnable instanceof Future) { 
          try { 
           Future future = (Future) runnable; 
           if (future.isDone()) { 
            future.get(); 
           } 
          } catch (CancellationException ce) { 
           throwable = ce; 
          } catch (ExecutionException ee) { 
           throwable = ee.getCause(); 
          } catch (InterruptedException ie) { 
           Thread.currentThread().interrupt(); // ignore/reset 
          } 
         } 
         if (throwable != null) { 
          uncaughtExceptionHandler.uncaughtException(Thread.currentThread(),throwable); 
         } 
        } 
       }); 
    } 



    private static class FinalizableDelegatedExecutorService 
      extends DelegatedExecutorService { 
     FinalizableDelegatedExecutorService(ExecutorService executor) { 
      super(executor); 
     } 
     protected void finalize() { 
      super.shutdown(); 
     } 
    } 

    /** 
    * A wrapper class that exposes only the ExecutorService methods 
    * of an ExecutorService implementation. 
    */ 
    private static class DelegatedExecutorService extends AbstractExecutorService { 
     private final ExecutorService e; 
     DelegatedExecutorService(ExecutorService executor) { e = executor; } 
     public void execute(Runnable command) { e.execute(command); } 
     public void shutdown() { e.shutdown(); } 
     public List shutdownNow() { return e.shutdownNow(); } 
     public boolean isShutdown() { return e.isShutdown(); } 
     public boolean isTerminated() { return e.isTerminated(); } 
     public boolean awaitTermination(long timeout, TimeUnit unit) 
       throws InterruptedException { 
      return e.awaitTermination(timeout, unit); 
     } 
     public Future submit(Runnable task) { 
      return e.submit(task); 
     } 
     public Future submit(Callable task) { 
      return e.submit(task); 
     } 
     public Future submit(Runnable task, T result) { 
      return e.submit(task, result); 
     } 
     public List> invokeAll(Collection> tasks) 
       throws InterruptedException { 
      return e.invokeAll(tasks); 
     } 
     public List> invokeAll(Collection> tasks, 
              long timeout, TimeUnit unit) 
       throws InterruptedException { 
      return e.invokeAll(tasks, timeout, unit); 
     } 
     public T invokeAny(Collection> tasks) 
       throws InterruptedException, ExecutionException { 
      return e.invokeAny(tasks); 
     } 
     public T invokeAny(Collection> tasks, 
           long timeout, TimeUnit unit) 
       throws InterruptedException, ExecutionException, TimeoutException { 
      return e.invokeAny(tasks, timeout, unit); 
     } 
    } 



    private SingleThreadExecutorWithExceptions() {} 
} 
Verwandte Themen