2017-11-26 3 views
0

ich mich gefragt, ob es ein Weg, um den reaktiven Kontext in eine Thread Variable zu drücken, bevor ein Teilnehmer das OnNext Signal empfangen. Beim Graben innerhalb des Reaktorkerns habe ich Hooks und Lift BiFunction gefunden.Mit Haken und einen Aufzug schieben Context in Thread

Ich habe eine Klasse mit der folgenden Implementierung erstellt. Die Klasse besteht aus einer ThreadLocal-Variablen, die den Kontext enthält und die erforderliche BiFunction-Schnittstelle implementiert. Er wird den gesamten Anruf an den tatsächlichen Teilnehmer delegieren und den Kontext, wenn er in die ThreadLocal-Variable geändert wird, vor dem Aufrufen von onNext auf dem tatsächlichen Abonnenten auch schieben.

package com.example.demo; 

import org.reactivestreams.Subscription; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import reactor.core.CoreSubscriber; 
import reactor.core.Scannable; 
import reactor.util.context.Context; 

import java.util.function.BiFunction; 

public class ThreadLocalContextLifter<T> implements BiFunction<Scannable, CoreSubscriber<? super T>, CoreSubscriber<? super T>> { 

    private static Logger logger = LoggerFactory.getLogger(ThreadLocalContextLifter.class); 

    private static final ThreadLocal<Context> contextHolder = new ThreadLocal<>(); 

    public static Context getContext() { 
     Context context = contextHolder.get(); 
     if (context == null) { 
      context = Context.empty(); 
      contextHolder.set(context); 
     } 
     return context; 
    } 

    public static void setContext(Context context) { 
     contextHolder.set(context); 
    } 

    @Override 
    public CoreSubscriber<? super T> apply(Scannable scannable, CoreSubscriber<? super T> coreSubscriber) { 
     return new ThreadLocalContextCoreSubscriber<>(coreSubscriber); 
    } 

    final class ThreadLocalContextCoreSubscriber<U> implements CoreSubscriber<U> { 

     private CoreSubscriber<? super U> delegate; 

     public ThreadLocalContextCoreSubscriber(CoreSubscriber<? super U> delegate) { 
      this.delegate = delegate; 
     } 

     @Override 
     public Context currentContext() { 
      return delegate.currentContext(); 
     } 

     @Override 
     public void onSubscribe(Subscription s) { 
      delegate.onSubscribe(s); 
     } 

     @Override 
     public void onNext(U u) { 
      Context context = delegate.currentContext(); 
      if (!context.isEmpty()) { 
       Context currentContext = ThreadLocalContextLifter.getContext(); 
       if (!currentContext.equals(context)) { 
        logger.info("Pushing reactive context to holder {}", context); 
        ThreadLocalContextLifter.setContext(context); 
       } 
      } 
      delegate.onNext(u); 
     } 

     @Override 
     public void onError(Throwable t) { 
      delegate.onError(t); 
     } 

     @Override 
     public void onComplete() { 
      delegate.onComplete(); 
     } 
    } 

} 

Die Instanz wird mit dem folgenden Code in die Haken geladen:

Hooks.onEachOperator(Operators.lift(new ThreadLocalContextLifter<>())); 

Ich habe einige Tests laufen und es scheint, richtig zu arbeiten, aber ich bin nicht von der Lösung überzeugt. Ich vermute, dass der Haken die Leistung des Reaktors verschlechtert oder dass es in einigen Fällen, die mir nicht bewusst sind, nicht funktionieren wird.

Meine Frage ist einfach: Ist dies eine schlechte Idee?

Antwort

1

Ich glaube nicht, dass irgendetwas mit dieser Idee nicht stimmt ... Der Haken wird von jedem Operator verwendet, der von Reactor zur Verfügung gestellt wird.

Die Context ändert nicht zwischen onNext, so konnte der Aufzug ThreadLocalContextCoreSubscriber es in onSubscribe erfassen. Aber Sie müssen das ThreadLocal mindestens einmal in onNext überprüfen, da onNext und onSubscribekann auf zwei verschiedenen Threads geschehen, so dass Ihre Lösung der Verwendung von delegate.currentContext() funktioniert auch. Am Ende sieht dein Ansatz gut aus.