
Clojure - transducers, reducers and other dregs

Transducers, a new feature of the not-yet-released Clojure 1.7, have recently gained some fame. At the time of writing, Clojure 1.7-alpha5 is current, yet a fair number of transducer ports to other languages have already appeared: Python, Ruby, JavaScript, PHP, Java, C++, Lua, Erlang. And that is a little puzzling. After all, the reducers library was added long ago (back in Clojure 1.5), yet nobody said much about reducers or ported them anywhere, although they seem to do similar things... Or do they?

Let's see why Clojure needed all these reducers and transducers (and whether we really need them), how they work, and how to use them. And finally, let's find out whether it is time to throw reducers on the scrap heap.

It would be wrong to describe concepts that arose in Clojure outside the context of the language, so there will be a lot of Clojure listings. There will be no heavy math, though. Basic knowledge of Clojure is assumed (especially the concept of sequences), but you do not need to know Haskell. I also warn you in advance that all the listings of standard functions shown here are in fact heavily modified, and sometimes even "slightly" broken, all for the sake of simplicity. Oh yes, and that is a burrito in the picture.

Folding ...


So, Clojure is a functional language, which means an ordinary imperative loop is frowned upon.
Well, fine, we did not really want one anyway: there is the functional reduce!

    (defn my-reduce
      ([rf coll]
       ;; no initial state: start from the first element, or call (rf) if coll is empty
       (if-let [s (seq coll)]
         (my-reduce rf (first s) (next s))
         (rf)))
      ([rf acc coll]
       (if-let [[x & xs] (seq coll)]
         (recur rf (rf acc x) xs)
         acc)))

In reality reduce is, of course, implemented somewhat differently, but that does not matter to us right now, so let's forget about it. The function rf (let's call it the reducing function) takes two arguments: the first is a kind of "sliding state", the second is an element of coll. If the initial state is not given, (first coll) is used, or (rf) when the collection is empty. We run over the whole collection coll, call rf for each element, and "drag" the state acc along. When the elements run out, we simply return acc.
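
To make the three cases above concrete, here is a small sketch using the built-in reduce. The names total-with-init, total-no-init, total-empty and longest are made up for this illustration.

```clojure
;; With an explicit initial state, rf is called once per element.
(def total-with-init (reduce + 10 [1 2 3 4]))   ; 20

;; Without one, the first element becomes the initial state.
(def total-no-init (reduce + [1 2 3 4]))        ; 10

;; Empty collection and no initial state: the result is (rf), here (+).
(def total-empty (reduce + []))                 ; 0

;; The "sliding state" does not have to be a number.
(def longest
  (reduce (fn [acc s] (if (> (count s) (count acc)) s acc))
          ""
          ["ab" "abcd" "abc"]))                 ; "abcd"
```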

A small example. Suppose we have a list of strings and want to compute their total length.
Here is imperative code with a loop:

    (defn length-of-strings [strings]
      (with-local-vars [acc 0]                  ;; yes, Clojure does have local variables!
        (doseq [c strings]
          (var-set acc (+ @acc (count c))))     ;; the actual work
        @acc))

The loop state is a simple counter acc (a number). On each iteration we set it to (+ @acc (count c)).
And now once more, this time with reduce:

    (defn length-of-strings [coll]
      (my-reduce
        (fn [acc c] (+ acc (count c)))   ;; the reducing function
        0                                ;; the initial state
        coll))

If we set laziness aside for a moment, many primitive operations, such as map or filter, can be implemented this way.

    (defn my-map [f coll]
      (my-reduce (fn [acc c] (conj acc (f c))) [] coll))

    (defn my-filter [p coll]
      (my-reduce (fn [acc c] (if (p c) (conj acc c) acc)) [] coll))

This version of reduce is no longer good enough for implementing take: the loop always runs through the whole sequence (this is not Haskell, where everything is lazy).

To overcome this drawback, version 1.5 added a special crutch, the marker reduced, along with its predicate reduced?. reduce was rewritten at the same time, giving something like this:

    (defn my-reduce
      ([rf coll]
       (if-let [s (seq coll)]
         (my-reduce rf (first s) (next s))
         (rf)))
      ([rf acc coll]
       (if-let [[x & xs] (seq coll)]
         (let [ret (rf acc x)]
           (if (reduced? ret)
             @ret                      ;; unwrap the value and stop early
             (recur rf ret xs)))
         acc)))

As soon as the reducing function returns (reduced ...), the loop stops and the value @ret is returned.

    (defn take-r [n coll]
      (my-reduce
        (fn [[n1 acc] c]
          (if (pos? n1)
            [(dec n1) (conj acc c)]
            (reduced acc)))
        [n []]
        coll))

    ;; infinite collections are supported!
    (take-r 5 (range))
    ;; => [0 1 2 3 4]

It is impossible not to mention the wonderful function reductions. In essence it is an analogue of reduce that returns a lazy list of all intermediate values of acc rather than just the last one. It is very handy for debugging. We write one step of the algorithm as a function and run reduce over a collection of input data. If something suddenly goes wrong, we replace reduce with reductions, run it in the REPL, and get all the intermediate steps. With loops it is not that easy: you would have to bolt on debugging crutches, which is not very convenient.
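
A tiny sketch of that debugging trick. The step function and the sample data are invented for the illustration; reduce and reductions are the standard clojure.core functions.

```clojure
;; One step of a (hypothetical) algorithm: a running account balance.
(defn apply-payment [balance amount]
  (+ balance amount))

(def payments [100 -30 -50 20])

;; reduce gives only the final state...
(def final-balance (reduce apply-payment 0 payments))

;; ...while reductions exposes every intermediate state, initial one included.
(def balance-history (reductions apply-payment 0 payments))
;; balance-history => (0 100 70 20 40)
```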

Sometimes reductions is useful in its own right, for example for computing factorials:

    ;; a *lazy* sequence of factorials
    (def factorials (reductions *' (cons 1 (map inc (range)))))

    (nth factorials 20)
    ;; => 2432902008176640000

Clojure uses sequences to traverse collections. If we decide to run through a vector, a hash map, or a plain iterator, a fair number of temporary objects will be created on the heap.

The obvious optimization in this situation is to implement a specialized variant of reduce for those collections where it makes sense. If a collection does not lend itself to such optimization, the standard implementation is used, similar to the one shown at the beginning of the article. There is a special protocol for this, clojure.core.protocols/CollReduce. When a collection object supports it, that implementation is used inside clojure.core/reduce. This is why reduce in Clojure is usually faster than an equivalent doseq loop.
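
As a rough illustration of what "supporting CollReduce" means, here is a sketch of a reducible object that never builds a sequence. The countdown function is hypothetical, and for brevity it ignores early termination via reduced, which a real implementation would have to honour.

```clojure
(require '[clojure.core.protocols :as p])

;; Hypothetical reducible "collection": n, n-1, ..., 1, computed on the fly.
(defn countdown [n]
  (reify p/CollReduce
    ;; no initial state given: ask the reducing function for one
    (coll-reduce [this rf]
      (p/coll-reduce this rf (rf)))
    ;; the specialized loop: no seq, no temporary objects
    (coll-reduce [_ rf init]
      (loop [acc init, i n]
        (if (pos? i)
          (recur (rf acc i) (dec i))
          acc)))))

(reduce + 0 (countdown 4))       ;; => 10
(reduce conj [] (countdown 3))   ;; => [3 2 1]
```

clojure.core/reduce finds the protocol implementation by itself; the calling code does not change at all.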

Transformers


A transformer is a function that takes one reducing function and returns a new one.
For example, here is an "increment by 1" transformer:

    (defn inc-t [rf]
      (fn [acc c] (rf acc (inc c))))

    ;; and right away, a usage example
    (reduce + 0 (map inc [1 2 3 4]))
    ;; => 14
    (reduce (inc-t +) 0 [1 2 3 4])
    ;; => 14

This can be generalized a little by allowing any function in place of inc:

    (defn map-t [f]
      (fn [rf]
        (fn [acc c] (rf acc (f c)))))

    (def inc-t (map-t inc))
    (def dec-t (map-t dec))
    ;; ...
    (reduce (inc-t +) 0 [1 2 3 4])
    ;; => 14

And here, for example, is a "filter" transformer:

    (defn filter-t [pred]
      (fn [rf]
        (fn [acc c]
          (if (pred c)
            (rf acc c)
            acc))))

    (def odd?-t (filter-t odd?))
    (def even?-t (filter-t even?))

    ;; example
    (reduce (even?-t *) 1 [1 2 3 4])
    ;; => 8

Is it possible to combine several transformers? Of course!

    (defn odd?-inc-t [rf]
      (odd?-t (inc-t rf)))

    ;; ...or, more idiomatically
    (def odd?-inc-t
      (comp (filter-t odd?)
            (map-t inc)))

    ;; which is equivalent to...
    (def odd?-inc-t
      (comp
        (fn [rf] (fn [acc c] (if (odd? c) (rf acc c) acc)))
        (fn [rf] (fn [acc c] (rf acc (inc c))))))

    ;; which in turn is equivalent to
    (defn odd?-inc-t [rf]
      (fn [acc c]
        (if (odd? c)
          (rf acc (inc c))
          acc)))

    ;; usage example
    (reduce * 1 (->> [1 2 3 4 5] (filter odd?) (map inc)))
    ;; => 48
    (reduce (odd?-inc-t *) 1 [1 2 3 4 5])
    ;; => 48

Note that the transformers are applied in "reverse" order compared with ordinary function composition. If we want the collection elements to be processed by transformer A before they reach B, we have to glue them together as (comp A B). And now for a trick:
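
The ordering rule is easy to check by hand. The sketch below redefines the article's simplified map-t and filter-t so that it is self-contained; filter-then-map and map-then-filter are names invented for this example.

```clojure
;; Simplified 2-ary transformers in the style used in this article.
(defn map-t [f]
  (fn [rf] (fn [acc c] (rf acc (f c)))))

(defn filter-t [pred]
  (fn [rf] (fn [acc c] (if (pred c) (rf acc c) acc))))

;; Elements pass through filter-t first, then map-t: 1 3 5 -> 2 4 6
(def filter-then-map (comp (filter-t odd?) (map-t inc)))

;; Elements pass through map-t first, then filter-t: 2 3 4 5 6 -> 3 5
(def map-then-filter (comp (map-t inc) (filter-t odd?)))

(reduce (filter-then-map +) 0 [1 2 3 4 5])   ;; => 12
(reduce (map-then-filter +) 0 [1 2 3 4 5])   ;; => 8
```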

    (def cc (vec (range 1000000)))

    (time (reduce + 0 (->> cc (filter odd?) (map inc))))
    ;; "Elapsed time: 171.390143 msecs"
    ;; => 250000500000

    (time (reduce ((comp (filter-t odd?) (map-t inc)) +) 0 cc))
    ;; "Elapsed time: 93.246015 msecs"
    ;; => 250000500000

There you go: a tangible speedup out of thin air! Of course, everything depends on many details and nuances, so the real-world gain may differ. In short, do not take this piece of code as a benchmark.

But overall the results are not surprising. With map and filter, 2 intermediate sequences are created. We run through the source vector and build a temporary list of filtered values. Then we run through that list and build another one, with incremented elements. And finally we run over that one, summing the values.

The version with transformers, on the other hand, creates no temporary collections. Instead, odd? and inc are applied directly to the source elements.

Where are my reducers?


And everything was fine until version 1.5 introduced a new standard library, clojure.core.reducers. That's right, a separate library that has to be imported explicitly. It also declares its own versions of map, filter, take-while, and others. And, of course, they are not compatible with the regular versions from clojure.core. So it is better to write (require '[clojure.core.reducers :as r]) rather than a plain (use 'clojure.core.reducers).

So, what is a reducer? Briefly and bluntly: a reducer is any object that can be reduced. Any collection is a reducer in clojure.core.reducers terms. A hash map is a reducer. java.lang.String is a reducer. And nil, of course, is one too. Let's look at the definition:
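
Before the definition, a quick sanity check that those objects really are reducible. This uses plain clojure.core/reduce only; the var names are made up.

```clojure
;; A string reduces over its characters.
(def letters (reduce conj [] "abc"))

;; A hash map reduces over its entries (key/value pairs).
(def total (reduce (fn [acc [_ v]] (+ acc v)) 0 {:a 1, :b 2}))

;; nil behaves like an empty collection: the initial state comes back untouched.
(def nothing (reduce + 0 nil))
```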

    (defn reducer [coll xf]
      ;; `xf` is a transformer
      (reify
        clojure.core.protocols/CollReduce
        (coll-reduce [this f1]
          (let [f2 (xf f1)]
            (clojure.core.protocols/coll-reduce coll f2 (f2))))
        (coll-reduce [this f1 init]
          (let [f2 (xf f1)]
            (clojure.core.protocols/coll-reduce coll f2 init)))))

Here a collection coll is taken and a new one is returned, one on which reduce can be run and nothing else: you can neither add an element, nor remove one, nor even iterate over the elements. But before every reduce run, the reducing function is passed through the transformer xf.

    (def nums [1 2 3 4 5])
    (def nums+1 (reducer nums inc-t))

    (reduce + 0 nums)
    ;; => 15
    (reduce + 0 nums+1)
    ;; => 20

As already mentioned, the reducers library has its own variants of map, filter, take-while, and the like. Each of them takes a reducer and returns a new one with the corresponding transformer "attached".

This is how clojure.core.reducers/map could look (the real one, of course, looks quite different):

    (defn map-r [f coll]
      (reducer coll (map-t f)))

And now a few examples of how all this can be used:

    (require '[clojure.core.reducers :as r])
    (def nums [1 2 3 4 5 6 7 8])

    (type (map inc nums))
    ;; => clojure.lang.LazySeq
    (reduce conj [] (map inc nums))
    ;; => [2 3 4 5 6 7 8 9]

    (type (r/map inc nums))
    ;; => clojure.core.reducers$folder$reify__1234
    ;; not a sequence at all

    (reduce conj [] (r/map inc nums))
    ;; => [2 3 4 5 6 7 8 9]
    ;; but it can still be reduced

    (reduce conj [] (r/filter odd? nums))
    ;; => [1 3 5 7]

    (reduce + 0 (->> nums (r/map inc) (r/map inc)))
    ;; => 52
    ;; ~~ (+ 0 (inc (inc 1)) (inc (inc 2)) ...)

    (reduce + 0 (->> nums (r/filter odd?) (r/map inc)))
    ;; => 20
    ;; ~~ (+ 0 (inc 1) (inc 3) ...)

Parallel


To be honest, "reducers" was a poor choice of name. "Folders" would have been more accurate. After all, besides the CollReduce protocol (which appeared long before reducers), the library declares another, more important protocol, CollFold:

    (defprotocol CollFold
      (coll-fold [coll n combinef reducef]))

In principle it is very similar, except that there are now two reducing functions, and a mysterious argument n has been added. The idea: some collections can be traversed in several threads. In short: we split the collection into blocks of about n elements, fold each block with #(reduce reducef (combinef) %), and then fold the list of results (one per block) once more, this time with #(reduce combinef %).
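
The description above can be modelled sequentially in a few lines. This is only a sketch of the idea, not the library code (the real thing splits the collection recursively and uses fork/join); fold-model and count-even are hypothetical names.

```clojure
(require '[clojure.core.reducers :as r])

;; Sequential model of fold: blocks of n, reducef inside a block,
;; combinef across the per-block results.
(defn fold-model [n combinef reducef coll]
  (->> (partition-all n coll)
       (map #(reduce reducef (combinef) %))
       (reduce combinef)))

;; reducef: count the even numbers inside one block.
(defn count-even [acc x]
  (if (even? x) (inc acc) acc))

(def v (vec (range 100000)))

(fold-model 512 + count-even v)   ;; => 50000
(r/fold 512 + count-even v)       ;; => 50000, computed in parallel
```

Note that combinef plays two roles: called with no arguments it supplies the initial state for each block, and called with two arguments it merges two block results. That only gives the same answer as a plain reduce when combinef is associative.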

A reducer that can fold itself in parallel is called a folder.
Only 2 standard collections support the CollFold protocol: vectors and hash maps.

    (def v (vec (range 10000000)))

    ;; sequentially, in 1 thread
    (time (reduce + v))
    ;; "Elapsed time: 648.897616 msecs"
    ;; => 49999995000000

    ;; in several threads
    (time (r/coll-fold v 512 + +))
    ;; "Elapsed time: 187.414147 msecs"
    ;; => 49999995000000

All standard reducers for which it makes sense implement CollFold: for example r/map, r/filter, r/mapcat, r/flatten. On the other hand, r/take, r/take-while, and r/drop do not support parallelization. An implementation of r/map was given above. Here is its updated version:

    (defn map-r [f coll]
      ;; simply replace `reducer` with `folder`
      (folder coll (map-t f)))

There is no need to use coll-fold directly: for everyday needs there is a wrapper, fold. It sets a default value of 512 for n (the block size). The hint is clear: reducers are plainly intended for large collections (more than 1K elements). And once again: do not call coll-fold directly, call fold.
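
Here is a sketch of fold with a non-trivial state, counting element frequencies in parallel. The function parallel-frequencies is invented for this example; r/fold and r/monoid are the library's own functions.

```clojure
(require '[clojure.core.reducers :as r])

(defn parallel-frequencies [v]
  (r/fold
    ;; combinef: with no arguments it yields an empty map,
    ;; with two arguments it merges the counts of two blocks
    (r/monoid (partial merge-with +) hash-map)
    ;; reducef: account for one element within a block
    (fn [counts x] (assoc counts x (inc (get counts x 0))))
    v))

(parallel-frequencies (vec (take 3000 (cycle [:a :b :c]))))
;; => {:a 1000, :b 1000, :c 1000}
```

The block size is left at its default here; a different one can be passed as the first argument, as in (r/fold 1024 combinef reducef v).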

Ah yes, there is also foldcat. It is a sort of accelerated (thanks to multithreading) version of #(reduce conj [] %). This function returns clojure.core.reducers.Cat objects, which implement Counted, Seqable, and CollFold.

    (r/map inc [1 2 3])
    ;; => #<reducers$folder$reify__.....
    ;; and how do we get the *values* out of this?

    ;; well, `reduce` can always be used
    (reduce conj [] (r/map inc [1 2 3]))
    ;; => [2 3 4]

    ;; but on a large collection...
    (def v (vec (range 1000000)))
    (time (count (reduce conj [] (r/map inc v))))
    ;; "Elapsed time: 90.124397 msecs"
    ;; => 1000000

    ;; that is a bit slow, so let's use `foldcat`
    (time (count (r/foldcat (r/map inc v))))
    ;; "Elapsed time: 25.054988 msecs"
    ;; => 1000000

    (time (count (r/foldcat (r/map inc (r/foldcat (r/map inc v))))))
    ;; "Elapsed time: 32.054988 msecs"
    ;; => 1000000

    ;; the result of `foldcat` is itself foldable (that is, parallelizable)
    (satisfies? r/CollFold (r/foldcat []))
    ;; => true

Bursting onto the scene ...


Unlike reducers, transducers are no longer a separate library. They are rather a concept (read: an idea) that will be integrated directly into clojure.core. We expect this in version 1.7 (not long to wait now).

In short: transducers are the same transformers, just renamed after a rebranding. Well, almost: reducing functions can now take not only 0 and 2 arguments but also 1. A transducer, accordingly, is a function from a 0-1-2-ary reducing function to a new 0-1-2-ary one.

    (def typical-transducer
      (fn [rf]
        (fn ([] ...)            ;; supply the initial element
            ([acc] ...)         ;; called once, at the very end
            ([acc c] ...))))    ;; the reduction step itself

    ;; for example, a new version of `map-t` with all three arities
    (defn map-t-improved [f]
      (fn [rf]
        (fn ([] (rf))                      ;; pass through
            ([acc] (rf acc))               ;; pass through
            ([acc c] (rf acc (f c))))))    ;; replace `c` with `(f c)`

The 0-ary reducing function, as before, may be called when an initial element is needed. The 2-ary variant performs the reduction itself. And the 1-ary variant is called at the very end of the whole job (when reduce has finished). It is needed in those cases where new elements have to be "appended" after the last one.
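
The 1-ary variant can also post-process the final state. As an illustration, here is a hand-written reducing function with all three arities that computes an arithmetic mean. The name mean-rf is made up, and the example assumes Clojure 1.7 or later for transduce, which is discussed further below.

```clojure
(defn mean-rf
  ;; 0-ary: the initial state, a [sum count] pair
  ([] [0 0])
  ;; 1-ary: called once at the end, turns the state into the final answer
  ([[sum n]] (when (pos? n) (/ sum n)))
  ;; 2-ary: the reduction step itself
  ([[sum n] x] [(+ sum x) (inc n)]))

(transduce (map inc) mean-rf [1 2 3])
;; => 3, the mean of 2, 3 and 4
```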

Example: a dedupe transducer that skips consecutive duplicates in a collection:

    (defn my-dedupe []
      (fn [rf]
        ;; careful, state!
        (let [prev (atom ::none)]
          (fn ;; this is our reducing function
            ([] (rf))
            ([acc] (rf acc))
            ([acc c]
             (let [p @prev]
               (reset! prev c)
               (if (= p c)
                 acc
                 (rf acc c))))))))

    (def rf ((my-dedupe) +))

    (reduce rf 0 [1 1, 2 2, 3, 1 1])
    ;; => 7
    (reduce rf 0 [1 1, 2 2, 3, 1 1])
    ;; => 6
    ;; oops... `rf` is not pure, so it must not be used twice

A subtle point: our transducer returns a new reducing function. Moreover, that reducing function has mutable state and can do essentially 3 different things (one per arity). Practically an object. The transducer itself, however, has no state; it merely acts as a factory.
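
The factory behaviour is easy to observe. The sketch below uses the built-in dedupe transducer (assuming Clojure 1.7 or later); rf-a and rf-b are names invented for the example.

```clojure
(def data [1 1 2 2 3 1 1])

;; The transducer itself is stateless and can be shared freely...
(def xf (dedupe))

;; ...but every reducing function it produces gets its own private state.
(def rf-a (xf +))
(def rf-b (xf +))

(reduce rf-a 0 data)   ;; => 7
(reduce rf-b 0 data)   ;; => 7, rf-b has never seen any element before
(reduce rf-a 0 data)   ;; => 6, rf-a still remembers the trailing 1
```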

partition-all is a good example of using the 1-ary variant of the reducing function. A simplified implementation:

    (defn partition-all-t [n]
      (fn [rf]
        (let [buffer (java.util.ArrayList. n)]   ;; state!
          (fn
            ([] (rf))
            ([acc]
             (if (.isEmpty buffer)
               ;; the buffer is empty: just pass through
               (rf acc)
               ;; otherwise...
               (let [v (vec (.toArray buffer))   ;; turn the buffer into a vector
                     acc' (rf acc v)]            ;; flush it with the 2-ary `rf`
                 ;; and only then complete
                 (rf acc'))))
            ([acc c]
             (.add buffer c)
             (if (= n (.size buffer))
               ;; the buffer is full: flush it
               (let [v (vec (.toArray buffer))]
                 (.clear buffer)
                 (rf acc v))
               ;; otherwise keep the state unchanged
               acc))))))

    ;; reduce without an initial state (recall that (conj) => [])
    (reduce ((partition-all-t 3) conj) (range 10))
    ; >> ClassCastException java.lang.Long cannot be cast to clojure.lang.IPersistentCollection
    ;; does not work...

    ;; well then, let's pass [] explicitly...
    (reduce ((partition-all-t 3) conj) [] (range 10))
    ;; => [[0 1 2] [3 4 5] [6 7 8]]
    ;; it works, but [9] got lost...

Hmm... Neither the 0-ary nor the 1-ary variant of ((partition-all-t 3) conj) was ever called: plain reduce knows nothing about these novelties. It calls the 0-ary variant only if the collection is empty, and never calls the 1-ary variant at all.

That is why a new function, transduce, was created. Unlike the "obsolete" reduce, it always calls (rf) unless the initial state is given explicitly. It is also guaranteed to call (rf acc), and exactly once. And transduce applies our transducer itself, hiding the mutable reducing function from our eyes. In other words, all the dirty work (in terms of side effects) is done "under the hood".
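
These guarantees fit in a few lines. Below is a hypothetical model of transduce written on top of reduce; it is a sketch for understanding, not the actual clojure.core source, and it assumes Clojure 1.7 or later for the transducer arities of map, filter and partition-all.

```clojure
(defn my-transduce
  ([xform f coll]
   ;; no initial state given: ask the reducing function for one
   (my-transduce xform f (f) coll))
  ([xform f init coll]
   (let [rf (xform f)]              ; a fresh, possibly stateful, reducing function
     (rf (reduce rf init coll)))))  ; the 1-ary completion call, exactly once

(my-transduce (partition-all 3) conj (range 10))
;; => [[0 1 2] [3 4 5] [6 7 8] [9]]

(my-transduce (comp (filter odd?) (map inc)) + 0 (range 10))
;; => 30
```

The stateful reducing function lives only inside the let, so the caller never gets a chance to reuse it by accident.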

    ;; use our own transducer, this time with transduce
    (transduce (partition-all-t 3) conj (range 10))
    ;; => [[0 1 2] [3 4 5] [6 7 8] [9]]
    ;; ...it works!

    ;; composition (of transducers)
    (transduce (comp (filter odd?) (partition-all-t 3)) conj (range 10))
    ;; => [[1 3 5] [7 9]]

And what if we simply try transduce in place of reduce?

    (reduce (identity -) 0 [1 2 3 4])
    ;; => -10
    ;; ~~ (- (- (- (- 0 1) 2) 3) 4)

    (transduce identity - 0 [1 2 3 4])
    ;; => 10
    ;; wrong!

It turns out that replacing reduce with transduce directly does not work: the new requirement of a 1-ary reducing function gets in the way. In our example, once the computation is over, transduce calls (- acc), which flips the sign of the result. completing helps fix the situation:

    ((completing -) 3 2)
    ;; => 1
    ((identity -) 1)
    ;; => -1
    ((completing -) 1)
    ;; => 1

    ;; `completing` is passed in the transducer position here;
    ;; the more conventional spelling is (transduce identity (completing -) 0 [1 2 3 4])
    (transduce completing - 0 [1 2 3 4])
    ;; => -10
    ;; now it is correct!
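
A few more uses of completing, as a sketch assuming Clojure 1.7 or later. Its optional second argument is the function to run as the 1-ary completion step; the var names are made up.

```clojure
;; One argument: wrap a plain 2-ary function so that completion does nothing.
(def difference
  (transduce identity (completing -) 0 [1 2 3 4]))

;; The same wrapper works with any transducer in front of it.
(def sum-of-squares
  (transduce (map #(* % %)) (completing +) 0 [1 2 3]))

;; Two arguments: also supply the completion step, here turning the sum into a string.
(def sum-as-string
  (transduce (map inc) (completing + str) 0 [1 2 3]))
```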

So the language core now has special functions for working with transformers, er, transducers. Quite predictably, a standard set of these very transducers was added as well. And to avoid spawning a pile of new functions (there are so many already that you get lost in no time), it was decided to upgrade the existing map, filter, take, interpose, mapcat, and company:

    (map inc [1 2 3])
    ;; => (2 3 4)

    (map inc)
    ;; => #<core$map$fn__4525 clojure.core$map$fn__4525@2d9e9038>
    ;; a transducer!

    ;; it can be used like this
    (transduce (map inc) + [1 2 3])
    ;; => 9
    (transduce (comp (map inc) (filter even?)) + [1 2 3])
    ;; => 6
    ;; ~~ (+ (inc 1) (inc 3)) => 6

In addition to transduce there are several other functions for working with transducers:

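    ;; apply a transducer to a collection
    (sequence (map inc) [1 2 3])
    ;; => (2 3 4)

    ;; roughly the same as
    (transduce (map inc) conj [1 2 3])
    ;; => [2 3 4]

    ;; but...
    ;; `sequence` applies the transducer *lazily*!
    ;; `transduce` cannot pull off this trick
    (take 5 (sequence (map inc) (range)))
    ;; => (1 2 3 4 5)

    ;; `into` has also learned to accept a transducer
    (into [9] (map inc) [1 2 3])
    ;; => [9 2 3 4]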

But the funniest function is eduction. It returns a proxy object on which you can call seq or reduce, or from which you can get a Java iterator. As you would expect, this object simply calls transduce or sequence. A trifle, but a convenient one.

    (def odds (eduction (filter odd?) (range)))
    (def evens (eduction (remove odd?) (range)))

    ;; can be used like a sequential collection
    (take 5 odds)
    ;; => (1 3 5 7 9)

    ;; a sequence of 100500 elements is built here,
    ;; but nothing holds on to its head, so the GC can collect it
    (nth odds 100500)
    ;; => 201001

    ;; here reduce is run directly (no intermediate lazy sequence)
    ;; ~= (reduce ((filter even?) ((take 100500) +)) 0 (range))
    (transduce (take 100500) + evens)
    ;; => 10100149500

Stop, stop, stop. This is suspiciously reminiscent of clojure.core.reducers/reducer... That one, however, could only be reduced, whereas here seq is allowed as well. So r/reducer goes in the trash! But not r/folder: it knows how to do multithreading!

    (require '[clojure.core.reducers :as r])
    (def v (vec (range 1000000)))

    (time (transduce (map inc) + v))
    ;; "Elapsed time: 120.193971 msecs"
    ;; => 500000500000

    (time (r/fold + (r/folder v (map inc))))
    ;; "Elapsed time: 37.597224 msecs"
    ;; => 500000500000

    ;; but be careful!
    (transduce (take 100500) + v)
    ;; => 5050074750
    (r/fold + (r/reducer v (take 100500)))
    ;; => 5050074750
    ;; correct

    ;; reducer is obsolete, better to use eduction
    (r/fold + (eduction (take 100500) v))
    ;; => 5050074750

    (reduce + (r/folder v (take 100500)))
    ;; => 5050074750
    ;; still correct

    (r/fold + (r/folder v (take 100500)))
    ;; => 109071345018
    ;; oops...
    ;; stateful transducers must not be folded in parallel (each block gets its own state)

With transducers we get both better performance than with the usual map/filter/etc. (based on lazy sequences) and greater flexibility and abstraction. Note that this is about Clojure sequences: in terms of abstraction level and speed, transducers are comparable to ordinary iterators/enumerators/generators (different languages call them differently).

But back to Clojure. core.async used to have a whole bunch of functions like map>, map<, filter<, filter>, and so on. They have now been removed (well, not quite removed: for now they are only deprecated). In return, a transformer, er, transducer can be specified when creating a channel:

    ;; do not forget to add the dependency to project.clj
    (require '[clojure.core.async :as a])

    ;; a perfectly ordinary transducer
    (def xf (filter odd?))

    ;; a buffered channel with the transducer attached
    (def ch (a/chan 10 xf))

    ;; put the numbers from 0 to 9 onto the channel and close it
    (a/onto-chan ch (range 10))

    ;; read everything from the channel into a vector
    (a/<!! (a/into [] ch))
    ;; => [1 3 5 7 9]

A transducer can only be attached to buffered channels. Our transducer processes each element before it lands in the buffer. There are also all sorts of pipeline functions, and they work with transducers too.

Summing up


The various reducers and transducers are all, in essence, generalizations of the fold operation. That is why they need a 2-argument reducing function to do anything useful.

Besides the 2-ary variant, it is best to define the 0-ary one as well: it may be used when the initial state of the fold is not given. Or it may not be used: if the source collection is not empty, reduce takes its first element. transduce is not that sneaky: either the initial state is passed to it explicitly, or the 0-ary call of the reducing function is used.

On the other hand, transduce demands more of the reducing function: a 1-ary variant is required. Which, most of the time, does nothing at all. Seriously, ([x] x) is usually the most meaningful implementation here. But we are lazy and cannot be bothered to rewrite old (0/2-ary) functions, so we use the completing wrapper, which adds a do-nothing 1-ary variant.

Next, reducers are built on transformers. A transformer is a function of type rf -> rf. A reducer is essentially a collection with a transformer firmly bolted on. Every time we run reduce on such a collection, the transformer first "tampers with" our reducing function.

A transducer is roughly a transformer that additionally requires support for the 1-ary reducing function. So we always define that ill-fated 1-ary variant and proudly announce to everyone: "Of course I don't use outdated transformers, only transducers."

On top of all this, transducers are not limited to collections. They can be attached to channels, I/O streams, queues, observables, and so on. In general, to anything your imagination can come up with.
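
To see that no collection is really needed, here is a sketch that pushes values through a transducer one at a time, much as a channel or an event source might. It assumes Clojure 1.7 or later; step and result are names invented for the example, and conj merely plays the role of the sink.

```clojure
(def xf (comp (filter odd?) (map inc)))

;; The sink can be any 0/1/2-ary reducing function.
(def step (xf conj))

(def result
  (-> []
      (step 1)    ; odd, passes the filter, arrives as 2
      (step 2)    ; even, dropped
      (step 3)    ; odd, arrives as 4
      (step)))    ; the 1-ary completion call
;; result => [2 4]
```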

Total:

Source: https://habr.com/ru/post/247889/

