2

Ich habe eine Sequenz bekommt deckeln (von File.walkTopDown) und ich brauche eine lang andauernde Operation auf jeden von ihnen laufen. Ich würde gerne Kotlin-Best-Practices/-Coutines verwenden, aber ich bekomme entweder keine Parallelität oder viel zu viel Parallelität und treffe einen IO-Fehler "zu viele offene Dateien".wie Kotlin Koroutinen maximale Concurrency

File("/Users/me/Pictures/").walkTopDown() 
    .onFail { file, ex -> println("ERROR: $file caused $ex") } 
    .filter { ... only big images... } 
    .map { file -> 
     async { // I *think* I want async and not "launch"... 
      ImageProcessor.fromFile(file) 
     } 
    } 

Dies scheint nicht parallel zu laufen, und meine Multi-Core-CPU geht nie über 1 CPU Wert. Gibt es einen Weg mit Coroutinen, um "NumberOfCores-Paralleloperationen" im Sinne von "Deferred Jobs" durchzuführen?

Ich schaute auf Multithreading using Kotlin Coroutines, die zuerst alle Jobs erstellt, dann verbindet sie, aber das bedeutet, die Sequenz/Datei Baum gehen komplett vor der schweren Verarbeitung Join Schritt, und das scheint ... iffy! Wenn man sie in einen Sammel- und einen Prozessschritt aufteilt, könnte die Sammlung der Verarbeitung weit voraus sein.

val jobs = ... the Sequence above... 
    .toSet() 
println("Found ${jobs.size}") 
jobs.forEach { it.await() } 

Antwort

0

ich habe es mit einem Kanal arbeiten. Aber vielleicht bin ich mit deinem Weg überflüssig?

val pipe = ArrayChannel<Deferred<ImageFile>>(20) 
launch { 
    while (!(pipe.isEmpty && pipe.isClosedForSend)) { 
     imageFiles.add(pipe.receive().await()) 
    } 
    println("pipe closed") 
} 
File("/Users/me/").walkTopDown() 
     .onFail { file, ex -> println("ERROR: $file caused $ex") } 
     .forEach { pipe.send(async { ImageFile.fromFile(it) }) } 
pipe.close() 
1

Das Problem mit Ihrem ersten Schnipsel ist, dass es überhaupt nicht läuft - denken Sie daran, Sequence faul ist, und Sie haben einen Terminalbetrieb wie toSet() oder forEach() zu verwenden. Darüber hinaus müssen Sie die Anzahl der Threads begrenzen, die über den Aufbau einen newFixedThreadPoolContext Kontext für diese Aufgabe verwendet werden können, und deren Verwendung in async:

val pictureContext = newFixedThreadPoolContext(nThreads = 10, name = "reading pictures in parallel") 

File("/Users/me/Pictures/").walkTopDown() 
    .onFail { file, ex -> println("ERROR: $file caused $ex") } 
    .filter { ... only big images... } 
    .map { file -> 
     async(pictureContext) { 
      ImageProcessor.fromFile(file) 
     } 
    } 
    .toList() 
    .forEach { it.await() } 

Edit: Sie haben einen Terminalbetreiber verwenden (toList) vor dem die Ergebnisse wartet

+0

Ich dachte, das würde funktionieren, aber es scheint immer noch das Finale forEach sequentiell zu verarbeiten. z.B. .map {Datei -> async (CommonPool) { println ("Start") val img = ImageFile.fromFile (Datei) println ("end") img } } .forEach { imageFiles.add (it.await()) if (Math.random()> 0.999) { imageFiles.save() } } –

+0

Oh, Snap, Sie haben Recht. Jetzt denke ich, dass es keine Möglichkeit gibt, dies mit Sequenzen zu tun. Bearbeitete die Antwort – voddan