2017-11-18 6 views
0

In meinem Code muss ich eine Aufgabe ausführen, die Rekursions- und parallele Stream-Verarbeitung nutzt, um tief in einen Baum möglicher Spielzüge einzutauchen und zu entscheiden, was der beste Zug ist. Dies erfordert viel Zeit. Um zu verhindern, dass der Benutzer zu lange auf den Computer wartet, um zu "denken", möchte ich eine Zeit von sagen wir 1000 Millisekunden einstellen. Wenn die beste Bewegung nicht innerhalb von 1000 ms gefunden wird, spielt der Computer eine zufällige Bewegung. Mein Problem ist, dass, obwohl ich Abbrechen auf Future (mit möglicherweise auf True gesetzt), die Aufgabe nicht unterbrochen wird und die beschäftigten Threads im Hintergrund ausgeführt werden. Ich habe versucht, in regelmäßigen Abständen nach isInterrupted() auf dem aktuellen zu überprüfen und dann versuchen, auszusteigen, aber das hat nicht geholfen. Irgendwelche Ideen?Wie kann ich die Zukunft einer Multithread-Task abbrechen?

Unten ist mein Code:

public Move bestMove() { 
    ExecutorService executor = Executors.newSingleThreadExecutor(); 
    Callable<Move> callable =() -> bestEntry(bestMoves()).getKey(); 
    Future<Move> future = executor.submit(callable); 
    try { 
     return future.get(1000, TimeUnit.MILLISECONDS); 
    } catch (InterruptedException e) { 
     System.exit(0); 
    } catch (ExecutionException e) { 
     throw new RuntimeException(e); 
    } catch (TimeoutException e) { 
     future.cancel(true); 
     return randomMove(); 
    } 
    return null; 
} 

private Move randomMove() { 
    Random random = new Random(); 
    List<Move> moves = state.possibleMoves(); 
    return moves.get(random.nextInt(moves.size())); 
} 

private <K> Map.Entry<K, Double> bestEntry(Map<K, Double> map) { 
    List<Map.Entry<K, Double>> list = new ArrayList<>(map.entrySet()); 
    Collections.sort(list, (e1, e2) -> (int) (e2.getValue() - e1.getValue())); 
    return list.get(0); 
} 

private <K> Map.Entry<K, Double> worstEntry(Map<K, Double> map) { 
    List<Map.Entry<K, Double>> list = new ArrayList<>(map.entrySet()); 
    Collections.sort(list, (e1, e2) -> (int) (e1.getValue() - e2.getValue())); 
    return list.get(0); 
} 

private Map<Move, Double> bestMoves() { 
    Map<Move, Double> moves = new HashMap<>(); 
    state.possibleMoves().stream().parallel().forEach(move -> { 
     if (!Thread.currentThread().isInterrupted()) { 
      Game newState = state.playMove(move); 
      Double score = newState.isTerminal() ? newState.utility() 
        : worstEntry(new (newState).worstMoves()).getValue(); 
      moves.put(move, score); 
     } 
    }); 
    return moves; 
} 

private Map<Move, Double> worstMoves() { 
    Map<Move, Double> moves = new HashMap<>(); 
    state.possibleMoves().stream().parallel().forEach(move -> { 
     if (!Thread.currentThread().isInterrupted()) { 
      Game newState = state.playMove(move); 
      Double score = newState.isTerminal() ? -newState.utility() 
        : bestEntry(new (newState).bestMoves()).getValue(); 
      moves.put(move, score); 
     } 
    }); 
    return moves; 
} 

ps: ich auch ohne versucht, "parallel()" aber auch hier gibt es noch einen einzigen Faden links.

Vielen Dank im Voraus.

+0

https://techblog.bozho.net/unterbrechend-executor-tasks/ – MarianP

Antwort

0

Future.cancel setzen gerade den Faden als interrupted, dann muss Ihr Code es wie folgt behandeln:

public static void main(String[] args) throws InterruptedException { 

    final ExecutorService executor = Executors.newSingleThreadExecutor(); 
    final Future<Integer> future = executor.submit(() -> count()); 
    try { 
     System.out.println(future.get(1, TimeUnit.SECONDS)); 
    } catch (Exception e){ 
     future.cancel(true); 
     e.printStackTrace(); 
    }finally { 
     System.out.printf("status=finally, cancelled=%s, done=%s%n", future.isCancelled(), future.isDone()); 
     executor.shutdown(); 
    } 

} 

static int count() throws InterruptedException { 
    while (!Thread.interrupted()); 
    throw new InterruptedException(); 
} 

Wie Sie die count Behalten Sie die Kontrolle sehen können, wenn der Faden am Laufen zu halten verfügbar ist, müssen Sie verstehen, dass Tatsächlich gibt es keine Garantie, dass ein laufender Thread gestoppt werden kann, wenn sie nicht möchte.

Referenz:


UPDATE 2017-11-18 23:22

Ich schrieb eine FutureTask-Erweiterung, die die Fähigkeit haben, versuchen zu sto p der Thread, auch wenn der Code das interrupt Signal nicht respektiert. Denken Sie daran, dass es unsicher ist, weil die Thread.stop Methode is deprecated, sowieso es funktioniert und wenn Sie wirklich brauchen, dass Sie es verwenden können (Lesen Sie Thread.stop Vernachlässigungshinweise vor, zum Beispiel, wenn Sie Sperren verwenden, dann .stop kann Deadlocks verursachen).

Prüfregeln

public static void main(String[] args) throws InterruptedException { 
    final ExecutorService executor = newFixedSizeExecutor(1); 
    final Future<Integer> future = executor.submit(() -> count()); 
    try { 
     System.out.println(future.get(1, TimeUnit.SECONDS)); 
    } catch (Exception e){ 
     future.cancel(true); 
     e.printStackTrace(); 
    } 
    System.out.printf("status=finally, cancelled=%s, done=%s%n", future.isCancelled(), future.isDone()); 
    executor.shutdown(); 
} 

static int count() throws InterruptedException { 
    while (true); 
} 

Individuelle Executor

static ThreadPoolExecutor newFixedSizeExecutor(final int threads) { 
    return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()){ 
     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 
      return new StoppableFutureTask<>(new FutureTask<>(callable)); 
     } 
    }; 
} 

static class StoppableFutureTask<T> implements RunnableFuture<T> { 

    private final FutureTask<T> future; 
    private Field runnerField; 

    public StoppableFutureTask(FutureTask<T> future) { 
     this.future = future; 
     try { 
      final Class clazz = future.getClass(); 
      runnerField = clazz.getDeclaredField("runner"); 
      runnerField.setAccessible(true); 
     } catch (Exception e) { 
      throw new Error(e); 
     } 
    } 

    @Override 
    public boolean cancel(boolean mayInterruptIfRunning) { 
     final boolean cancelled = future.cancel(mayInterruptIfRunning); 
     if(cancelled){ 
      try { 
       ((Thread) runnerField.get(future)).stop(); 
      } catch (Exception e) { 
       throw new Error(e); 
      } 
     } 
     return cancelled; 
    } 

    @Override 
    public boolean isCancelled() { 
     return future.isCancelled(); 
    } 

    @Override 
    public boolean isDone() { 
     return future.isDone(); 
    } 

    @Override 
    public T get() throws InterruptedException, ExecutionException { 
     return future.get(); 
    } 

    @Override 
    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { 
     return future.get(timeout, unit); 
    } 

    @Override 
    public void run() { 
     future.run(); 
    } 
} 

Ausgang

java.util.concurrent.TimeoutException 
    at java.util.concurrent.FutureTask.get(FutureTask.java:205) 
    at com.mageddo.spark.sparkstream_1.Main$StoppableFutureTask.get(Main.java:91) 
    at com.mageddo.spark.sparkstream_1.Main.main(Main.java:20) 
status=finally, cancelled=true, done=true 

Process finished with exit code 0 
0

Vielen Dank für Ihre Antworten. Ich denke, ich habe eine einfachere Lösung gefunden. Zunächst einmal glaube ich, dass future.cancel (true) nicht funktionierte, weil es wahrscheinlich nur das unterbrochene Flag auf dem Thread gesetzt hat, der die Aufgabe gestartet hat. (Dies ist der Thread, der mit der Zukunft verknüpft ist). Da die Task jedoch parallele Stream-Verarbeitung verwendet, erzeugt sie Worker in verschiedenen Threads, die nie unterbrochen werden. Daher kann ich das isInterrupted() -Flag nicht regelmäßig überprüfen. Die "Lösung" (oder vielleicht mehr von Workaround), die ich gefunden habe, ist, meine eigene unterbrochene Flagge in den Objekten meines Algorithmus zu behalten und sie manuell auf wahr zu setzen, wenn die Aufgabe abgebrochen wird. Da alle Threads in der gleichen Instanz arbeiten, haben sie alle Zugriff auf die unterbrochene Flagge und sie gehorchen.

Verwandte Themen