2015-11-09 8 views
5

Ich möchte Nachrichten auf einem core.async chan durch Zählung und Timeout, (d. H. 10ms oder 10 Nachrichten, je nachdem, was zuerst kommt). Tim Baldridge has a video on batching, aber es verwendet veraltete Funktionen in core.async und verwendet keine Wandler. Ich bin auf der Suche nach so etwas wie die folgenden ...Wie man Nachrichten mit core.async richtig stapelt?

(defn batch [in out max-time max-count] 
    ... 
) 

Antwort

11

Transducer sollen nicht wirklich ein Problem für eine Chargenfunktion sein - als Nehmer auf dem in Kanal, wird es von allen Wandlern auf dem transformierten Werte sehen Kanal, und alle Nehmer, die auf out hören, sehen wiederum Werte, die durch den Wandler dieses Kanals umgewandelt werden.

Wie für eine Implementierung, die Funktion unten nehmen Chargen von max-count Elemente aus in, oder aber viele kommen von max-time da die letzte Charge ausgegeben wurde, und geben sie an out, zu schließen, wenn der Eingangskanal schließt, vorbehaltlich der Eingangskanals Wandler (falls vorhanden, und etwaige auf out hören Nehmer wird auch, dass der Kanal des Wandlers, wie oben erwähnt angewandt):

(defn batch [in out max-time max-count] 
    (let [lim-1 (dec max-count)] 
    (async/go-loop [buf [] t (async/timeout max-time)] 
     (let [[v p] (async/alts! [in t])] 
     (cond 
      (= p t) 
      (do 
      (async/>! out buf) 
      (recur [] (async/timeout max-time))) 

      (nil? v) 
      (if (seq buf) 
      (async/>! out buf)) 

      (== (count buf) lim-1) 
      (do 
      (async/>! out (conj buf v)) 
      (recur [] (async/timeout max-time))) 

      :else 
      (recur (conj buf v) t)))))) 
+0

großes Stück Code, einfach und korrekt. Benutzte es für das Batching von Redis PubSub-Nachrichten (mit 'out' als Publisher). – siphiuel

+0

Wunderbare Antwort. – john

+0

Ich frage mich, ob 'clojure.core.async/take' dafür gut passen könnte, aber man muss grundsätzlich auf jeden Fall die Schleife hinzufügen (und die" niemals ewig blockieren "- Timeout!) Also am Ende von An diesem Tag sieht die obige Implementierung noch immer solide aus. –