2017-03-12 3 views
-1

einen core.async Kanal betrachten, die wie so erstellt:den Inhalt eines core.async Kanalspülung

(def c (chan)) 

Und lassen Sie uns (zB auf diesen Kanal aus verschiedenen Orten gesetzt und genommen Werte annehmen, in zu gehen. -loops).

Wie würde man alle Elemente auf dem Kanal zu einer bestimmten Zeit spülen?

Zum Beispiel eine der Kanal ein Atom machen könnte und haben dann ein Ereignis wie folgt aus:

(def c (atom (chan)) 

(defn reset [] 
    (close! @c) 
    (reset! c (chan))) 

Gibt es eine andere Art und Weise, dies zu tun?

+0

Warum wird ein Kanal von verschiedenen Orten gelesen? Sie haben keine Garantie, wo es gelesen wird – fl00r

+0

Das Schließen des Kanals "spült" die Elemente nicht. Sie werden bis zum Lesen im Kanal warten. –

Antwort

0

Lassen Sie uns ein wenig klarer definieren, was Sie zu tun scheinen: Sie haben Code in mehreren Go-Loops ausgeführt, von denen jeder Daten auf den gleichen Kanal legt. Du möchtest ihnen alle sagen können: "Der Kanal, auf den du Werte anlegst, ist nicht mehr gut, lege deine Werte ab jetzt auf einen anderen Kanal." Wenn das nicht das ist, was du machen willst, dann macht deine ursprüngliche Frage keinen Sinn, da es kein "Spülen" gibt - du nimmst entweder die Werte, die auf den Kanal gelegt werden, oder du tust es nicht.

Zuerst verstehen, warum Ihr Ansatz nicht funktioniert, die die Kommentare zu Ihrer Frage berühren: Wenn Sie deref ein Atom c, erhalten Sie einen Kanal, und dieser Wert ist immer der gleiche Kanal. Sie haben Code in go-loop s, die >! genannt haben und derzeit geparkt sind und auf die Nehmer warten. Wenn Sie @c schließen, bleiben diese geparkten Threads geparkt (wer geparkt ist, während von einem Kanal (<!) nimmt, erhält sofort den Wert nil, wenn der Kanal schließt, aber geparkt >! s wird einfach geparkt bleiben). Sie können reset!c den ganzen Tag lang, aber die geparkten Threads sind immer noch auf einem früheren Wert geparkt, den sie von deref ing bekommen haben.

Also, wie machst du es? Hier ist ein Ansatz.

(require '[clojure.core.async :as a 
    :refer [>! <! >!! <!! alt! take! go-loop chan close! mult tap]]) 

(def rand-int-chan (chan)) 
(def control-chan (chan)) 
(def control-chan-mult (mult control-chan)) 

(defn create-worker 
    [put-chan control-chan worker-num] 
    (go-loop [put-chan put-chan] 
    (alt! 
     [[put-chan (rand-int 10)]] 
     ([_ _] (println (str "Worker" worker-num " generated value.")) 
      (recur put-chan)) 

     control-chan 
     ([new-chan] (recur new-chan))))) 

(defn create-workers 
    [n c cc] 
    (dotimes [n n] 
    (let [tap-chan (chan)] 
     (a/tap cc tap-chan) 
     (create-worker c tap-chan n)))) 

(create-workers 5 rand-int-chan control-chan-mult) 

So werden wir 5 Arbeiter Schleifen zu schaffen, die ihr Ergebnis auf rand-int-chan setzen wird, und wir werden ihnen einen geben „Steuerkanal.“ Ich lasse Sie mult und tap auf eigene Faust erkunden, aber kurz gesagt, wir erstellen einen einzelnen Kanal, auf den wir Werte setzen können, und dieser Wert wird dann an alle Kanäle gesendet, die ihn anzapfen.

In unserer Worker-Schleife führen wir einen von zwei Dingen durch: Setzen Sie einen Wert auf den rand-int-chan, den wir verwenden, wenn wir ihn erstellen, oder wir nehmen einen Wert von diesem Steuerkanal ab. Wir können dem Worker-Thread geschickt mitteilen, dass der Kanal, auf den seine Werte gesetzt werden, sich geändert hat, indem er ihm den neuen Kanal übergeben hat, den er dann beim nächsten Mal durch die Schleife bindet. Also, um zu sehen, sie in Aktion:

(<!! rand-int-chan) 

=> 6 
Worker2 generated value. 

Diese zufällige Ints aus dem Kanal nehmen, und der Arbeiter-Thread druckt, dass sie einen Wert erzeugt haben, dass in der Tat mehr Threads sehen teilnehmen hier.

Nehmen wir an, wir wollen den Kanal ändern, um die zufälligen ganzen Zahlen einzuschalten.Kein Problem, wir tun:

(def new-rand-int-chan (chan)) 
(>!! control-chan new-rand-int-chan) 
(close! rand-int-chan) ;; for good measure, may not be necessary 

Wir schaffen den Kanal, und dann setzen wir diesen Kanal auf unsere control-chan. Wenn wir dies tun, wird der Worker-Thread den zweiten Teil seiner alt! ausführen, der einfach an die Spitze der go-loop zurückschleift, außer dass diesmal die put-chan an die new-rand-int-chan gebunden ist, die wir gerade erhalten haben. Also jetzt:

(<!! new-rand-int-chan) 

=> 3 
Worker1 generated value. 

Dies gibt uns unsere Integer, die genau das ist, was wir wollen. Jeder Versuch, <!! aus dem alten Kanal geben nil, da wir den Kanal geschlossen:

(<!! rand-int-chan) 
; nil 
0

Lesen Sie alles auf einen Vektor mit into und verwenden Sie es nicht.

(go (async/into [] c))