Ich möchte eine Reactor Flux von einem gRPC StreamObserver erstellen. Dies muss durchgeführt werden, solange StreamObserver die entsprechenden Schnittstellen nicht nativ implementiert (siehe z. B. this issue)."Bridging" -Reaktor Flux von gRPC StreamObserver
Was kam ich mit ungefähr wie folgt aus:
final StreamObserver<ProtoResponse>[] streamObserverArray = new StreamObserver[1];
Flux<Response> myFlux Flux.create(sink -> streamObserverArray[0] = new StreamObserver<ProtoResponse>() {
@Override
public void onNext(ProtoResponse value) {
final Response response = convertFromProto(value);
sink.next(response);
}
@Override
public void onError(Throwable throwable) {
sink.error(throwable);
}
@Override
public void onCompleted() {
sink.complete();
}
});
myFlux
.doOnError(throwable -> {/* actual logic in here */}) //
.doOnComplete(() -> {/* actual logic in here */}) //
.doOnCancel(() -> {/* actual logic in here */}) //
.parallel() //
.runOn(Schedulers.parallel()) //
.doOnNext(/* actual heavy lifting logic in here */) //
.map(/* ... */) //
.sequential() //
.doOnNext(/* ...*/) //
.subscribe(); // needed to start the actual processing of the events on this Flux
MyGrpcService.newStub(channel).getResponses(protoRequest, streamObserverArray[0]);
Die Idee, warum ich hier Reactor verwenden möchten ist es, die „schweres Heben Arbeit“ auf mehrere Threads parallel zu verteilen und das nicht zu tun, auf den gRPC Anfrage Threads.
Ich sehe mehrere Probleme mit dem Ansatz, wie es oben getan wird:
- Ich mag es nicht wirklich die Abhilfe mit dem
StreamObserver[]
Array - Ich muß zuerst den kompletten Fluss schaffen, weil, wenn ich don Beenden Sie es nicht mit
.subscribe()
zuerst, dieStreamObserver
könntenull
sein, wenn gRPC beginnt zu kommunizieren (auch Race-Bedingung). - Ich bin mir nicht sicher, ob Gegendruck so funktioniert, wie es beabsichtigt ist (obwohl das momentan nicht mein Hauptanliegen ist).
So würden meine Fragen sein: Was ist der beste/bevorzugte Weg ist von einem gRPC StreamObserver zu einem Reaktor Flux zu überbrücken? Gibt es Best Practices?