ich mich gefragt, ob es ein Weg, um den reaktiven Kontext in eine Thread Variable zu drücken, bevor ein Teilnehmer das OnNext Signal empfangen. Beim Graben innerhalb des Reaktorkerns habe ich Hooks und Lift BiFunction gefunden.Mit Haken und einen Aufzug schieben Context in Thread
Ich habe eine Klasse mit der folgenden Implementierung erstellt. Die Klasse besteht aus einer ThreadLocal-Variablen, die den Kontext enthält und die erforderliche BiFunction-Schnittstelle implementiert. Er wird den gesamten Anruf an den tatsächlichen Teilnehmer delegieren und den Kontext, wenn er in die ThreadLocal-Variable geändert wird, vor dem Aufrufen von onNext auf dem tatsächlichen Abonnenten auch schieben.
package com.example.demo;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.util.context.Context;
import java.util.function.BiFunction;
public class ThreadLocalContextLifter<T> implements BiFunction<Scannable, CoreSubscriber<? super T>, CoreSubscriber<? super T>> {
private static Logger logger = LoggerFactory.getLogger(ThreadLocalContextLifter.class);
private static final ThreadLocal<Context> contextHolder = new ThreadLocal<>();
public static Context getContext() {
Context context = contextHolder.get();
if (context == null) {
context = Context.empty();
contextHolder.set(context);
}
return context;
}
public static void setContext(Context context) {
contextHolder.set(context);
}
@Override
public CoreSubscriber<? super T> apply(Scannable scannable, CoreSubscriber<? super T> coreSubscriber) {
return new ThreadLocalContextCoreSubscriber<>(coreSubscriber);
}
final class ThreadLocalContextCoreSubscriber<U> implements CoreSubscriber<U> {
private CoreSubscriber<? super U> delegate;
public ThreadLocalContextCoreSubscriber(CoreSubscriber<? super U> delegate) {
this.delegate = delegate;
}
@Override
public Context currentContext() {
return delegate.currentContext();
}
@Override
public void onSubscribe(Subscription s) {
delegate.onSubscribe(s);
}
@Override
public void onNext(U u) {
Context context = delegate.currentContext();
if (!context.isEmpty()) {
Context currentContext = ThreadLocalContextLifter.getContext();
if (!currentContext.equals(context)) {
logger.info("Pushing reactive context to holder {}", context);
ThreadLocalContextLifter.setContext(context);
}
}
delegate.onNext(u);
}
@Override
public void onError(Throwable t) {
delegate.onError(t);
}
@Override
public void onComplete() {
delegate.onComplete();
}
}
}
Die Instanz wird mit dem folgenden Code in die Haken geladen:
Hooks.onEachOperator(Operators.lift(new ThreadLocalContextLifter<>()));
Ich habe einige Tests laufen und es scheint, richtig zu arbeiten, aber ich bin nicht von der Lösung überzeugt. Ich vermute, dass der Haken die Leistung des Reaktors verschlechtert oder dass es in einigen Fällen, die mir nicht bewusst sind, nicht funktionieren wird.
Meine Frage ist einfach: Ist dies eine schlechte Idee?