2017-01-22 4 views
2

Ich benutze Alpakka-FTP, aber vielleicht suche ich nach einem allgemeinen Akka-Stream-Muster. Der FTP-Anschluss können Dateien auflisten oder sie abrufen:Akka Streams, Quellelemente als weitere Quelle?

def ls(host: String): Source[FtpFile, NotUsed] 
def fromPath(host: String, path: Path): Source[ByteString, Future[IOResult]] 

Im Idealfall würde Ich mag einen Strom wie diese erstellen:

LIST 
    .FETCH_ITEM 
    .FOREACH(do something) 

Aber ich bin nicht in der Lage, eine solche Strom mit den beiden Funktionen zu erstellen Ich habe oben geschrieben. Ich fühle mich wie ich in der Lage sein, es sollte ein Flow, so etwas wie

Ftp.ls 
    .via(some flow that uses the Ftp.fromPath above) 
    .runWith(Sink.foreach(do something)) 

Ist dies möglich, nur die ls und fromPath Funktionen, die oben angegeben erhalten mit?

EDIT:

Ich bin in der Lage, es zu trainieren mit einem Schauspieler und mapAsync, aber ich fühle mich immer noch sollte es einfacher sein.

class Downloader extends Actor { 
    override def receive = { 
    case ftpFile: FtpFile => 
     Ftp.fromPath(Paths.get(ftpFile.path), settings) 
     .toMat(FileIO.toPath(Paths.get("testHDF.txt")))(Keep.right) 
     .run() pipeTo sender 
    } 
} 

val downloader = as.actorOf(Props(new Downloader)) 

Ftp.ls("test_path", settings) 
    .mapAsync(1)(ftpFile => (downloader ? ftpFile) (3.seconds).mapTo[IOResult]) 
    .runWith(Sink.foreach(res => println("got it!" + res))) 

Antwort

0

Sie sollten flatMapConcat für diesen Zweck verwenden können. Ihr spezielles Beispiel neu geschrieben here zu

Ftp.ls("test_path", settings).flatMapConcat{ ftpFile => 
    Ftp.fromPath(Paths.get(ftpFile.path), settings) 
}.runWith(FileIO.toPath(Paths.get("testHDF.txt"))) 

Dokumentation werden kann.

Verwandte Themen