2013-06-18 11 views
12

Ist es irgendwie möglich, mit Scala's parallelen Sammlungen eine Iteratorohne zu parallelisieren, die es vorher vollständig auswertet?Paralleler Iterator in Scala

Hier spreche ich über die Parallelisierung der funktionalen Transformationen auf einem Iterator, nämlich map und flatMap. Ich denke, dies erfordert, einige Elemente der Iterator im Voraus zu bewerten und dann mehr zu berechnen, sobald einige über next verbraucht werden.

Alles, was ich finden könnte, würde erfordern, dass der Iterator bestenfalls zu Iterable oder Stream konvertiert wird. Die Stream wird dann komplett ausgewertet, wenn ich .par darauf anrufe.

Ich begrüße auch Vorschläge zur Umsetzung, wenn dies nicht ohne weiteres zur Verfügung steht. Implementierungen sollten parallel map und flatMap unterstützen.

+0

Die Antwort ist _probably no_ aber können Sie ein wenig mehr über das sagen, was Sie von diesem wollen? Insbesondere wann sollte die Berechnung beginnen - nachdem Sie den Iterator erstellt haben oder wenn Sie etwas aufrufen, das die Auswertung erzwingt? –

+0

@RexKerr Scheint wie eine Designwahl; aber wenn es bei der ersten Anfrage gestartet wird, ist die erste Anfrage etwas Besonderes. Ich versuche gerade, so etwas zu implementieren und ich entscheide mich sofort zu starten und die nächsten 'n' Ergebnisse zu speichern. Sobald einer verbraucht ist, berechne ich einen Ersatz. – ziggystar

Antwort

3

Ihre beste Wette mit der Standardbibliothek verwendet wahrscheinlich nicht parallel Sammlungen aber concurrent.Future.traverse:

import concurrent._ 
import ExecutionContext.Implicits.global 
Future.traverse(Iterator(1,2,3))(i => Future{ i*i }) 

obwohl ich denke, dass das Start das Ganze ausführen wird so bald wie möglich.

1

Vom ML, Verfahrgeschwindigkeit Iterator Elemente parallel:

https://groups.google.com/d/msg/scala-user/q2NVdE6MAGE/KnutOq3iT3IJ

zog ich Future.traverse aus einem ähnlichen Grund aus. Für meinen Anwendungsfall, bei dem N Jobs funktionieren, habe ich einen Code eingegeben, um den Ausführungskontext aus der Jobwarteschlange zu speisen.

Mein erster Versuch beinhaltete das Blockieren des Feeder-Threads, aber das riskierte auch das Blockieren von Tasks, die Tasks im Ausführungskontext erzeugen wollten. Was weißt du, Blockieren ist böse.

+0

Können Sie einen Kommentar abgeben, warum Sie '(NUM_CPUs + 1)^2' als Größe für die blockierende Warteschlange verwenden? – ziggystar

+0

Auch ich fand den harten Weg, dass 1. Ich bin nicht gut im gleichzeitigen Programmieren 2. 'flatMap' ist schwieriger. – ziggystar

+0

@ziggystar Mit "du" meinst du "Juha" auf der ML.Ich denke nicht, dass es eine magische Zahl ist: groß genug, damit der Konsument dem ursprünglichen Iterator (der möglicherweise I/O macht) und der Mapping-Funktion (CPU-gebunden, aber lang oder kurz) nicht voraus ist Laufen?). Ich sehe, dass die Zukunft, die die Warteschlange füttert, blockieren wird, ohne "blocking" zu nennen; vielleicht ist die +1 übrig von der "gewünschten Parallelität". Meine Lösung hatte das Ende der Pipelineüberprüfung für mehr Arbeit, d. H. Das Letzte, was ein Job tun würde, ist zu prüfen, ob genügend Jobs in Bearbeitung sind, und wenn nicht, füttere das Biest. Ich stimme zu, es ist schwer, Einfachheit ist der Schlüssel. –

0

Es ist ein bisschen schwer, genau zu folgen, was Sie nach, aber vielleicht ist es so etwas wie dieses:

val f = (x: Int) => x + 1 
val s = (0 to 9).toStream map f splitAt(6) match { 
    case (left, right) => left.par; right 
} 

Dies f auf den ersten 6 Elemente parallel und dann wieder einen Strom über den Rest eveluate wird .

+0

Dies scheint nicht parallel zu laufen - müssen Sie die 'map f' nicht nach' par' verschieben? – DNA

6

Ich weiß, dass dies eine alte Frage ist, aber tut die ParIterator Implementierung in der iterata Bibliothek tun, was Sie gesucht haben?

scala> import com.timgroup.iterata.ParIterator.Implicits._ 
scala> val it = (1 to 100000).toIterator.par().map(n => (n + 1, Thread.currentThread.getId)) 
scala> it.map(_._2).toSet.size 
res2: Int = 8 // addition was distributed over 8 threads 
+1

Es adressiert das Problem. Es könnte jedoch ein bisschen effizienter sein, da Sie viel blockieren, wenn Sie große Schwankungen in den Laufzeiten der Operationen innerhalb eines Chunks haben. – ziggystar

+0

Wie könnte es @ziggystar effizienter gemacht werden? –

+0

'ParIterator' teilt den' Iterator' in Stücke. Wenn Sie also kleine Chunks (z. B. Größe 2) haben und ein Element 1s dauert und das andere 10s, dann haben Sie eine schlechte Parallelisierung. Eine andere Implementierung könnte den Arbeitern neue Elemente aus dem Iterator geben, sobald ein Arbeiter frei wird. – ziggystar