2017-01-04 3 views
0

Ich habe drei Fragen im Zusammenhang mit Project Reactor und ich werde sie unten fragen. Beginnen Sie mit dem Code, den ich habe (es wird vereinfacht, um das Problem leichter zu verstehen). Project Reactor Timeout-Behandlung

Mono<Integer> doWithSession(Function<String, Mono<Integer>> callback, long timeout) { 
    return Mono.just("hello") 
     .compose(monostr -> monostr 
      .doOnSuccess(str -> System.out.println("Suppose I want to release session here after all")) //(1) 
      .doOnCancel(() -> System.out.println("cancelled")) //(2) 
      .then(callback::apply) 
      .timeoutMillis(timeout, Mono.error(new TimeoutException("Timeout after " + timeout))) 
     ); 
} 

Und Test:

@Test 
public void testDoWithSession2() throws Exception { 
    Function<String, Mono<Integer>> fun1 = str -> Mono.fromCallable(() -> { 
    System.out.println("do some long timed work"); 
    try { 
     Thread.sleep(5000); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    System.out.println("work has completed"); 
    return str.length(); 
    }); 

    StepVerifier.create(doWithSession(fun1,1000)) 
    .verifyError(TimeoutException.class); 
} 

So und Fragen:

  1. wie Aufruf von fun1 zu unterbrechen und sofort Fehler zurück? (Vielleicht mache ich etwas falsch, aber es sieht Fehler nicht nach Timeout zurück, aber nach allen Aufruf von Callback)
  2. warum doOnSuccess und doOnCancel zur gleichen Zeit aufgerufen? (Ich erwartete, dass (1) oder (2) wird aufgerufen, aber die beide nicht)
  3. Und wie mit folgendem Fall befassen:
    • vorstellen, dass in Code Mono.just("hello") Verbindung erwirbt;
    • in callback Ich mache etwas mit Verbindung und bekomme ein Ergebnis (Mono<Integer> in meinem Fall);
    • am Ende (bei Erfolg oder bei Ausfall) Ich möchte Sitzung (ich versuche, dies in (1) zu tun).

Antwort

1

1) Wie Sie herausgefunden, , verwenden Sie .publishOn(Schedulers.single()). Dadurch wird sichergestellt, dass das Callable in einem anderen Thread aufgerufen wird und den Thread nur blockiert. Außerdem ermöglicht es die Aufhebung der Kündigung.

2) Die Reihenfolge Ihrer Kette ist wichtig. Sie setzen .doOnSuccess am Anfang der compose (die Sie übrigens nicht wirklich für dieses spezielle Beispiel benötigen, es sei denn, Sie möchten diese Funktion zur späteren Wiederverwendung extrahieren). Es bedeutet also, dass es grundsätzlich Benachrichtigungen von der Mono.just erhält und ausgeführt wird, sobald die Quelle abgefragt wird, noch bevor Ihre Verarbeitung stattgefunden hat ... Gleiches gilt für doOnCancel. Die Annullierung kommt von der timeout auslösenden ...

3) Es gibt eine Fabrik, um eine Reihenfolge außerhalb einer Ressource zu schaffen und sicherzustellen, dass Ressource aufgeräumt wird: Mono.using. So würde es so etwas wie das aussehen:

public <T> Mono<T> doWithConnection(Function<String, Mono<T>> callback, long timeout) { 
    return Mono.using(
      //the resource supplier: 
      () -> { 
       System.out.println("connection acquired"); 
       return "hello"; 
      }, 
      //create a Mono out of the resource. On any termination, the resource is cleaned up 
      connection -> Mono.just(connection) 
           //the blocking callable needs own thread: 
           .publishOn(Schedulers.single()) 
           //execute the callable and get result... 
           .then(callback::apply) 
           //...but cancel if it takes too long 
           .timeoutMillis(timeout) 
           //for demonstration we'll log when timeout triggers: 
           .doOnError(TimeoutException.class, e -> System.out.println("timed out")), 
      //the resource cleanup: 
      connection -> System.out.println("cleaned up " + connection)); 
} 

dass ein Mono<T> der T-Wert des aufrufbar zurückgibt. Im Produktionscode würden Sie es abonnieren, um mit dem Wert umzugehen. Im Test wird StepVerifier.create() für Sie abonnieren.

Lassen Sie uns, dass mit Ihrer Aufgabe mit langer Laufzeit zeigen und sehen, was es gibt:

@Test 
public void testDoWithSession2() throws Exception { 
    Function<String, Mono<Integer>> fun1 = str -> Mono.fromCallable(() -> { 
     System.out.println("start some long timed work"); 
     //for demonstration we'll print some clock ticks 
     for (int i = 1; i <= 5; i++) { 
      try { 
       Thread.sleep(1000); 
       System.out.println(i + "s..."); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
     System.out.println("work has completed"); 
     return str.length(); 
    }); 

    //let two ticks show up 
    StepVerifier.create(doWithConnection(fun1,2100)) 
       .verifyError(TimeoutException.class); 
} 

Diese Ausgänge:

connection acquired 
start some long timed work 
1s... 
2s... 
timed out 
cleaned up hello 

Und wenn wir das Timeout über 5000 setzen, bekommen wir folgendes. (Es gibt einen Assertionsfehler, da der StepVerifier eine Zeitüberschreitung erwartet):

0

Für die erste Frage sieht aus wie die Antwort Disponenten verwenden:

Mono<Integer> doWithSession(Function<String, Mono<Integer>> callback, long timeout) { 
    Scheduler single = Schedulers.single(); 
    return Mono.just("hello") 
      .compose(monostr -> monostr 
        .publishOn(single) // use scheduler 
        .then(callback::apply) 
        .timeoutMillis(timeout, Mono.error(new TimeoutException("Timeout after " + timeout))) 
      ); 
} 

dritte Frage auf diese Weise könnte gelöst:

private Mono<Integer> doWithSession3(Function<String, Mono<Integer>> callback, long timeout) { 
    Scheduler single = Schedulers.single(); 
    return Mono.just("hello") 
      .then(str -> Mono.just(str) // here wrapping our string to new Mono 
        .publishOn(single) 
        .then(callback::apply) 
        .timeoutMillis(timeout, Mono.error(new TimeoutException("Timeout after " + timeout))) 
        .doAfterTerminate((res, throwable) -> System.out.println("Do anything with your string" + str)) 
      ); 
}