2017-10-27 1 views
0

Für mein Mandelbrot-Explorer-Projekt muss ich mehrere teure Jobs ausführen, idealerweise parallel. Ich beschloss, Chunking die Arbeitsplätze, um zu versuchen, und jeder Klumpen in seinem eigenen thread läuft, und am Ende landete mit so etwas wieSchließen eines Kanals auf der Producer-Seite, wenn alle Jobs fertig sind

(defn point-calculator [chunk-size points] 
    (let [out-chan (chan (count points)) 
     chunked (partition chunk-size points)] 

    (doseq [chunk chunked] 
     (thread 
     (let [processed-chunk (expensive-calculation chunk)] 
      (>!! out-chan processed-chunk)))) 

    out-chan)) 

Wo points eine Liste [real, imaginär] Koordinaten getestet werden, und expensive-calculation ist eine Funktion, die den Chunk übernimmt und jeden Punkt im Chunk testet. Es kann sehr lange dauern, bis jeder Chunk fertig ist (je nach Chunk-Größe und Anzahl der Jobs möglicherweise eine Minute oder länger).

Auf meinem Verbraucher Ende, ich bin mit

(loop [] 
    (when-let [proc-chunk (<!! result-chan)] 
    ; Do stuff with chunk 
    (recur))) 

jeden verarbeitete Brocken zu konsumieren. Momentan blockiert dies, wenn der letzte Chunk verbraucht wird, da der Channel noch offen ist.

Ich brauche eine Möglichkeit, den Kanal zu schließen, wenn die Jobs fertig sind. Dies erweist sich aufgrund der Asynchronität der Erzeugerschleife als schwierig. Ich kann nicht einfach eine close! nach der doseq setzen, da die Schleife nicht blockiert, und ich kann nicht nur schließen, wenn der zuletzt indizierte Job erledigt ist, da die Reihenfolge unbestimmt ist.

Die beste Idee, die ich mir vorstellen konnte, war die Aufrechterhaltung eines (atom #{}) Jobs und disj jeden Job, wie es fertig ist. Dann könnte ich entweder nach der eingestellten Größe in der Schleife suchen, und close!, wenn es 0 ist, oder eine Uhr an das Atom anhängen und dort nachsehen.

Dies scheint jedoch sehr hackish. Gibt es einen idiomatischen Umgang damit? Schlägt dieses Szenario vor, dass ich async falsch verwende?

Antwort

1

Ich würde einen Blick auf die take Funktion von core-async. Das ist, was es ist Dokumentation sagt:

„Gibt einen Kanal, der höchstens zurückkehren, n Elemente aus ch Nach n Artikel zurückgegeben wurden, oder ch geschlossen wurde, wird der Rückkanal schließen .“

so führt Sie auf eine einfache Lösung: statt out-chan der Rückkehr Sie es in take nur wickeln kann:

(clojure.core.async/take (count chunked) out-chan)

, die funktionieren sollte. Auch würde ich Ihnen empfehlen, um Ihr Beispiel von Blockierung Put/Get to Parking (<!, >!) und thread zu go/go-loop, die idiomatische Nutzung für Core Async ist neu schreiben.

+0

Danke, ich werde das später versuchen. Ich muss meinen PC zurücksetzen. -_- Und ich benutzte Parkrufe, aber ich war der Meinung, dass die Erstellung einer Unmenge an Blöcken mit lang andauernden Prozessen ein schlechtes Design war. – Carcigenicate

+0

ja. Ich nehme an, du hast Recht – leetwinski

0

Sie könnten async/pipeline (-blocking) verwenden, um Parallelismen zu steuern. Verwenden Sie aysnc/oto-chan, um den Eingangskanal automatisch zu schließen, nachdem alle Chunks kopiert wurden.

z. Das folgende Beispiel zeigt eine 16-fache Verbesserung der verstrichenen Zeit, wenn Parallelismus auf 16 eingestellt ist.

(defn expensive-calculation [pts] 
    (Thread/sleep 100) 
    (reduce + pts)) 

(time 
(let [points  (take 10000 (repeatedly #(rand 100))) 
     chunk-size 500 
     inp-chan (chan) 
     out-chan (chan)] 
    (go-loop [] (when-let [res (<! out-chan)] 
       ;; do stuff with chunk 
       (recur))) 
    (pipeline-blocking 16 out-chan (map expensive-calculation) inp-chan) 
    (<!! (onto-chan inp-chan (partition-all chunk-size points))))) 
Verwandte Themen