2017-11-03 2 views
2

Ich habe einen Thread, der jedes Mal erneut gesendet werden muss, wenn der Auftrag abgeschlossen ist oder aufgrund eines Fehlers abgestürzt ist.Wie Thread erneut an ExecutorService übergeben, wenn es fehlgeschlagen ist/fertig

Der Haupt-Thread darf nicht blockiert werden.

Gewinde müssen bei Bedarf abgebrochen werden.

Was ist die beste Lösung?

public class POC { 
public static void main(String[] args) { 
    System.out.println("Init"); 
    new SomeService().waitForEvents(); 
    System.out.println("main not blocked and do other stuff"); 
} 

static class SomeService { 
    public void waitForEvents() { 
     ExecutorService executor = Executors.newSingleThreadExecutor(); 
     executor.submit(new AlwaysRunningJob("importantParamForJob")); 
     // MISSING LOGIC 
     // // if AlwaysRunningJob got error or finished, 
     // // wait 2 seconds recreate and resubmit it 
     // executor.submit(new AlwaysRunningJob("importantParamForJob")); 
    } 

    class AlwaysRunningJob implements Runnable { 
     String importantParamForJob; 

     public AlwaysRunningJob(String importantParamForJob) { 
      this.importantParamForJob = importantParamForJob; 
     } 

     @Override 
     public void run() { 
      Thread.currentThread().setName("AlwaysRunningJob Job"); 
      while (!Thread.currentThread().isInterrupted()) { 
       // keep waiting for events until 
       // exception is thrown. or something bad happened 
       try { 
        Thread.sleep(5000); 
        System.out.println("keep working on" + importantParamForJob); 
       } catch (InterruptedException e) { 
        // exit if it failed 
        return; 
       } 
      } 
      System.out.println("Finished run!"); 
     } 
    } 
} 

}

+0

Vielleicht können Sie UncaughtExceptionHandler verwenden für dieses Threads, die Ausnahme fangen und erneut einreichen der Thread nochmal? –

+0

Sie senden keine Threads an die Executoren. – Raedwald

Antwort

3

Ich würde erstreckt ThreadPoolExecutor, die die protected void afterExecute(Runnable r, Throwable t) standardmäßig implementiert wird diese Methode nicht tue alles, aber du könntest etwas tun wie:

Aber mit dieser Zukunft sind nutzlos es ist mehr wie ein Feuer und vergessen.

Als @NikitataGorbatchevski wird es nicht funktionieren, wenn Sie eine Callable verwenden. Also hier ist eine Version, die beide mit Runnable und Callable umgehen kann. Indead FutureTask kann nicht erneut ausgeführt werden, wenn ein Fehler auftritt (I wieder verwendet Code für das Anschlagen von FutureTask warten und nicht so sicher es):

public class RetryExecutor extends ThreadPoolExecutor { 
    private final long maxRetries; 
    private Map<Runnable, Integer> retries = new ConcurrentHashMap<>(); 

    public RetryExecutor(int corePoolSize, int maximumPoolSize, long maxRetries, 
         long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { 
     super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); 
     this.maxRetries = maxRetries; 
    } 

    @Override 
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 
     return new RetryFutureTask<>(runnable, value); 
    } 

    @Override 
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 
     return new RetryFutureTask<>(callable); 
    } 

    @Override 
    protected void afterExecute(Runnable r, Throwable t) { 
     super.afterExecute(r, t); 
     if (t != null && shouldRetry(r)) { 
      retry(r); 
     } else if (t == null && r instanceof Future<?>) { 
      try { 
       Object result = ((Future<?>) r).get(); 
      } catch (CancellationException | ExecutionException e) { 
       // you should log the error 
       if (shouldRetry(r)) { 
        retry(r); 
       } else { 
        retries.remove(r); 
       } 
      } catch (InterruptedException ie) { 
       Thread.currentThread().interrupt(); // ignore/reset 
      } 
     } else { 
      retries.remove(r); 
     } 
    } 

    private boolean shouldRetry(Runnable r) { 
     final Integer nbRetries = retries.getOrDefault(r, 0); 
     return nbRetries < maxRetries; 
    } 

    private void retry(Runnable r) { 
     final Integer nbRetries = retries.getOrDefault(r, 0); 
     retries.put(r, nbRetries + 1); 
     this.execute(r); 
    } 

    private static class RetryFutureTask<V> implements RunnableFuture<V> { 
     private static final int NEW = 0; 
     private static final int RUNNING = 1; 
     private static final int ERROR = 2; 
     private static final int FINISHED = 3; 
     private static final int INTERRUPTED = 4; 
     private final AtomicInteger state = new AtomicInteger(NEW); 
     private final AtomicReference<Thread> runner = new AtomicReference<>(); 
     private final AtomicReference<WaitNode> waiters = new AtomicReference<>(); 
     private final Callable<V> callable; 
     private Exception error; 
     private V result; 

     public RetryFutureTask(Runnable runnable, V result) { 
      this.callable = Executors.callable(runnable, result); 
     } 

     public RetryFutureTask(Callable<V> callable) { 
      this.callable = callable; 
     } 

     @Override 
     public void run() { 
      try { 
       // If not already running 
       if (runner.compareAndSet(null, Thread.currentThread())) { 
        state.set(RUNNING); 
        result = this.callable.call(); 
        state.compareAndSet(RUNNING, FINISHED); 
       } 
      } catch (Exception e) { 
       error = e; 
       state.compareAndSet(RUNNING, ERROR); 
       finishCompletion(); 
      } finally { 
       runner.set(null); 
      } 
     } 

     @Override 
     public boolean cancel(boolean mayInterruptIfRunning) { 
      if (state.get() == RUNNING || state.get() == INTERRUPTED) { 
       return false; 
      } 
      try { 
       Thread t = runner.get(); 
       if (mayInterruptIfRunning && t != null) { 
        t.interrupt(); 
       } 
      } finally { 
       state.set(INTERRUPTED); 
       finishCompletion(); 
      } 
      return true; 
     } 

     @Override 
     public boolean isCancelled() { 
      return state.get() == INTERRUPTED; 
     } 

     @Override 
     public boolean isDone() { 
      return state.get() > RUNNING; 
     } 

     @Override 
     public V get() throws InterruptedException, ExecutionException { 
      if (state.get() <= RUNNING) { 
       awaitDone(false, 0L); 
      } 
      return resolve(); 
     } 

     @Override 
     public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { 
      if (state.get() <= RUNNING) { 
       awaitDone(true, unit.toNanos(timeout)); 
      } 
      return resolve(); 
     } 

     private V resolve() throws ExecutionException, InterruptedException { 
      if (state.get() == ERROR) { 
       throw new ExecutionException(error); 
      } else if (state.get() == INTERRUPTED) { 
       throw new InterruptedException(); 
      } 
      return result; 
     } 

     private void finishCompletion() { 
      for (WaitNode q; (q = waiters.get()) != null;) { 
       if (waiters.compareAndSet(q, null)) { 
        for (;;) { 
         Thread t = q.thread; 
         if (t != null) { 
          q.thread = null; 
          LockSupport.unpark(t); 
         } 
         WaitNode next = q.next; 
         if (next == null) 
          break; 
         q.next = null; // unlink to help gc 
         q = next; 
        } 
       break; 
       } 
      } 
     } 

     private void awaitDone(boolean timed, long nanos) throws InterruptedException { 
      final long deadline = timed ? System.nanoTime() + nanos : 0L; 
      WaitNode q = null; 
      boolean queued = false; 
      for (; ;) { 
       if (Thread.interrupted()) { 
        removeWaiter(q); 
        throw new InterruptedException(); 
       } 

       int s = state.get(); 
       if (s > RUNNING) { 
        if (q != null) 
         q.thread = null; 
        return; 
       } else if (q == null) 
        q = new WaitNode(); 
       else if (!queued) 
        queued = waiters.compareAndSet(q.next, q); 
       else if (timed) { 
        nanos = deadline - System.nanoTime(); 
        if (nanos <= 0L) { 
         removeWaiter(q); 
         return; 
        } 
        LockSupport.parkNanos(this, nanos); 
       } else 
        LockSupport.park(this); 
      } 
     } 

     private void removeWaiter(WaitNode node) { 
      if (node != null) { 
       node.thread = null; 
       retry: 
       for (;;) {   // restart on removeWaiter race 
        for (WaitNode pred = null, q = waiters.get(), s; q != null; q = s) { 
         s = q.next; 
         if (q.thread != null) 
          pred = q; 
         else if (pred != null) { 
          pred.next = s; 
          if (pred.thread == null) // check for race 
           continue retry; 
         } 
         else if (!waiters.compareAndSet(q, s)) 
          continue retry; 
        } 
        break; 
       } 
      } 
     } 

     static final class WaitNode { 
      volatile Thread thread; 
      volatile WaitNode next; 
      WaitNode() { thread = Thread.currentThread(); } 
     } 
    } 
} 
+0

Danke JEY, deine Lösung scheint die beste für mich zu sein. –

+0

Habt ihr versucht es zu machen? Als tatsächlicher Typ von 'Runnable'' r' ist ein 'FutureTask'-Neulauf als Ergebnis wirksam, wenn er bereits berechnet wurde. Wenn Sie der Methode 'afterExecute()' eine Protokollierung hinzufügen, sehen Sie, dass diese Methode für 'nbRetries'-Zeiten aufruft und ohne die eigentliche Ausführung der ursprünglichen Aufgabe beendet wird. Wenn Sie versuchen werden, die Aufgabe mit dieser Logik erfolgreich zu beenden, erhalten Sie eine Endlosschleife, was noch schlimmer ist. –

+0

@NikitaGorbachevski verstehe deinen Kommentar nicht. Die Wiederholung wird nur auftreten, wenn während der Ausführung eine Ausnahme ausgelöst wird. Die vom ersten Senden zurückgegebene Zukunft ist jedoch das Ergebnis der ersten Ausführung und wird nicht nachher aktualisiert. Es ist mehr ein Feuer und vergessen mit Wiederholung – JEY

1

ich etwas wie dieses würde vorschlagen:

public static void main(String[] args) throws InterruptedException { 
    ExecutorService executorService = Executors.newSingleThreadExecutor(); 
    executorService.submit(new RepeatableWorker()); 

    System.out.println("Main does other work"); 
    Thread.sleep(3300); 
    System.out.println("Main work was finished, time to exit"); 

    // shutdownNow interrupts running threads 
    executorService.shutdownNow(); 
    executorService.awaitTermination(1, TimeUnit.SECONDS); 
    } 

    public static class RepeatableWorker extends Worker { 
    @Override 
    public void run() { 
     while (!Thread.currentThread().isInterrupted()) { 
     boolean error = false; 
     Exception ex = null; 
     try { 
      // In some cases it's make sense to run this method in a separate thread. 
      // For example if you want to give some time to the last worker thread to complete 
      // before interrupting it from repeatable worker 
      super.run(); 
     } catch (Exception e) { 
      error = true; 
      ex = e; 
     } 

     if (Thread.currentThread().isInterrupted()) { 
      System.out.println("worker was interrupted"); 
      // just exit as last task was interrupted 
      continue; 
     } 

     if (!error) { 
      System.out.println("worker task was finished normally"); 
     } else { 
      System.out.println("worker task was finished due to error " + ex.getMessage()); 
     } 
     // wait some time before the next start 
     try { 
      Thread.sleep(100); 
     } catch (InterruptedException e) { 
      System.out.println("Repeatable worker was interrupted"); 
      // ok we were interrupted 
      // restore interrupted status and exit 
      Thread.currentThread().interrupt(); 
     } 
     } 
     System.out.println("repeatable task was finished"); 
    } 
    } 

    public static class Worker implements Runnable { 
    @Override 
    public void run() { 
     try { 
     // emulate some work 
     Thread.sleep(500L); 
     if (new Random().nextBoolean()) { 
      throw new RuntimeException("ouch"); 
     } 
     } catch (InterruptedException e) { 
     // restore interrupted status 
     Thread.currentThread().interrupt(); 
     } 
    } 
    } 
1

Wenn du mit dem ExecutorService kleben will ich von Runnable zu Callable wechseln würden. Beim Einreichen eines Callable erhalten Sie eine Future Sie können das Ergebnis (oder die Ausnahme) des Auftragscodes in Callable erhalten.

So können Sie erkennen, ob der Auftrag erfolgreich ausgeführt wurde oder nicht, und erneut vorlegen den Job, wenn Sie benötigen:

static class SomeService { 
    public void waitForEvents() { 
     ExecutorService executor = Executors.newSingleThreadExecutor(); 
     Future<Void> future = executor.submit(new AlwaysRunningJob("importantParamForJob")); 

     try { 
      future.get(); // this call waits until the Callable has finished (or failed) 
     } catch (InterruptedException | ExecutionException e) { 
      // Error -> resubmit 
      e.printStackTrace(); 
     } 
     // No error -> do something else 
    } 

    class AlwaysRunningJob implements Callable<Void> { 
     String importantParamForJob; 

     public AlwaysRunningJob(String importantParamForJob) { 
      this.importantParamForJob = importantParamForJob; 
     } 

     @Override 
     public Void call() throws Exception { 
      Thread.currentThread().setName("AlwaysRunningJob Job"); 
      while (!Thread.currentThread().isInterrupted()) { 
       // keep waiting for events until 
       // exception is thrown. or something bad happened 
       try { 
        Thread.sleep(5000); 
        System.out.println("keep working on" + importantParamForJob); 
       } catch (InterruptedException e) { 
        // exit if it failed 
        return null; 
       } 
      } 
      System.out.println("Finished run!"); 
      return null; 
     } 
    } 
} 
+0

Der Haupt-Thread ist blockiert! – JEY

+0

Es erfüllt nicht "Der Haupt-Thread darf nicht blockiert werden." –

+0

Sie könnten einen separaten Thread verwenden, der darauf wartet, dass die Zukunft abgeschlossen wird. Um ehrlich zu sein, ist ein ExecutorService jedoch der falsche Weg. Verwenden Sie besser einen dedizierten Thread/Runable in einer Endlosschleife. Alternativ eine Time/TimerTask-Kombination. Dort müssen Sie die Aufgaben nicht neu planen, sondern können automatisch alle x Millisekunden ausgeführt werden. – JMax

Verwandte Themen