2017-07-29 9 views
0

Ich benutze Java ExecutorService (ThreadPool), um eine Aufgabe & UI zu aktualisieren, während eine bestimmte Aktivität im Vordergrund (sichtbar) ist.Java ExecutorService - Aufgabe/Callable nicht aufheben/unterbrechen

Problem: Was ich will, ist, wenn der Benutzer auf eine andere Tätigkeit schaltet ich will alle, die Aufgabe stoppen/abbrechen (ob der Warteschlange oder Laufen). Und dazu muss ich entweder ExecutorService Shutdown/ShutdownNow-Methode oder Abbrechen (true) auf Future-Objekt von ExecutorService senden Methode nach Überprüfung des Future-Objektstatus durch Aufruf von isDone(). Dies würde das entsprechende Thread-Flag für die Unterbrechung auf TRUE setzen, was ich überprüfen muss (Thread.currentThread.isInterrupted()) in meiner aufrufbaren Implementierung, um festzustellen, ob die Task/der Thread unterbrochen wurde. Problem ist, ob i ExecutorService Shutdown-Methode aufrufen oder zukünftige cancel (true) Verfahren in beiden Fällen selten 1 von 10-mal den Gewinde Unterbrechungs-Flag auf WAHR setzt die schließlich zu Speicherlecks führt usw.

Code:

Thread Singleton-Implementierung (cancelAll-to Aufgaben & shutdownExecutor-to Shutdown ExecutorService abbrechen):

private static class ThreadPoolManager { 

    private ExecutorService executorService; 
    private List<Future> queuedFutures; 
    private BlockingQueue<Runnable> blockingQueue; 

    private static ThreadPoolManager instance; 

    private ThreadPoolManager() { 
     MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-created(constructor)"); 
     queuedFutures = new ArrayList<>(); 
     blockingQueue = new LinkedBlockingDeque<>(); 
     executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue); 
    } 

    static { 
     instance = new ThreadPoolManager(); 
    } 

    public static void submitItemTest(Callable<Object> callable) { 
     MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted item test"); 
     if(instance.executorService.isShutdown()){ 
      instance=new ThreadPoolManager(); 
     } 
     Future future = instance.executorService.submit(callable); 
     instance.queuedFutures.add(future); 
    } 

    public static void submitTestAll(Callable<Object> callable) { 
     MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted test all"); 
     if(instance.executorService.isShutdown()){ 
      instance=new ThreadPoolManager(); 
     } 
     cancelAll(); 
     Future future = instance.executorService.submit(callable); 
     instance.queuedFutures.add(future); 
    } 

    public static void cancelAll() { 
     MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelling all future tasks"); 
     instance.blockingQueue.clear(); 
     for (Future future : instance.queuedFutures) { 
      if (!future.isDone()) { 
       boolean cancelled = future.cancel(true); 
       MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelled-" + cancelled); 
      } 
     } 
     instance.queuedFutures.clear(); 
    } 

    public static void shutdownExecutor(){ 
     MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Shuttingdown threadpool"); 
     instance.executorService.shutdownNow(); 
    } 
} 

aufrufbare Implementation (normale Iteration & wenn Klausel für die Unterbrechung zu überprüfen) :

private Callable<Object> getTestAllCallable() { 
     return new Callable<Object>() { 
      @Override 
      public Object call() { 
       for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) { 
        if (!Thread.currentThread().isInterrupted()) { 
          //someWork 

        } else { 
         MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadInterrupted-Cancelling"); 
         return null; 
        } 
       } 
       return null; 
      } 
     }; 
    } 

Aktivität/Fragment OnStop Implementierung (für Aufgabe & Shutdown Aufruf abzubrechen):

@Override 
public void onStop() { 
    MyLogger.log(MyLogger.LOG_TYPE.INFO, "onStop called"); 
    ThreadPoolManager.cancelAll(); 
    ThreadPoolManager.shutdownExecutor(); 
    super.onStop(); 
} 

Update:

vorgenommene Änderungen:

  1. Verschoben von der Nutzung Durchführbar statt aufrufbar .

  2. Jetzt nicht Singleton für ExecutorService.

    private class ThreadPoolManager { 
    
        private ExecutorService executorService; 
        private List<Future> queuedFutures; 
        private BlockingQueue<Runnable> blockingQueue; 
    
        private ThreadPoolManager() { 
         MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-created(constructor)"); 
         queuedFutures = new ArrayList<>(); 
         blockingQueue = new LinkedBlockingDeque<>(); 
         executorService =getNewExecutorService(); 
        } 
    
        private ExecutorService getNewExecutorService(){ 
         return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue); 
        } 
    
        private void submitItemTest(Runnable runnable) { 
         MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted item test"); 
         if(executorService.isShutdown()){ 
          executorService=getNewExecutorService(); 
         } 
         Future future = executorService.submit(runnable); 
         queuedFutures.add(future); 
        } 
    
        private void submitTestAll(Runnable runnable) { 
         MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted test all"); 
         if(executorService.isShutdown()){ 
          executorService=getNewExecutorService(); 
         } 
         cancelAll(); 
         Future future = executorService.submit(runnable); 
         queuedFutures.add(future); 
        } 
    
        private void cancelAll() { 
         MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelling all future tasks"); 
         blockingQueue.clear(); 
         for (Future future : queuedFutures) { 
          if (!future.isDone()) { 
           boolean cancelled = future.cancel(true); 
           MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelled-" + cancelled); 
          } 
         } 
         queuedFutures.clear(); 
        } 
    
        private void shutdownExecutor(){ 
         MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Shuttingdown threadpool"); 
         executorService.shutdownNow(); 
         blockingQueue.clear(); 
         queuedFutures.clear(); 
        } 
    } 
    

die Schuldigen gefunden, aber noch nicht die Lösung. Nach 2 ist die Implementierung von Runnables 1 von denen funktioniert (isInterrupted gibt true zurück oder kommt InterupptedException und als Task beendet), aber nicht andere.

Arbeits Runnable (Ich habe es für die Prüfung):

new Runnable() { 
      @Override 
      public void run() { 
        int i=0; 
        while(!Thread.currentThread().isInterrupted()){ 
         try { 
          System.out.println(i); 
          Thread.currentThread().sleep(2000); 
         } catch (InterruptedException e) { 
          MyLogger.log(MyLogger.LOG_TYPE.DEBUG,"Interrupted"); 
          return; 
         } 
         i++; 
        } 
       } 
      } 

Problem (Ist-Code i verwenden möchten):

new Runnable(){ 
      @Override 
      public void run() { 
       for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) { 
        if (!Thread.currentThread().isInterrupted()) { 

        } else { 
         MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Thread Interrupted (Cancelled)"); 
         break; 
        } 
       } 
      } 
     }; 

Und 1 wahrscheinliche Lösung wäre Variable zu verwenden (boolean) als eine Unterbrechungsflagge innerhalb des runnable, die ich als ein letzter Ausweg betrachten werde, aber glücklich wäre, den Fehler kennen zu lernen.

Antwort

0

Solution (Way out): So schließlich ging ich auf spezielle interne Flag (boolean) als Fadenunterbrechung zu verwenden Flag, das bei jeder Iteration von MyRunnable überprüft wird (benutzerdefinierte Implementierung von runnable mit benutzerdefiniertem Flag, so dass jedem runnable ein Flag zugeordnet ist). Dann, wenn das Abbrechen von Threads unter ExecutorService (ThreadPool) erforderlich ist, iteriere ich über alle Future-Objekte und hole ihr MyRunnable zu und setze dann ihr Unterbrechungsflag (Custom-Flag) auf true, das den Thread unterbricht/schließt.

ThreadPoolManager:

private class ThreadPoolManager { 

     private ExecutorService executorService; 
     private final Map<Future,MyRunnable> queuedFutures; 
     private final BlockingQueue<Runnable> blockingQueue; 

     private ThreadPoolManager() { 
      MyLogger.log(DEBUG, "Threadpool-created(constructor)"); 
      queuedFutures = new HashMap<>(); 
      blockingQueue = new LinkedBlockingDeque<>(); 
      executorService = getNewExecutorService(); 
     } 

     private ExecutorService getNewExecutorService() { 
      return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue); 
     } 

     private void submitItemTest(MyRunnable runnable) { 
      MyLogger.log(DEBUG, "Threadpool-submitted item test"); 
      if (executorService.isShutdown()) { 
       executorService = getNewExecutorService(); 
      } 
      Future future = executorService.submit(runnable); 
      queuedFutures.put(future,runnable); 
     } 

     private void submitTestAll(MyRunnable runnable) { 
      MyLogger.log(DEBUG, "Threadpool-submitted test all"); 
      if (executorService.isShutdown()) { 
       executorService = getNewExecutorService(); 
      } 
      cancelAll(); 
      Future future = executorService.submit(runnable); 
      queuedFutures.put(future,runnable); 
     } 

     private void cancelAll() { 
      MyLogger.log(DEBUG, "ThreadPool: Cancelling all future tasks"); 
      blockingQueue.clear(); 
      for (Future future : queuedFutures.keySet()) { 
       if (!future.isDone()) { 
        queuedFutures.get(future).continueRunning=false; 
        MyLogger.log(DEBUG, "Cancelled"); 
       } 
      } 
      queuedFutures.clear(); 
     } 

     private void shutdownExecutor() { 
      cancelAll(); 
      MyLogger.log(DEBUG, "ThreadPool: Shuttingdown threadpool"); 
      executorService.shutdown(); 
     } 
    } 

MyRunnable (abstrakte Klasse, die fahrbare implementiert):

private abstract class MyRunnable implements Runnable { 
     boolean continueRunning=true; 
    } 

MyRunnable (Instanz der abstrakten Klasse MyRunnable):

new MyRunnable() { 
     @Override 
     public void run() { 
      for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) { 
       if (continueRunning) { 
         //someWork 
       } else { 
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadPool: Pool Thread Interrupted (closing down)"); 
        break; 
       } 
      } 
      System.out.println("ThreadPool: Test complete"); 
     } 
    }; 

Jetzt ruft threadPoolManager.shutdownExecutor() shutdown/unterbricht alle laufenden Threads.

0

Gemäß der Dokumentation ExecutorService wird das Herunterfahren einer ausgeführten Aufgabe nach bestem Wissen und Gewissen durchgeführt.

Daher, wenn Sie ExecutorService.shutdownNow() aufrufen, wird versuchen, alle derzeit ausgeführten Tasks herunterzufahren. Jede Aufgabe wird weiter ausgeführt, bis sie einen Punkt erreicht, an dem sie feststellt, dass sie unterbrochen wurde.

Um Ihre Fäden, um sicherzustellen, werden diesen Punkt in einem frühen Stadium zu erreichen ist es eine gute Idee, einen Scheck in Ihrer Schleife hinzufügen, ob der Faden wie folgt interuppted ist:

Thread.currentThread().isInterrupted(); 

auf jeden Indem diesen Aufruf Iteration Ihre Threads erkennt die Unterbrechung mit einem kurzen Intervall von der tatsächlichen Interupption.

Und so Ihre modifizierte Callable Code wird wie folgt aussehen:

private Callable<Object> getTestAllCallable() { 
    return new Callable<Object>() { 
     @Override 
     public Object call() { 
      for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) { 
       if(Thread.currentThread().isInterrupted()) { 
        return null; 
       } 
       if(someCondition) { 
        //someWork 
       } else { 
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadInterrupted-Cancelling"); 
        return null; 
       } 
      } 
      return null; 
     } 
    }; 
} 

By the way, gibt es keinen Punkt bei der Verwendung eines Callable, wenn Sie nicht beabsichtigen, einen beliebigen Wert aus dem call() Methode zurückzukehren. wenn Sie die parametrisierte Typ in Ihrer Aufgabe benötigen erstellen Sie einfach eine parametrisierte Runnable wie folgt:

public class ParameterizedRunnable<T> implements Runnable { 
    private final T t; 

    public ParameterizedRunnable(T t) { 
     this.t = t; 
    } 

    public void run() { 
     //do some work here 
    } 
} 
+0

Entschuldigung, ich habe meinen Code jetzt geändert, um zu enthalten, was Sie gesagt haben, tatsächlich tat ich das bereits, wie in der Problembeschreibung angegeben, wurde aber zusammen mit anderem Code gelöscht, der nicht relevant war. –

+0

Und das löst Ihr Problem nicht? – zuckermanori

+0

Ich war aufrufbar statt mit Runnable mit dem Gedanken zu Future-Objekt zu erhalten, während die auf ExecutorService abrufbar und nun wissen, kann auch mit Runnable getan werden. Danke, dass Sie das sagen, ich werde den Code ändern, um runnable zu verwenden. –

Verwandte Themen