2016-06-13 8 views
5

Ich habe eine Liste von Dateien. Ich möchte:Akka Streams: Mehrere Dateien lesen

  1. Um von allen von ihnen als eine einzige Quelle zu lesen.
  2. Dateien sollten der Reihe nach nacheinander gelesen werden. (kein Round-Robin)
  3. Zu keinem Zeitpunkt sollte eine Datei vollständig im Speicher sein.
  4. Ein Fehler beim Lesen einer Datei sollte den Stream zusammenbrechen.

Es fühlte sich wie das funktionieren sollte: (Scala, akka Ströme v2.4.7)

val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath) 
    .via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true)) 
    .map(bs => bs.utf8String) 
) 
val source = sources.reduce((a, b) => Source.combine(a, b)(MergePreferred(_))) 
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines 

Aber das führt zu einem Compiler-Fehler, da FileIO hat eine materialisierte Wert zugeordnet, und Source.combine doesn unterstütze das nicht.

Mapping der materialisierten Wert weg macht mich fragen, wie Datei-Lesefehler behandelt, aber nicht kompiliert:

val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath) 
    .via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true)) 
    .map(bs => bs.utf8String) 
    .mapMaterializedValue(f => NotUsed.getInstance()) 
) 
val source = sources.reduce((a, b) => Source.combine(a, b)(MergePreferred(_))) 
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines 

Aber löst eine Illegal zur Laufzeit:

java.lang.IllegalArgumentException: requirement failed: The inlets [] and outlets [MergePreferred.out] must correspond to the inlets [MergePreferred.preferred] and outlets [MergePreferred.out] 

Antwort

8

Der folgende Code ist nicht so kurz, wie es sein könnte, um die verschiedenen Probleme klar zu modularisieren.

// Given a stream of bytestrings delimited by the system line separator we can get lines represented as Strings 
val lines = Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true).map(bs => bs.utf8String) 

// given as stream of Paths we read those files and count the number of lines 
val lineCounter = Flow[Path].flatMapConcat(path => FileIO.fromPath(path).via(lines)).fold(0l)((count, line) => count + 1).toMat(Sink.head)(Keep.right) 

// Here's our test data source (replace paths with real paths) 
val testFiles = Source(List("somePathToFile1", "somePathToFile2").map(new File(_).toPath)) 

// Runs the line counter over the test files, returns a Future, which contains the number of lines, which we then print out to the console when it completes 
testFiles runWith lineCounter foreach println 
+0

Ich war auf der Suche nach Modular, das weiß ich zu schätzen. Ich habe die Zeilenanzahl als Beispiel für etwas verwendet, was ich mit den Dateien machen konnte, und "lineCounter", wie geschrieben, verknüpft das mit dem Lesen der Datei. (Es ist eine Senke) Aber wenn ich die Falte und alles danach an anderer Stelle verschiebe, bin ich mit einem Flow [Pfad, String, NotUsed] zurück, der genau das Stück ist, nach dem ich gesucht habe. – randomstatistic

+0

Könnten Sie bitte Importe mit Ihren Beispielen bereitstellen, sie sind ein wesentlicher Teil des Codes. –

+1

@OsskarWerrewka Es sollte alles in akka.stream.Scaladsl und Java IO/NIO sein. Hattest du ein Problem damit? –

-1

habe ich eine Antwort aus dem Tor - nicht akka.FileIO verwenden. Dies scheint zum Beispiel in Ordnung, zu arbeiten:

val sources = Seq("sample.txt", "sample2.txt").map(io.Source.fromFile(_).getLines()).reduce(_ ++ _) 
val source = Source.fromIterator[String](() => sources) 
val lineCount = source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) 

ich noch wissen möchte, ob es eine bessere Lösung.

+0

Durch die Verwendung von 'io.Source' verlieren Sie viel Macht. Für kleine Dateien könnte das funktionieren, aber für große Dateien ist das keine Option. – jarandaf

+0

@jarandaf Können Sie klären? Ich hatte den Eindruck, dass io.Source gerade einen BufferedReader unter der Haube verwendet hat und der getLines-Iterator nicht die ganze Datei auf einmal lädt. – randomstatistic

+0

besser gedacht, Sie könnten Recht haben (obwohl 'FileIO'' ByteString' anstelle von 'String' behandelt, das leistungsfähiger sein soll). Bei "io.Source" muss man immer daran denken, die Quelle zu schließen (was nicht standardmäßig gemacht wird). – jarandaf

2

aktualisieren Oh, habe ich sehe die akzeptierte Antwort nicht, weil ich nicht auf die Seite haben aufzufrischen> _ <. Ich werde das hier trotzdem lassen, da ich auch einige Hinweise zur Fehlerbehandlung hinzugefügt habe.

Ich glaube, das folgende Programm tut, was Sie wollen:

import akka.NotUsed 
import akka.actor.ActorSystem 
import akka.stream.{ActorMaterializer, IOResult} 
import akka.stream.scaladsl.{FileIO, Flow, Framing, Keep, Sink, Source} 
import akka.util.ByteString 
import scala.concurrent.{Await, Future} 
import scala.util.{Failure, Success} 
import scala.util.control.NonFatal 
import java.nio.file.Paths 
import scala.concurrent.duration._ 

object TestMain extends App { 
    implicit val actorSystem = ActorSystem("test") 
    implicit val materializer = ActorMaterializer() 
    implicit def ec = actorSystem.dispatcher 

    val sources = Vector("build.sbt", ".gitignore") 
    .map(Paths.get(_)) 
    .map(p => 
     FileIO.fromPath(p) 
     .viaMat(Framing.delimiter(ByteString(System.lineSeparator()), Int.MaxValue, allowTruncation = true))(Keep.left) 
     .mapMaterializedValue { f => 
      f.onComplete { 
      case Success(r) if r.wasSuccessful => println(s"Read ${r.count} bytes from $p") 
      case Success(r) => println(s"Something went wrong when reading $p: ${r.getError}") 
      case Failure(NonFatal(e)) => println(s"Something went wrong when reading $p: $e") 
      } 
      NotUsed 
     } 
    ) 
    val finalSource = Source(sources).flatMapConcat(identity) 

    val result = finalSource.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) 
    result.onComplete { 
    case Success(n) => println(s"Read $n lines total") 
    case Failure(e) => println(s"Reading failed: $e") 
    } 
    Await.ready(result, 10.seconds) 

    actorSystem.terminate() 
} 

Der Schlüssel hier ist die flatMapConcat() Methode: Es verwandelt jedes Element eines Stroms in eine Quelle und gibt einen Strom von Elementen von diesen Quellen ergibt sich, wenn Sie werden nacheinander ausgeführt.

Wie für den Umgang mit Fehlern, können Sie entweder einen Handler für die Zukunft in dem mapMaterializedValue Argumente hinzufügen, oder Sie können, indem Sie einen Handler die endgültigen Fehler des laufenden Stream verarbeiten auf den Sink.foreach zukünftigen Wert materialisierte. Ich habe beides im obigen Beispiel getan, und wenn Sie es testen, sagen wir, bei einer nicht vorhandenen Datei, werden Sie sehen, dass die gleiche Fehlermeldung zweimal gedruckt wird. Leider sammelt flatMapConcat() keine materialisierten Werte, und ehrlich gesagt kann ich nicht sehen, wie es vernünftig funktioniert, daher müssen Sie, falls nötig, separat damit umgehen.

Verwandte Themen