2017-05-07 2 views
1

Ich habe ein Diagramm, das eine Sequenz von Dateien akzeptiert, verarbeitet sie nacheinander und am Ende der Ausführung sollte das Programm Erfolg (0) oder Fehler (-1) zurückgeben, wenn alle Ausführungen erfolgreich waren oder fehlgeschlagen.
Wie konnte dieser letzte Schritt erreicht werden? Wie kann der Sink wissen, wenn er das Ergebnis für die letzte Datei erhält?Wie signalisiert man den Sink, wenn alle Elemente verarbeitet wurden?

val graph = createGraph("path-to-list-of-files") 
val result = graph.run() 

def createGraph(fileOrPath: String): RunnableGraph[NotUsed] = { 
    printStage("PREPARING") { 
    val producer: Source[ProducerFile, NotUsed] = Producer(fileOrPath).toSource() 
    val validator: Flow[ProducerFile, ProducerFile, NotUsed] = Validator().toFlow() 
    val provisioner: Flow[ProducerFile, PrivisionerResult, NotUsed] = Provisioner().toFlow() 
    val executor: Flow[PrivisionerResult, ExecutorResult, NotUsed] = Executor().toFlow() 
    val evaluator: Flow[ExecutorResult, EvaluatorResult, NotUsed] = Evaluator().toFlow() 
    val reporter: Sink[EvaluatorResult, Future[Done]] = Reporter().toSink() 

    val graphResult = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => 
    import GraphDSL.Implicits._ 
    producer ~> validator ~> provisioner ~> executor ~> evaluator ~> reporter 

    ClosedShape 
    }) 
    printLine("The graph pipeline was created") 
    graphResult 
} 

Antwort

1

Ihre reporter Sink materialisiert bereits zu einem Future[Done], die Sie anschließen können, wenn Sie wollen einige Code ausgeführt werden, wenn alle Ihre Elemente bearbeitet haben.

Im Moment werden sie jedoch nicht in Ihrem Diagramm angezeigt. Zwar gibt es eine Möglichkeit, es zu belichten ist die Grafik DSL verwenden, in Ihrem Fall ist es noch einfacher, die fließend DSL zu verwenden, um dies zu erreichen:

val graphResult: RunnableGraph[Future[Done]] = producer 
    .via(validator) 
    .via(provisioner) 
    .via(executor) 
    .via(evaluator) 
    .toMat(reporter)(Keep.right) 

Dies wird Ihnen wieder die Future[Done], wenn Sie Ihr Diagramm laufen

val result: Future[Done] = graph.run() 

was dann anhängen kann - zB

result.onComplete { 
    case Success(_) => println("Success!") 
    case Failure(_) => println("Failure..") 
} 
Verwandte Themen