2017-09-01 1 views
3

Hier ist die einfachste grafische Darstellung unter Verwendung eines Partition und Merge, die ich tun konnte, aber bei der Ausführung gibt es den folgenden Fehler: requirement failed: The inlets [] and outlets [] must correspond to the inlets [Merge.in0, Merge.in1] and outlets [Partition.out0, Partition.out1]Akka Streams Einlässe und Auslässe entsprechen

Ich verstehe, dass die Meldung besagt, dass ich entweder haben mehr Ausgaben als Eingänge oder einen nicht verbundenen Fluss, aber ich kann nicht scheinen, in diesem einfachen Beispiel zu sehen, wo die Nichtübereinstimmung ist.

Jede Hilfe wird geschätzt.

Die Grafik:

def createGraph()(implicit actorSystem: ActorSystem): Graph[ClosedShape, Future[Done]] = { 
     GraphDSL.create(Sink.ignore) { implicit builder: GraphDSL.Builder[Future[Done]] => s => 
      import GraphDSL.Implicits._ 
      val inputs: List[Int] = List(1, 2, 3, 4) 
      val source: Source[Int, NotUsed] = Source(inputs) 

      val messageSplit: UniformFanOutShape[Int, Int] = builder.add(Partition[Int](2, i => i%2)) 

      val messageMerge: UniformFanInShape[Int, Int] = builder.add(Merge[Int](2)) 

      val processEven: Flow[Int, Int, NotUsed] = Flow[Int].map(rc => { 
      actorSystem.log.debug(s"even: $rc") 
      rc 
      }) 

      val processOdd: Flow[Int, Int, NotUsed] = Flow[Int].map(rc => { 
      actorSystem.log.debug(s"odd: $rc") 
      rc 
      }) 

      source ~> messageSplit.in 
      messageSplit.out(0) -> processEven -> messageMerge.in(0) 
      messageSplit.out(1) -> processOdd -> messageMerge.in(1) 
      messageMerge.out ~> s 
      ClosedShape 
     } 
    } 

Der Test:

import akka.actor.ActorSystem 
import akka.stream._ 
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition, RunnableGraph, Sink, Source} 
import akka.{Done, NotUsed} 
import org.scalatest.FunSpec 

import scala.concurrent.Future 
class RoomITSpec extends FunSpec { 

    implicit val actorSystem: ActorSystem = ActorSystem("RoomITSpec") 
    implicit val actorCreator: ActorMaterializer = ActorMaterializer() 
    describe("graph") { 
    it("should run") { 
     val graph = createGraph() 
     RunnableGraph.fromGraph(graph).run 
    } 
    } 
} 

Antwort

1

Kleine syntaktische Fehler.

// Notice the curly arrows 
messageSplit.out(0) ~> processEven ~> messageMerge.in(0) 
messageSplit.out(1) ~> processOdd ~> messageMerge.in(1) 

statt, was Sie geschrieben:

// Straight arrows 
messageSplit.out(0) -> processEven -> messageMerge.in(0) 
messageSplit.out(1) -> processOdd -> messageMerge.in(1) 

Sie am Ende zu erzeugen (und Wegwerfen) -Tupeln stattdessen auf das Diagramm der Zugabe.

+0

Vielen Dank! –