Transducer sollen nicht wirklich ein Problem für eine Chargenfunktion sein - als Nehmer auf dem in
Kanal, wird es von allen Wandlern auf dem transformierten Werte sehen Kanal, und alle Nehmer, die auf out
hören, sehen wiederum Werte, die durch den Wandler dieses Kanals umgewandelt werden.
Wie für eine Implementierung, die Funktion unten nehmen Chargen von max-count
Elemente aus in
, oder aber viele kommen von max-time
da die letzte Charge ausgegeben wurde, und geben sie an out
, zu schließen, wenn der Eingangskanal schließt, vorbehaltlich der Eingangskanals Wandler (falls vorhanden, und etwaige auf out
hören Nehmer wird auch, dass der Kanal des Wandlers, wie oben erwähnt angewandt):
(defn batch [in out max-time max-count]
(let [lim-1 (dec max-count)]
(async/go-loop [buf [] t (async/timeout max-time)]
(let [[v p] (async/alts! [in t])]
(cond
(= p t)
(do
(async/>! out buf)
(recur [] (async/timeout max-time)))
(nil? v)
(if (seq buf)
(async/>! out buf))
(== (count buf) lim-1)
(do
(async/>! out (conj buf v))
(recur [] (async/timeout max-time)))
:else
(recur (conj buf v) t))))))
großes Stück Code, einfach und korrekt. Benutzte es für das Batching von Redis PubSub-Nachrichten (mit 'out' als Publisher). – siphiuel
Wunderbare Antwort. – john
Ich frage mich, ob 'clojure.core.async/take' dafür gut passen könnte, aber man muss grundsätzlich auf jeden Fall die Schleife hinzufügen (und die" niemals ewig blockieren "- Timeout!) Also am Ende von An diesem Tag sieht die obige Implementierung noch immer solide aus. –