2017-03-10 4 views
1

Ich habe eine BlockingQueue von Runnable - Ich kann einfach alle Aufgaben mit einer der TaskExecutor Implementierungen ausführen, und alle werden parallel ausgeführt werden. aber einige Runnable hängt von anderen ab, es bedeutet, dass sie warten müssen, wenn Runnable beenden, dann können sie ausgeführt werden.So verwalten Sie Threads im Frühjahr TaskExecutor Framework

Regel ist ganz einfach: Jeder Runnable hat einen Code. Zwei Runnable mit demselben Code können nicht gleichzeitig ausgeführt werden, aber wenn der Code sich unterscheidet, sollten sie parallel ausgeführt werden. Mit anderen Worten alle laufenden Runnable müssen unterschiedliche Code haben, alle "Duplikate" sollten warten.

Das Problem ist, dass es kein Ereignis/Methode/überhaupt gibt, wenn der Thread endet. Ich baute eine solche Mitteilung in jedem Runnable kann, aber ich mag diesen Ansatz nicht, weil es vor dem Fadenenden gerade getan wird, nicht, nachdem es

java.util.concurrent.ThreadPoolExecutor hat afterExecute Verfahren beendet ist, aber es muss umgesetzt werden - Frühling Verwenden Sie nur die Standardimplementierung, und diese Methode wird ignoriert.

Auch wenn ich das mache, wird es kompliziert, weil ich zwei zusätzliche Sammlungen verfolgen muss: mit Runnable s bereits ausgeführt (keine Implementierung bietet Zugriff auf diese Informationen) und mit denen aufgeschoben, weil sie Code dupliziert haben.

Ich mag die BlockingQueue Ansatz, weil es keine Abfrage, Thread einfach aktivieren, wenn etwas Neues in der Warteschlange ist. Aber vielleicht gibt es einen besseren Ansatz, um solche Abhängigkeiten zwischen Runnable s zu verwalten, also sollte ich aufgeben mit BlockingQueue und andere Strategie verwenden?

Antwort

0

Eine alternative Strategie, die in den Sinn kommt, ist, einen separaten einzelnen Thread-Executor für jeden möglichen Code zu haben. Wenn Sie dann eine neue Runnable einreichen möchten, suchen Sie einfach nach dem richtigen Executor, der für seinen Code verwendet werden soll, und senden den Job.

Dies kann oder kann keine gute Lösung sein, abhängig davon, wie viele verschiedene Codes Sie haben. Die Hauptsache ist, dass die Anzahl der laufenden Threads so hoch wie die Anzahl der verschiedenen Codes sein kann. Wenn Sie viele verschiedene Codes haben, könnte das ein Problem sein.

Natürlich können Sie Semaphore verwenden, um die Anzahl der gleichzeitig ausgeführten Jobs einzuschränken; Sie würden immer noch einen Thread pro Code erstellen, aber nur eine begrenzte Anzahl könnte gleichzeitig ausgeführt werden. Zum Beispiel würde dies Arbeitsplätze durch Code serialise, so dass bis zu drei verschiedenen Codes gleichzeitig ausgeführt werden:

public class MultiPoolExecutor { 
    private final Semaphore semaphore = new Semaphore(3); 

    private final ConcurrentMap<String, ExecutorService> serviceMap 
      = new ConcurrentHashMap<>(); 

    public void submit(String code, Runnable job) { 
     ExecutorService executorService = serviceMap.computeIfAbsent(
       code, (k) -> Executors.newSingleThreadExecutor()); 
     executorService.submit(() -> { 
      semaphore.acquireUninterruptibly(); 
      try { 
       job.run(); 
      } finally { 
       semaphore.release(); 
      } 
     }); 
    } 
} 

Ein anderer Ansatz wäre, die Runnable zu ändern, um eine Sperre zu lösen und für Jobs überprüfen, die nach Abschluss ausgeführt werden könnten (Umfragen zu vermeiden) - etwas wie dieses Beispiel, das alle Jobs in einer Liste hält, bis sie abgeschickt werden können. Das boolesche Latch stellt sicher, dass nur ein Job für jeden Code zu einem bestimmten Zeitpunkt an den Thread-Pool gesendet wurde. Jedes Mal, wenn ein neuer Job eintrifft oder ein laufender Job ausgeführt wird, wird der Code erneut auf neue Jobs geprüft, die eingereicht werden können (CodedRunnable ist einfach eine Erweiterung von Runnable, die eine Codeeigenschaft hat).

public class SubmissionService { 
    private final ExecutorService executorService = Executors.newFixedThreadPool(5); 
    private final ConcurrentMap<String, AtomicBoolean> locks = new ConcurrentHashMap<>(); 
    private final List<CodedRunnable> jobs = new ArrayList<>(); 

    public void submit(CodedRunnable codedRunnable) { 
     synchronized (jobs) { 
      jobs.add(codedRunnable); 
     } 
     submitWaitingJobs(); 
    } 

    private void submitWaitingJobs() { 
     synchronized (jobs) { 
      for(Iterator<CodedRunnable> iter = jobs.iterator(); iter.hasNext();) { 
       CodedRunnable nextJob = iter.next(); 
       AtomicBoolean latch = locks.computeIfAbsent(
         nextJob.getCode(), (k) -> new AtomicBoolean(false)); 
       if(latch.compareAndSet(false, true)) { 
        iter.remove(); 
        executorService.submit(() -> { 
         try { 
          nextJob.run(); 
         } finally { 
          latch.set(false); 
          submitWaitingJobs(); 
         } 
        }); 
       } 
      } 
     } 
    } 
} 

Der Nachteil dieses Ansatzes ist, dass der Code durch die gesamte Liste Arbeitsplätze des Wartens scannen muss nach jeder Aufgabe abgeschlossen ist.Natürlich könnten Sie dies effizienter machen - eine abschließende Aufgabe würde eigentlich nur nach anderen Jobs mit demselben Code suchen müssen, so dass die Jobs stattdessen in einer Map<String, List<Runnable>> Struktur gespeichert werden könnten, um eine schnellere Verarbeitung zu ermöglichen.

+0

Ich mag diesen ersten Ansatz, aber es wäre besser, wenn ExecutorService zerstört wird, wenn nicht benötigt wird. – Marx

1

Wenn die Anzahl der verschiedenen Codes nicht so groß ist, ist der Ansatz mit einem separaten einzelnen Thread-Executor für jeden möglichen Code, angeboten von BarrySW19, in Ordnung. Wenn die gesamte Anzahl von Threads inakzeptabel wird, dann statt Vollstrecker einzelnen Thread, können wir einen Schauspieler verwenden (von Akka oder einem anderen ähnliche Bibliothek):

public class WorkerActor extends UntypedActor { 
    public void onReceive(Object message) { 
    if (message instanceof Runnable) { 
     Runnable work = (Runnable) message; 
     work.run(); 
    } else { 
     // report an error 
    } 
    } 
} 

Wie in der ursprünglichen Lösung, ActorRef s für WorkerActor s werden in einer HashMap gesammelt. Wenn ein entsprechend dem angegebenen Code erhalten (abgerufen oder erstellt) wird, wird Runnable job zur Ausführung mit workerActorRef.tell(job) übergeben.

Wenn Sie nicht eine Abhängigkeit der Schauspieler Bibliothek haben möchten, können Sie WorkerActor von Grund auf neu programmiert werden:

public class WorkerActor implements Runnable, Executor { 
    Executor executor=ForkJoinPool.commonPool(); // or can by assigned in constructor 
    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueu<>(); 
    boolean running = false; 

    public synchronized void execute(Runnable job) { 
    queue.put(job); 
    if (!running) { 
     executor.execute(this); // execute this worker, not job! 
     running=true; 
    } 

    public void run() { 
    for (;;) { 
     Runnable work=null; 
     synchronized (this) { 
     work = queue.poll(); 
     if (work==null) { 
      running = false; 
      return; 
     } 
     } 
     work.run(); 
    } 
    } 
} 

Wenn ein WorkerActor worker auf den angegebenen Code entspricht, erhalten (abgerufen oder erstellt), die Runnable job wird mit worker.execute(job) ausgeführt.

+0

Würden Sie hier nicht ein Problem bekommen, dass, wenn Sie viele Jobs mit dem gleichen Code in einer Reihe eingereicht haben, die meisten üblichen Pool-Threads nur in der Schleife laufen würden, bis die erste aus dem Weg geht und Jobs später für andere Codes würden blockiert werden? – BarrySW19

+0

@ BarrySW19 Jeder Benutzerauftrag wird mit dem entsprechenden Code in den WorkerActor eingereiht und belegt somit keinen Pool-Thread. Der Aktor selbst kann höchstens einen Thread belegen. Wenn also zunächst viele Jobs mit demselben Code eingereicht wurden, ist nur der Pool-Thread mit einem WorkerActor belegt, der diese Jobs sequentiell nacheinander verarbeitet. –

+0

Ah, ich bekomme es - jedes Mal, wenn ein Job eingereicht wird, verursacht der Actor eine Schleife, bis seine Warteschlange geleert ist - weitere Jobs, die während der Arbeit eingereicht werden, werden offensichtlich als Teil dieses Prozesses abgeholt. Dies vermeidet das weniger effiziente Scannen, das verwendet wird, um die nächste Aufgabe in meinem zweiten Vorschlag auszuwählen. – BarrySW19

Verwandte Themen