2016-04-20 2 views
2

Ich habe einen Integrationsfluss, bei dem einige der Schritte asynchron und einige synchron sind. Ich möchte die barrier verwenden, um den Thread Main zu blockieren, bis alle asynchronen Tasks abgeschlossen sind. Basierend auf der Dokumentation gibt es zwei Möglichkeiten, die Barriere zu verwenden.Verwenden der Sperre, um auf den Abschluss des Integrationsflusses zu warten

  1. Eine zweite Auslösenachricht an den Eingangskanal der Schranke senden.
  2. die Trigger Methode Invoke manuell der Barriere

In meinem Anwendungsfall eine Meldung in der Strömung kommt und geht dann durch mehrere Komponenten, bis sie den completed Kanal erreicht. Ich möchte, dass der Haupt-Thread blockiert wird, bis die ursprünglichen Nachrichten den abgeschlossenen Kanal erreichen. Daher erscheint es sinnvoll, die Option # 2 zu verwenden und die Barrier-Trigger-Methode aufzurufen, nachdem der completed-Status erreicht wurde. Das scheint nicht zu funktionieren. Hier ist eine vereinfachte Version meines Flow.

<int:gateway 
    service-interface="...BarrierGateway" 
    id="barrierGateway" default-request-channel="input"> 
</int:gateway> 

<int:channel id="input"> 
    <int:dispatcher task-executor="executor" /> 
</int:channel> 

<int:service-activator input-channel="input" output-channel="completed"> 
    <bean class="...BarrierSA" /> 
</int:service-activator> 

<int:channel id="completed" /> 
<int:service-activator input-channel="completed" 
    ref="barrier1.handler" method="trigger" /> 

<int:barrier id="barrier1" input-channel="input" timeout="10000" /> 

ich eine Nachricht an die gateway senden, die es den input Kanal verläuft, der einen dispatcher so einen neuen Thread verwendet wird gestartet, nach vorne um die Nachricht zu übergeben. An dieser Stelle möchte ich den main Thread blockieren, während der Executor-1 Thread den Fluss durchläuft. Der Rest des Flusses ist einfach. Mein service-activator schläft für 3 Sekunden, bevor die Nachricht zurückgegeben wird, um eine Verzögerung zu simulieren. Sobald die Nachricht im Kanal completed empfangen wurde, sollte der Service-Aktivator die Methode barrier trigger aufrufen und nur zu diesem Zeitpunkt sollte der Haupt-Thread freigegeben werden. Stattdessen wird der Hauptthread freigegeben, nachdem der Dispatcher einen neuen Thread gestartet hat. Ich habe versucht, eine konstante Korrelationsidentifikation ('abc') zu spezifizieren, aber das half nicht.

Antwort

2

Ich sehe Sie sind in einer Falle gefangen.

Die <int:barrier> suspendiert den Thread nur auf die Nachricht Nachricht, sondern nur den Thread, der diese Nachricht an ihn bringt. Mit Blick auf Ihre Konfiguration ist es das gleiche input Kanal mit Executor. Der Zweck der ExecutorChannel, Nachricht an einen anderen Thread zu verschieben, aber den Thread des Aufrufers nicht zu unterbrechen.

Von anderer Seite haben Sie einen weiteren Fehler um diese input. Sie deklarieren zwei Abonnenten für ihn, wobei nur einer von ihnen durch die Balancierungsstrategie aufgerufen wird.

Um Ihre Aufgabe zu beheben, sollten wir einen weiteren Top-Level-Kanal als <publish-subscribe-channel> haben. Und richtig, schon jetzt können Sie zwei Abonnenten haben.

Einer von ihnen sollte ein <bridge> zu Ihrem inputExecutorChannel sein. Und ein anderer der gewünschte <barrier>. Und nur jetzt kann es den Haupt-Thread von der <gateway> aussetzen (in Ihren Begriffen blockieren).

Von anderen Seite wäre die einfachere Lösung, die <barrier> überhaupt nicht zu verwenden. Die <gateway> hat die Möglichkeit, den Thread des Anrufers zu blockieren und auf eine Antwort zu warten. Das funktioniert natürlich, wenn die Gateway-Methoden nicht void sind.

Und noch ein Punkt auf Ihre config: Wenn Sie für die Antwort im Gateway warten Sie nicht, wird die <barrier> mit dem fehler

throw new DestinationResolutionException("no output-channel or replyChannel header available"); 

Also, betrachten etwas als output-channel es gut zu nutzen.

+0

Die Barriere ist für anspruchsvollere Umgebungen; Für einen einfachen Anwendungsfall, wie @Artem sagt, wird der Hauptthread im Gateway ausgesetzt (wenn es einen Rückgabetyp von der Methode gibt), bis der Fluss endet (unabhängig von Async-Handoffs) oder bis die Antwort Timeout "ist überschritten - das ist standardmäßig Unendlich. Wenn Sie einen Leerverkauf haben, dann ist die Barriere die richtige Lösung, aber jede Seite der Barriere muss auf einem separaten Thread laufen, wie Artem sagt. –

Verwandte Themen