Ich habe ein Diagramm, das eine Sequenz von Dateien akzeptiert, verarbeitet sie nacheinander und am Ende der Ausführung sollte das Programm Erfolg (0) oder Fehler (-1) zurückgeben, wenn alle Ausführungen erfolgreich waren oder fehlgeschlagen.
Wie konnte dieser letzte Schritt erreicht werden? Wie kann der Sink wissen, wenn er das Ergebnis für die letzte Datei erhält?Wie signalisiert man den Sink, wenn alle Elemente verarbeitet wurden?
val graph = createGraph("path-to-list-of-files")
val result = graph.run()
def createGraph(fileOrPath: String): RunnableGraph[NotUsed] = {
printStage("PREPARING") {
val producer: Source[ProducerFile, NotUsed] = Producer(fileOrPath).toSource()
val validator: Flow[ProducerFile, ProducerFile, NotUsed] = Validator().toFlow()
val provisioner: Flow[ProducerFile, PrivisionerResult, NotUsed] = Provisioner().toFlow()
val executor: Flow[PrivisionerResult, ExecutorResult, NotUsed] = Executor().toFlow()
val evaluator: Flow[ExecutorResult, EvaluatorResult, NotUsed] = Evaluator().toFlow()
val reporter: Sink[EvaluatorResult, Future[Done]] = Reporter().toSink()
val graphResult = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
producer ~> validator ~> provisioner ~> executor ~> evaluator ~> reporter
ClosedShape
})
printLine("The graph pipeline was created")
graphResult
}