2016-11-16 3 views
2

Ich bin auf der Suche nach besseren Weg zu "schließen" einige Ressource, hier zerstören externe Process, in CompletableFuture Kette. Gerade jetzt mein Code sieht in etwa wie folgt aus:Schließen externen Prozess in CompletableFuture-Kette

public CompletableFuture<ExecutionContext> createFuture() 
{ 
    final Process[] processHolder = new Process[1]; 
    return CompletableFuture.supplyAsync(
      () -> { 
       try { 
        processHolder[0] = new ProcessBuilder(COMMAND) 
          .redirectErrorStream(true) 
          .start(); 
       } catch (IOException e) { 
        throw new UncheckedIOException(e); 
       } 
       return PARSER.parse(processHolder[0].getInputStream()); 
      }, SCHEDULER) 
      .applyToEither(createTimeoutFuture(DURATION), Function.identity()) 
      .exceptionally(throwable -> { 
       processHolder[0].destroyForcibly(); 
       if (throwable instanceof TimeoutException) { 
        throw new DatasourceTimeoutException(throwable); 
       } 
       Throwables.propagateIfInstanceOf(throwable, DatasourceException.class); 
       throw new DatasourceException(throwable); 
      }); 
} 

Das Problem, das ich sehe, ist ein „Hacky“ Ein-Elementanordnung, die sich auf den Prozess hält, so dass es im Falle eines Fehlers geschlossen werden kann. Gibt es eine CompletableFuture API, die erlaubt, einige "Kontext" zu exceptionally (oder eine andere Methode, um das zu erreichen) zu übergeben?

Ich dachte über benutzerdefinierte CompletionStage Implementierung, aber es sieht aus wie eine große Aufgabe loszuwerden "Halter" Variable.

Antwort

2

Es ist keine lineare Kette von CompletableFuture s erforderlich. Nun, eigentlich haben Sie schon nicht wegen der createTimeoutFuture(DURATION), die für die Umsetzung einer Zeitüberschreitung recht kompliziert ist. Sie können es einfach so:

public CompletableFuture<ExecutionContext> createFuture() { 
    CompletableFuture<Process> proc=CompletableFuture.supplyAsync(
     () -> { 
      try { 
       return new ProcessBuilder(COMMAND).redirectErrorStream(true).start(); 
      } catch (IOException e) { 
       throw new UncheckedIOException(e); 
      } 
     }, SCHEDULER); 
    CompletableFuture<ExecutionContext> result 
     =proc.thenApplyAsync(process -> PARSER.parse(process.getInputStream()), SCHEDULER); 
    proc.thenAcceptAsync(process -> { 
     if(!process.waitFor(DURATION, TimeUnit.WHATEVER_DURATION_REFERS_TO)) { 
      process.destroyForcibly(); 
      result.completeExceptionally(
       new DatasourceTimeoutException(new TimeoutException())); 
     } 
    }); 
    return result; 
} 

Wenn Sie die timout Zukunft halten wollen, vielleicht Sie den Prozess Startzeit betrachten erheblich zu sein, Sie

public CompletableFuture<ExecutionContext> createFuture() { 
    CompletableFuture<Throwable> timeout=createTimeoutFuture(DURATION); 
    CompletableFuture<Process> proc=CompletableFuture.supplyAsync(
     () -> { 
      try { 
       return new ProcessBuilder(COMMAND).redirectErrorStream(true).start(); 
      } catch (IOException e) { 
       throw new UncheckedIOException(e); 
      } 
     }, SCHEDULER); 
    CompletableFuture<ExecutionContext> result 
     =proc.thenApplyAsync(process -> PARSER.parse(process.getInputStream()), SCHEDULER); 
    timeout.exceptionally(t -> new DatasourceTimeoutException(t)) 
      .thenAcceptBoth(proc, (x, process) -> { 
       if(process.isAlive()) { 
        process.destroyForcibly(); 
        result.completeExceptionally(x); 
       } 
      }); 
    return result; 
} 
+0

Erste Snippet kann ich nicht verwenden, weil '.waitFor' Blöcke und ich möchte möglicherweise sehr große' InputStream' (daher '.redirectErrorStream (true)', habe ich "Fehlererkennung" später, aber das ist eine andere Geschichte). Der zweite Weg scheint ein wenig verschachtelt (3 Futures, einschließlich 'CompletableFuture '), aber ich werde es versuchen. – Xaerxess

+1

Es blockiert in einer * asynchronen * Operation. Das ist der springende Punkt von 'thenAcceptAsync' oder gut,' CompleteableFuture' Stufen im Allgemeinen; unabhängige Stufen können gleichzeitig ausgeführt werden. Hier laufen 'PARSER.parse' und' process.waitFor' gleichzeitig in unabhängigen Stufen. – Holger

+0

Ah, ich verstehe. Lass uns 'waitFor' dann versuchen! – Xaerxess

1

verwenden konnte ich die benutzt habe ein Element array selbst zu emulieren, was richtige Schließungen in Java wäre.

Eine andere Option verwendet eine private statische Klasse mit Feldern. Die Vorteile sind, dass es den Zweck klarer macht und etwas weniger Auswirkungen auf den Garbage Collector mit großen Closures hat, dh ein Objekt mit N Feldern gegenüber N Arrays der Länge 1. Es wird auch nützlich, wenn Sie über dieselben Felder schließen müssen in anderen Methoden.

Dies ist ein de facto Muster, auch außerhalb des Anwendungsbereichs der CompletableFuture und es war (ab) verwendet, lange bevor lambdas etwas in Java waren, z.B. anonyme Klassen. Also, fühlen Sie sich nicht so schlecht, es ist nur, dass die Entwicklung von Java uns keine richtigen Schließungen (noch? Jemals?) Zur Verfügung gestellt hat.

Wenn Sie möchten, können Sie Werte von CompletableFuture s in .handle() zurückgeben, so dass Sie das Fertigstellungsergebnis vollständig umbrechen und einen Wrapper zurückgeben können. Meiner Meinung nach ist dies nicht besser als manuelle Schließungen, fügte hinzu, dass Sie solche Wrapper pro Zukunft erstellen werden.

Unterklasse CompletableFuture ist nicht erforderlich. Sie sind nicht daran interessiert, sein Verhalten zu ändern, sondern nur daran Daten anzuhängen, was Sie mit der aktuellen Variablenerfassung von Java machen können. Das heißt, es sei denn, Sie profilieren sich und sehen, dass das Erstellen dieser Schließungen tatsächlich die Leistung beeinflusst, was ich sehr bezweifle.

Verwandte Themen