2009-11-17 8 views
15

Ich habe eine Iteration vals: Iterable[T] und eine lang andauernde Funktion ohne relevante Nebenwirkungen: f: (T => Unit). Im Moment ist dies vals in der offensichtlichen Weise angewandt:Concurrent map/foreach in scala

vals.foreach(f)

ich die Anrufe f möchte gleichzeitig zu tun (in Grenzen). Gibt es irgendwo in der Scala-Basisbibliothek eine offensichtliche Funktion? Etwas wie:

Concurrent.foreach(8 /* Number of threads. */)(vals, f)

Während f ziemlich lange läuft, ist es kurz genug, dass ich will nicht den Aufwand für einen Thread für jeden Anruf aufgerufen wird, so dass ich bin auf der Suche nach etwas, basierend auf einem Thread-Pool.

Antwort

10

Ich mag die Futures Antwort. Während es jedoch gleichzeitig ausgeführt wird, wird es auch asynchron zurückgegeben, was wahrscheinlich nicht das ist, was Sie wollen. Der richtige Ansatz wäre wie folgt:

import scala.actors.Futures._ 

vals map { x => future { f(x) } } foreach { _() } 
+8

Seien Sie vorsichtig, dass 'Vals' eine strikte Sammlung ist - wenn es faul ist (und in Scala 2.7 dies beinhaltet die' Range' Klasse), werden die Futures erst erstellt, wenn jeder von 'foreach' und nichts wird parallel passieren. –

+0

Ich nehme an, wir könnten dieses Problem lösen, indem wir einen weiteren "foreach" -Ruf zwischen der "map" und der aktuellen "foreach" einfügen. Also: 'vals map {x => Zukunft {f (x)}} foreach {x => x} foreach {_()}' –

+0

Das wäre eine Karte, die wir injizieren müssen, keine weitere foreach? Und mir ist nicht klar, dass die Karte einer faulen Sammlung streng ist. Der sicherste Weg ist, nachArray zu rufen. –

3

Ich hatte einige Probleme mit scala.actors.Futures in Scala 2.8 (es war fehlerhaft, als ich überprüfte). Mit Java-Libs für mich direkt gearbeitet, aber:

final object Parallel { 
    val cpus=java.lang.Runtime.getRuntime().availableProcessors 
    import java.util.{Timer,TimerTask} 
    def afterDelay(ms: Long)(op: =>Unit) = new Timer().schedule(new TimerTask {override def run = op},ms) 
    def repeat(n: Int,f: Int=>Unit) = { 
    import java.util.concurrent._ 
    val e=Executors.newCachedThreadPool //newFixedThreadPool(cpus+1) 
    (0 until n).foreach(i=>e.execute(new Runnable {def run = f(i)})) 
    e.shutdown 
    e.awaitTermination(Math.MAX_LONG, TimeUnit.SECONDS) 
    } 
} 
2

ich scala.actors.Futures verwenden würde:

vals.foreach(t => scala.actors.Futures.future(f(t))) 
13

Scalaz hat parMap. Sie würden es wie folgt verwenden:

import scalaz.Scalaz._ 
import scalaz.concurrent.Strategy.Naive 

Dieser jeden Funktors ausstatten (einschließlich Iterable) mit einer parMap Methode, so dass Sie nur tun können:

vals.parMap(f) 

Sie erhalten auch parFlatMap, parZipWith usw.

2

Die neueste Version von Functional Java verfügt über einige Funktionen für die Parallelität, die Sie verwenden können.

import fjs.F._ 
import fj.control.parallel.Strategy._ 
import fj.control.parallel.ParModule._ 
import java.util.concurrent.Executors._ 

val pool = newCachedThreadPool 
val par = parModule(executorStrategy[Unit](pool)) 

Und dann ...

par.parMap(vals, f) 

zu shutdown Denken Sie daran, die pool.

0

Sie können die Parallel Collections aus der Scala-Standardbibliothek verwenden. Sie sind wie gewöhnliche Sammlungen, aber ihre Operationen laufen parallel. Sie müssen nur einen par Aufruf setzen, bevor Sie einige Auflistungsoperationen aufrufen.

import scala.collection._ 

val array = new Array[String](10000) 
for (i <- (0 until 10000).par) array(i) = i.toString 
9

Viele der Antworten aus dem Jahr 2009 noch die alte scala.actors.Futures._ verwenden, die in der neueren Scala nicht mehr. Während Akka der bevorzugte Weg ist, ist ein viel besser lesbarer Weg, nur parallel zu verwenden (.par) Sammlungen:

vals.foreach { v => f(v) }

wird

vals.par.foreach { v => f(v) }

Alternativ parMap Verwendung mit dem Vorbehalt, obwohl prägnanter erscheinen können, die Sie sich merken müssen, * die übliche Scalaz zu importieren. Wie immer gibt es in Scala mehr als eine Möglichkeit, dasselbe zu tun!

+0

Dies sollte jetzt die akzeptierte Antwort sein –