Ich habe eine Liste von Dateien. Ich möchte:Akka Streams: Mehrere Dateien lesen
- Um von allen von ihnen als eine einzige Quelle zu lesen.
- Dateien sollten der Reihe nach nacheinander gelesen werden. (kein Round-Robin)
- Zu keinem Zeitpunkt sollte eine Datei vollständig im Speicher sein.
- Ein Fehler beim Lesen einer Datei sollte den Stream zusammenbrechen.
Es fühlte sich wie das funktionieren sollte: (Scala, akka Ströme v2.4.7)
val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
.via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
.map(bs => bs.utf8String)
)
val source = sources.reduce((a, b) => Source.combine(a, b)(MergePreferred(_)))
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines
Aber das führt zu einem Compiler-Fehler, da FileIO
hat eine materialisierte Wert zugeordnet, und Source.combine
doesn unterstütze das nicht.
Mapping der materialisierten Wert weg macht mich fragen, wie Datei-Lesefehler behandelt, aber nicht kompiliert:
val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
.via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
.map(bs => bs.utf8String)
.mapMaterializedValue(f => NotUsed.getInstance())
)
val source = sources.reduce((a, b) => Source.combine(a, b)(MergePreferred(_)))
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines
Aber löst eine Illegal zur Laufzeit:
java.lang.IllegalArgumentException: requirement failed: The inlets [] and outlets [MergePreferred.out] must correspond to the inlets [MergePreferred.preferred] and outlets [MergePreferred.out]
Ich war auf der Suche nach Modular, das weiß ich zu schätzen. Ich habe die Zeilenanzahl als Beispiel für etwas verwendet, was ich mit den Dateien machen konnte, und "lineCounter", wie geschrieben, verknüpft das mit dem Lesen der Datei. (Es ist eine Senke) Aber wenn ich die Falte und alles danach an anderer Stelle verschiebe, bin ich mit einem Flow [Pfad, String, NotUsed] zurück, der genau das Stück ist, nach dem ich gesucht habe. – randomstatistic
Könnten Sie bitte Importe mit Ihren Beispielen bereitstellen, sie sind ein wesentlicher Teil des Codes. –
@OsskarWerrewka Es sollte alles in akka.stream.Scaladsl und Java IO/NIO sein. Hattest du ein Problem damit? –