2017-10-23 3 views
2

Ich möchte eine Reactor Flux von einem gRPC StreamObserver erstellen. Dies muss durchgeführt werden, solange StreamObserver die entsprechenden Schnittstellen nicht nativ implementiert (siehe z. B. this issue)."Bridging" -Reaktor Flux von gRPC StreamObserver

Was kam ich mit ungefähr wie folgt aus:

final StreamObserver<ProtoResponse>[] streamObserverArray = new StreamObserver[1]; 
Flux<Response> myFlux Flux.create(sink -> streamObserverArray[0] = new StreamObserver<ProtoResponse>() { 
     @Override 
     public void onNext(ProtoResponse value) { 
      final Response response = convertFromProto(value); 
      sink.next(response); 
     } 

     @Override 
     public void onError(Throwable throwable) { 
      sink.error(throwable); 
     } 

     @Override 
     public void onCompleted() { 
      sink.complete(); 
     } 
    }); 
myFlux 
    .doOnError(throwable -> {/* actual logic in here */}) // 
    .doOnComplete(() -> {/* actual logic in here */}) // 
    .doOnCancel(() -> {/* actual logic in here */}) // 
    .parallel() // 
    .runOn(Schedulers.parallel()) // 
    .doOnNext(/* actual heavy lifting logic in here */) // 
    .map(/* ... */) // 
    .sequential() // 
    .doOnNext(/* ...*/) // 
    .subscribe(); // needed to start the actual processing of the events on this Flux 

MyGrpcService.newStub(channel).getResponses(protoRequest, streamObserverArray[0]); 

Die Idee, warum ich hier Reactor verwenden möchten ist es, die „schweres Heben Arbeit“ auf mehrere Threads parallel zu verteilen und das nicht zu tun, auf den gRPC Anfrage Threads.

Ich sehe mehrere Probleme mit dem Ansatz, wie es oben getan wird:

  • Ich mag es nicht wirklich die Abhilfe mit dem StreamObserver[] Array
  • Ich muß zuerst den kompletten Fluss schaffen, weil, wenn ich don Beenden Sie es nicht mit .subscribe() zuerst, die StreamObserver könnte null sein, wenn gRPC beginnt zu kommunizieren (auch Race-Bedingung).
  • Ich bin mir nicht sicher, ob Gegendruck so funktioniert, wie es beabsichtigt ist (obwohl das momentan nicht mein Hauptanliegen ist).

So würden meine Fragen sein: Was ist der beste/bevorzugte Weg ist von einem gRPC StreamObserver zu einem Reaktor Flux zu überbrücken? Gibt es Best Practices?

Antwort

0

Nach weiteren Hantieren und das Verständnis der gesamten reaktiven Sachen ein bisschen besser, kam ich auf die folgende Lösung:

/** 
* Bridge the StreamObserver from gRPC to the Publisher from the reactive world. 
*/ 
public class StreamObserverPublisher implements Publisher<Long>, StreamObserver<Long> { 

    private Subscriber<? super Long> subscriber; 

    @Override 
    public void onNext(Long l) { 
     subscriber.onNext(l); 
    } 

    @Override 
    public void onError(Throwable throwable) { 
     subscriber.onError(throwable); 
    } 

    @Override 
    public void onCompleted() { 
     subscriber.onComplete(); 
    } 

    @Override 
    public void subscribe(Subscriber<? super Long> subscriber) { 
     this.subscriber = subscriber; 
     this.subscriber.onSubscribe(new BaseSubscriber() {}); 
    } 
} 

// and somewhere else in the code 
StreamObserverPublisher streamObserverPublisher = new StreamObserverPublisher(); 
Flux<Long> longFlux = Flux.from(streamObserverPublisher); 
longFlux.subscribe(...); // must be done before executing the gRPC request 
MyGrpcService.newStub(channel).getResponses(protoRequest, streamObserverPublisher);