2017-08-18 1 views
2

lesen Ich möchte viele .CSV-Dateien in einem Ordner asynchron lesen und eine Iterable einer benutzerdefinierten Fallklasse zurückgeben.Mehrere Dateien asynchron mit Akka Streams, Scala

Kann ich das mit Akka Streams und How erreichen?

* Ich habe versucht, irgendwie den Auftrag zur Balance nach Dokumentation, aber es ist ein wenig durch schwer zu verwalten ...

Oder

Ist es eine gute Praxis, statt Schauspieler zu benutzen? (Ein Elternteil Darsteller mit Kindern, jedes Kind, um eine Datei zu lesen, und ein Iterable an Eltern zurückgeben, und Eltern alle Iterables kombinieren?)

+0

Frage ist nicht ganz klar. 1. Möchten Sie eine einzelne Iterable einer benutzerdefinierten Fallklasse für alle CSV-Dateien oder eine für jede CSV-Datei zurückgeben? 2. Was ist, wenn es Tausende von Dateien gibt, möchten Sie sie alle gleichzeitig lesen, oder wollen Sie nur ein gewisses Maß an Parallelität? –

Antwort

1

vor allem müssen Sie lesen/lernen, wie Akka Stream funktioniert, mit Source, Flow und Sink. Dann können Sie anfangen, die Operatoren zu lernen.

Um mehrere Aktionen parallel zu machen, können Sie den Operator mapAsync verwenden, in dem Sie die Anzahl der Parallelität angeben.

/** 
    * Using mapAsync operator, we pass a function which return a Future, the number of parallel run futures will 
    * be determine by the argument passed to the operator. 
    */ 
    @Test def readAsync(): Unit = { 
    Source(0 to 10)//-->Your files 
     .mapAsync(5) { value => //-> It will run in parallel 5 reads 
     implicit val ec: ExecutionContext = ActorSystem().dispatcher 
     Future { 
      //Here read your file 
      Thread.sleep(500) 
      println(s"Process in Thread:${Thread.currentThread().getName}") 
      value 
     } 
     } 
     .runWith(Sink.foreach(value => println(s"Item emitted:$value in Thread:${Thread.currentThread().getName}"))) 
    } 

Sie können mehr über akka lernen und Akka Stream hier https://github.com/politrons/Akka

1

Meistens sind die gleichen wie @ Paul Antwort, aber mit kleinen Verbesserungen

def files = new java.io.File("").listFiles().map(_.getAbsolutePath).to[scala.collection.immutable.Iterable] 

Source(files).flatMapConcat(filename => //you could use flatMapMerge if you don't bother about line ordering 
    FileIO.fromPath(Paths.get(filename)) 
     .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true).map(_.utf8String)) 
).map { csvLine => 
    // parse csv here 
    println(csvLine) 
    }