2017-11-14 6 views
0

Ich habe mehrere Dienste, von denen einige den HystrixObservableCommand von Hystrix verwenden, um andere Dienste aufzurufen, und andere verwenden HystrixCommand. Wie gebe ich TraceIds vom aufrufenden Dienst an die Observables in HystrixObservableCommand weiter und lasse sie auch weitergeben, wenn der Fallback aufgerufen wird?Wie traceIds in Hystrix Observables übergeben?

Alle Dienste verwenden grpc-java.

Beispielcode, die ich habe:

WorldCommand worldCommand = new WorldCommand(greeterRequest, worldServiceStub); 
     String messageFromWorldService = ""; 
     String idFromWorldService = ""; 
     try { 

      Greeter.GreeterReply greeterReply = worldCommand.construct().toBlocking().toFuture().get(); 
      messageFromWorldService = greeterReply.getMessage(); 
      idFromWorldService = greeterReply.getId(); 
      logger.info("Response from WorldService -- {}, id = {}", messageFromWorldService, idFromWorldService); 
     } catch (StatusRuntimeException | InterruptedException | ExecutionException e) { 
      logger.warn("Exception when calling WorldService\n" + e); 
     } 

WorldCommand.java

public class WorldCommand extends HystrixObservableCommand<Greeter.GreeterReply> { 

    private static final Logger logger = LoggerFactory.getLogger(WorldCommand.class.getName()); 

    private final Greeter.GreeterRequest greeterRequest; 
    private final WorldServiceGrpc.WorldServiceStub worldServiceStub; 

    public WorldCommand(Greeter.GreeterRequest greeterRequest, WorldServiceGrpc.WorldServiceStub worldServiceStub) { 
     super(HystrixCommandGroupKey.Factory.asKey("WorldService")); 
     this.greeterRequest = greeterRequest; 
     this.worldServiceStub = worldServiceStub; 
    } 

    @Override 
    protected Observable<Greeter.GreeterReply> construct() { 
     Context context = Context.current(); 
     return Observable.create(new Observable.OnSubscribe<Greeter.GreeterReply>() { 
      @Override 
      public void call(Subscriber<? super Greeter.GreeterReply> observer) { 
       logger.info("In WorldCommand"); 
       if (!observer.isUnsubscribed()) { 
        //pass on the context, if you want only certain headers to pass on then create a new Context and attach it. 
        context.attach(); 
        logger.info("In WorldCommand after attach"); 
        worldServiceStub.greetWithHelloOrWorld(greeterRequest, new StreamObserver<Greeter.GreeterReply>() { 
         @Override 
         public void onNext(Greeter.GreeterReply greeterReply) { 
          logger.info("Response from WorldService -- {}, id = {}", greeterReply.getMessage(), greeterReply.getId()); 
          observer.onNext(greeterReply); 
          observer.onCompleted(); 
         } 

         @Override 
         public void onError(Throwable t) { 
          logger.info("Exception from WorldService -- {}", t); 
         } 

         @Override 
         public void onCompleted() { 

         } 
        }); 
       } 
      } 
     }).subscribeOn(Schedulers.io()); 
    } 

    @Override 
    protected Observable<Greeter.GreeterReply> resumeWithFallback() { 
     logger.info("Response from fallback"); 
     Greeter.GreeterReply greeterReply = Greeter.GreeterReply.newBuilder().setMessage("teammate").setId("-1").build(); 
     return Observable.just(greeterReply); 
    } 

ich Zipkin bin mit grpc Tracing und MDCCurrentTraceContext die traceid und spanId in den Protokollen zu drucken.

Die beiden Protokolleinträge im WorldCommand drucken die Trace- und Span-IDs nicht aus, sie werden im RxIOScheduler-Thread aufgerufen.

EDIT

Added ConcurrencyStrategy wie von Mike vorgeschlagen.

public class CustomHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy { 

    private static final Logger log = LoggerFactory.getLogger(CustomHystrixConcurrencyStrategy.class); 

    public <T> Callable<T> wrapCallable(Callable<T> callable){ 
     log.info("In CustomHystrixConcurrencyStrategy: callable="+ callable.toString()); 
     return new ContextCallable<>(callable); 
    } 
} 

Hallo Service ruft zwei Dienste Welt und Team. Der WorldCommand ist ein HystrixObservableCommand, der TeamCommand ist ein HystrixCommand.

logger.info("In the HelloService:greetWithHelloWorld"); 
Greeter.GreeterRequest greeterRequest = Greeter.GreeterRequest.newBuilder().setId(request.getId()).build(); 

//Call WorldService 
ManagedChannel worldChannel = getChannel("localhost:8081", "helloService-world-client"); 
//Async stub instead of blockingStub 
WorldServiceGrpc.WorldServiceStub worldServiceStub = WorldServiceGrpc.newStub(worldChannel); 

WorldCommand worldCommand = new WorldCommand(greeterRequest, worldServiceStub); 
String messageFromWorldService = ""; 
String idFromWorldService = ""; 
try { 

    Greeter.GreeterReply greeterReply = worldCommand.construct().toBlocking().toFuture().get(); 
    messageFromWorldService = greeterReply.getMessage(); 
    idFromWorldService = greeterReply.getId(); 
    logger.info("Response from WorldService -- {}, id = {}", messageFromWorldService, idFromWorldService); 
} catch (StatusRuntimeException | InterruptedException | ExecutionException e) { 
    logger.warn("Exception when calling WorldService\n" + e); 
} 

//Call TeamService 
ManagedChannel teamChannel = getChannel("localhost:8082", "helloService-team-client"); 
TeamServiceGrpc.TeamServiceBlockingStub teamServiceStub = TeamServiceGrpc.newBlockingStub(teamChannel); 
TeamCommand teamCommand = new TeamCommand(greeterRequest, teamServiceStub); 

String messageFromTeamService = ""; 
String idFromTeamService = ""; 
try { 
    Greeter.GreeterReply greeterReply = teamCommand.construct().toBlocking().toFuture().get(); 
    messageFromTeamService = greeterReply.getMessage(); 
    idFromTeamService = greeterReply.getId(); 
    logger.info("Response from TeamService -- {}, id = {}", messageFromTeamService, idFromTeamService); 
} catch (StatusRuntimeException | InterruptedException | ExecutionException e) { 
    logger.warn("Exception when calling TeamService\n" + e); 
} 

assert(idFromWorldService.equals(idFromTeamService)); 
Greeter.GreeterReply greeterReply = Greeter.GreeterReply.newBuilder().setMessage("Hello" + messageFromWorldService + " from " + messageFromTeamService).setId(idFromWorldService).build(); 
responseObserver.onNext(greeterReply); 
responseObserver.onCompleted(); 

PreservableContext Klasse

public class PreservableContexts { 

    //private final TraceContext traceContext; 
    private static final Logger logger = LoggerFactory.getLogger(PreservableContexts.class.getName()); 

    public PreservableContexts() { 
     logger.info("Creating new PreservableContexts"); 
     //this.traceContext = TraceContextHolder.getContext(); 
    } 

    public void set() { 
     // if (traceContext != null) { 
      //TraceContextHolder.setContext(traceContext); 
     // } 
    } 

    public void clear() { 
     //TraceContextHolder.clearContext(); 
    } 

Das Protokoll in PreservableContexts und CustomHystrixConcurrencyStrategy gedruckt werden nie. Ich registriere die Startreihe, wenn ich den HelloServer starte.

HystrixConcurrencyStrategy strategy = new CustomHystrixConcurrencyStrategy(); 
     HystrixPlugins.getInstance().registerConcurrencyStrategy(strategy); 
     context = HystrixRequestContext.initializeContext(); 

EDIT 2

aktualisiert, wie die Observable eingerichtet sind:

ManagedChannel worldChannel = getChannel("localhost:8081", "helloService-world-client"); 
    //Async stub instead of blockingStub 
    WorldServiceGrpc.WorldServiceStub worldServiceStub = WorldServiceGrpc.newStub(worldChannel); 
    WorldCommand worldCommand = new WorldCommand(greeterRequest, worldServiceStub); 

    //Call TeamService 
    ManagedChannel teamChannel = getChannel("localhost:8082", "helloService-team-client"); 
    TeamServiceGrpc.TeamServiceStub teamServiceStub = TeamServiceGrpc.newStub(teamChannel); 
    //TeamServiceGrpc.TeamServiceBlockingStub teamServiceStub = TeamServiceGrpc.newBlockingStub(teamChannel); 
    TeamCommand teamCommand = new TeamCommand(greeterRequest, teamServiceStub); 

    try { 
     rx.Observable<Greeter.GreeterReply> worldReplyObservable = worldCommand.observe().subscribeOn(Schedulers.computation()); 
     rx.Observable<Greeter.GreeterReply> teamReplyObservable = teamCommand.observe().subscribeOn(Schedulers.computation()); 
     Observable.zip(worldReplyObservable, teamReplyObservable, new Func2<Greeter.GreeterReply, Greeter.GreeterReply, Object>() { 
      @Override 
      public Object call(Greeter.GreeterReply worldReply, Greeter.GreeterReply teamReply) { 
       String messageFromWorldService = worldReply.getMessage(); 
       String idFromWorldService = worldReply.getId(); 
       logger.info("Response from WorldService -- {}, id = {}", messageFromWorldService, idFromWorldService); 

       String messageFromTeamService = teamReply.getMessage(); 
       String idFromTeamService = teamReply.getId(); 
       logger.info("Response from TeamService -- {}, id = {}", messageFromTeamService, idFromTeamService); 

       assert(idFromWorldService.equals(idFromTeamService)); 
       Greeter.GreeterReply greeterReply = Greeter.GreeterReply.newBuilder().setMessage("Hello" + messageFromWorldService + " from " + messageFromTeamService).setId(idFromWorldService).build(); 
       logger.info("Final response=" + greeterReply.getMessage()); 
       responseObserver.onNext(greeterReply); 
       responseObserver.onCompleted(); 
       return null; 
      } 
     }); 
    } catch (StatusRuntimeException e) { 
     logger.warn("Exception when calling WorldService and/or TeamService\n" + e); 
    } 

ich jetzt ein seltsames Problem, tut die Anrufe TeamCommand und WorldCommand nicht vollständig, wie in diesem Code wird nie ausgeführt:

Observable.zip(worldReplyObservable, teamReplyObservable, new Func2<Greeter.GreeterReply, Greeter.GreeterReply, Object>() { 
       @Override 
       public Object call(Greeter.GreeterReply worldReply, Greeter.GreeterReply teamReply) { 
        String messageFromWorldService = worldReply.getMessage(); 

Wenn es einen Fallback gibt, haben die Hystrix-Timer-Threads nicht mehr den MDC.

Antwort

0

Ich habe nicht viel Wissen mit Hysterix, aber wenn Sie versuchen, einige kontextbezogene Informationen wie Trace-IDs herumzugeben, dann ist io.grpc.Context die richtige Klasse zu verwenden. Sie müssten context.withValue aufrufen, um einen neuen Kontext mit der TraceID zu erstellen. An den Stellen, an denen Sie die Daten haben möchten, müssen Sie den Kontext anhängen. Achten Sie auch darauf, den Kontext zu lösen, wenn Sie fertig sind, was in Ihrem Snippet nicht passiert.

0

Sie verwenden müssen ...

HystrixPlugins.getInstance().registerConcurrencyStrategy(...)

... eine benutzerdefinierte registrieren HystrixConcurrencyStrategy, die Ihre eigene Callable verwendet ...

public class ConcurrencyStrategy extends HystrixConcurrencyStrategy {  
    @Override 
    public <K> Callable<K> wrapCallable(Callable<K> c) { 
     return new ContextCallable<>(c); 
    } 
} 

... das gilt Zusammenhang Erhaltung rund um die Rennstrecke ...

public class ContextCallable<K> implements Callable<K> { 

    private final Callable<K> callable; 
    private final PreservableContexts contexts; 

    public ContextCallable(Callable<K> actual) { 
     this.callable = actual; 
     this.contexts = new PreservableContexts(); 
    } 

    @Override 
    public K call() throws Exception { 
     contexts.set(); 
     try { 
      return callable.call(); 
     } finally { 
      contexts.clear(); 
     } 
    } 
} 

... über eine Hilfsklasse Kontext der Lage ist, die Erhaltung der Zipkin ist ...

public class PreservableContexts { 

    private final TraceContext traceContext; 

    public PreservableContexts() { 
     this.traceContext = TraceContextHolder.getContext(); 
    } 

    public void set() { 
     if (traceContext != null) { 
      TraceContextHolder.setContext(traceContext); 
     } 
    } 

    public void clear() { 
     TraceContextHolder.clearContext(); 
    } 

} 

... und eine einfache Methode zum Hinzufügen anderer Kontexte erlauben, die Sie zB beibehalten möchten MDC, SecurityContext etc ...

+0

Ich erstellte eine benutzerdefinierte ConcurrentStrategy-Klasse und fügte Protokolle hinzu, um zu überprüfen, dass es aufgerufen wird, aber keine Protokolle gedruckt werden. Ich habe die Frage bearbeitet, da es schwierig ist, sie in einem Kommentar zu formatieren. Irgendwelche Ideen? Auch das Javadoc von HystrixConcurrenyStrategy erwähnt seine Verwendung mit HystrixCommand und nicht HystrixObservableCommand - " " 'Zum Beispiel ruft jedes Callable, das von HystrixCommand ausgeführt wird, wrapCallable (Callable) auf, um eine Chance für benutzerdefinierte Implementierungen zu bieten, das Callable mit zusätzlichem Verhalten zu dekorieren. '' ' Funktioniert die benutzerdefinierte ConcurrentStrategy für beide? – user2237511

+0

Angenommen, Sie verwenden die [Strategie zur Thread-Isolierung] (https://github.com/Netflix/Hystrix/wiki/How-it-Works#threads--thread-pools), dann die [Concurrency-Strategie] (https: // github com/Netflix/Hystrix/wiki/Plugins # concurrenciestrategy) sollte angewendet werden ... Ich bin mir nicht sicher über Semaphore Isolation, aber es sieht nicht so aus, als ob Sie sich dafür entscheiden. Ich habe das gerade lokal getestet und Observable-Befehle lösen das Plugin aus. Ich denke, das Problem, das Sie haben, ist, wie Sie die Befehle aufrufen. Statt 'teamCommand.construct()' versuche 'teamCommand.execute()' oder 'teamCommand.queue()'. – Mike

+0

Anweisungen für [synchrone Ausführung] (https://github.com/Netflix/Hystrix/wiki/How-To-Use#synchronous-execution) und [Asynchrone Ausführung] (https://github.com/Netflix/Hystrix/ wiki/How-To-Use # asynchrone Ausführung) finden Sie auf der Seite [How To Use] (https://github.com/Netflix/Hystrix/wiki/How-To-Use). – Mike