2015-05-06 4 views
5

Ich versuche, einen akka Stream Flow (in Java DSL) mit 2 Akteuren als Quellen, dann einer Merge-Kreuzung und dann 1 Waschbecken zu erstellen und auszuführen:Wie können mehrere Actors als Quellen an einen Akka-Stream angehängt werden?

Source<Integer, ActorRef> src1 = Source.actorRef(100, OverflowStrategy.backpressure()); 
    Source<Integer, ActorRef> src2 = Source.actorRef(100, OverflowStrategy.backpressure()); 
    Sink<Integer, BoxedUnit> sink = Flow.of(Integer.class).to(Sink.foreach(System.out::println)); 

    RunnableFlow<BoxedUnit> closed = FlowGraph.factory().closed(sink, (b, out) -> { 
     UniformFanInShape<Integer, Integer> merge = b.graph(Merge.<Integer>create(2)); 
     b.from(src1).via(merge).to(out); 
     b.from(src2).to(merge); 
    }); 

    closed.run(mat); 

Meine Frage ist, wie erhalte ich ActorRef Verweise auf die Quell-Akteure, um ihnen Nachrichten zu senden? Im Fall von 1 Akteur würde ich keinen Graphengenerator verwenden, und dann würde die Methode .run() oder runWith() das ActorRef-Objekt zurückgeben. Aber was tun bei vielen Quellschauspielern? Ist es überhaupt möglich, einen solchen Fluss zu realisieren?

+0

Sie müssen die Elemente passieren, für die Sie zugreifen müssen der materialisierte Wert zu "closed" und dann eine Funktion bereitstellen, die die materialisierten Werte kombiniert. So etwas wie: 'closed (src1, src2, (actorRef1, actorRef2) -> SomethingContainingBothActorRefs, (b, s1, s2) -> ...)' – jrudolph

+0

Danke, jrudolph. – dev4ever44

Antwort

6

Ich beantworte meine eigene Frage für den Fall, dass jemand es braucht.

Mit jrudolph Rat konnte ich Schauspieler wie diese verwenden (im eigentlichen Code habe ich etwas Schöneres, als eine Liste von 2 ActorRefs):

Source<Integer, ActorRef> src1 = Source.actorRef(100, OverflowStrategy.fail()); 
    Source<Integer, ActorRef> src2 = Source.actorRef(100, OverflowStrategy.fail()); 
    Sink<Integer, BoxedUnit> sink = Flow.of(Integer.class).to(Sink.foreach(System.out::println)); 

    RunnableFlow<List<ActorRef>> closed = FlowGraph.factory().closed(src1, src2, (a1, a2) -> Arrays.asList(a1, a2), (b, s1, s2) -> { 
     UniformFanInShape<Integer, Integer> merge = b.graph(Merge.<Integer>create(2)); 
     b.from(s1).via(merge).to(sink); 
     b.from(s2).to(merge); 
    }); 

    List<ActorRef> stream = closed.run(mat); 
    ActorRef a1 = stream.get(0); 
    ActorRef a2 = stream.get(1); 
Verwandte Themen