Article 3 of 8 in the series Understanding Transducers through Python
By Rob Smallshire

In the previous article in the series we looked at improving the experience of composing transducers together in Python by introducing a compose() function. We finished by showing this snippet, which composes a filtering transducer with a mapping transducer to produce a prime-squaring transducer. Recalling that transducers are used to transform reducers, we pass an appending reducer to the prime-squaring transducer to get a prime-squaring-appending reducer. This is passed in turn to reduce(), along with an input range and an empty seed list for the result:

>>> reduce(compose(filtering(is_prime),
...                mapping(square))
...        (appender), # appender assumes a MUTABLE-sequence
...        range(20),
...        []) # list is a MUTABLE sequence
[4, 9, 25, 49, 121, 169, 289, 361]

And therein lies the rub. There's a fairly well-disguised implicit dependency here, between the empty list we've passed as the initial value for the reduction and our choice of appender() as the ultimate reducer. We can illustrate this by passing an immutable sequence type, which doesn't support append(), rather than a mutable sequence type, which does. Look what happens if we pass in an empty tuple instead of an empty list:

>>> reduce(compose(filtering(is_prime),
...                mapping(square))
...        (appender), # appender assumes a MUTABLE-sequence
...        range(20),
...        tuple()) # tuple is an IMMUTABLE sequence
Traceback (most recent call last):
  File "", line 1, in
  File "", line 4, in filter_reducer
  File "", line 4, in map_reducer
  File "", line 2, in appender
AttributeError: 'tuple' object has no attribute 'append'

We can "fix" this by passing another reducer, rather than appender, called conjoiner [1]:

def conjoiner(result, item):
    return result + type(result)((item,))

which we can use like this:

>>> reduce(compose(filtering(is_prime),
...                mapping(square))
...        (conjoiner), # conjoiner assumes an IMMUTABLE-sequence
...        range(20),
...        tuple()) # tuple is an IMMUTABLE sequence
(4, 9, 25, 49, 121, 169, 289, 361)

Notice that the result is now enclosed in parentheses rather than square brackets, indicating that it is a tuple.
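
As an aside, conjoiner as written is not tied to tuples: it should work with any sequence type that supports + and can be constructed from an iterable. Here is a minimal sketch using only functools.reduce, with no transducers involved. The names squares, as_tuple, as_list and seed are ours, chosen purely for illustration:

```python
from functools import reduce


def conjoiner(result, item):
    # Build a new sequence of the same type as 'result' with 'item' at the end.
    return result + type(result)((item,))


squares = [n * n for n in range(5)]

# The type of the seed alone determines the type of the result.
as_tuple = reduce(conjoiner, squares, tuple())
as_list = reduce(conjoiner, squares, [])

print(as_tuple)  # (0, 1, 4, 9, 16)
print(as_list)   # [0, 1, 4, 9, 16]

# Unlike an appending reducer, conjoiner never mutates the seed it is given.
seed = []
reduce(conjoiner, squares, seed)
print(seed)      # []
```

The price of this generality is that every step copies the accumulated sequence, so the seed and the reducer still have to be chosen with each other in mind.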

The ultimate reducer and the seed value will often need to change in sympathy: if one changes, we need to remember to change the other. To address this, we'll need to enrich the transducer interface. It will go from being a simple function call to something that is at once more complex and more capable. To understand what those complexities are, we'll refer back to the Clojure archetype.

Examining the Clojure original

Our code has a very general form, but it is lacking features of the Clojure original, such as early termination of the reduction process. Let's look at the Clojure original for map [2] when called with a single argument:

(defn map
  ([f]
  (fn [rf]
    (fn
      ([] (rf))
      ([result] (rf result))
      ([result input]
        (rf result (f input)))
      ([result input & inputs]
        (rf result (apply f input inputs))))))

This may not be very clear - even if you can read Clojure! - so let's annotate it with some comments:

(defn map ;; The transducer factory...
  ([f] ;; ...accepts a single argument 'f', the
  ;; transforming function
  (fn [rf] ;; The transducer function accepts a
    ;; reducing function 'rf'
    (fn ;; This is the reducing function returned
      ;; by the transducer

      ([] (rf)) ;; 0-arity : Forward to the zero-arity
      ;; reducing function 'rf'

      ([result] (rf result)) ;; 1-arity : Forward to the one-arity
      ;; reducing function 'rf'

      ([result input] ;; 2-arity : Perform the reduction with
        ;; one arg to 'f'
        (rf result (f input)))

      ([result input & inputs] ;; n-arity : Perform the reduction with
        ;; multiple args to 'f'
        (rf result (apply f input inputs))))))

Here's our reducing function in Python, which only implements the equivalent of the 2-arity version which performs the actual reduction:

def map_reducer(result, item):
    return reducer(result, transform(item))

The Clojure definitions of the zero- and one-arity reduction functions don't provide much clue as to what they are for - they're just contract-preserving functions which forward from the 'new' reducer to the underlying reducer which has been wrapped by it.

In fact, the zero-arity function is called to produce the initial seed value when one isn't provided. For example, for addition the seed needs to be zero [3], for multiplication the seed needs to be one [4], and in our Python examples the seed for appending should be an empty list and the seed for conjoining should be an empty tuple. The map reducer simply delegates this to the underlying reducer, since it can't know – and indeed shouldn't know – upon which kind of data structure it is operating.
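
To make the role of the seed concrete, here is a small sketch using the standard library's functools.reduce and operator module rather than anything from our transducer code. The variable names are illustrative only. With an empty input, reduce() returns the seed unchanged, which is why the seed has to be the identity for the operation:

```python
from functools import reduce
import operator

# With nothing to reduce, the seed is the whole answer.
empty_sum = reduce(operator.add, [], 0)
empty_product = reduce(operator.mul, [], 1)
print(empty_sum)      # 0
print(empty_product)  # 1

# With the correct identity as the seed, the seed does not disturb the result.
total = reduce(operator.add, [2, 3, 4], 0)
product = reduce(operator.mul, [2, 3, 4], 1)
print(total)    # 9
print(product)  # 24

# The wrong seed silently corrupts the result.
wrong_product = reduce(operator.mul, [2, 3, 4], 0)
print(wrong_product)  # 0
```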

The one-arity function, which accepts only the intermediate result and no further input, is used to perform transducer chain clean-up or reduction to a final result when processing of the sequence is complete or has been terminated early. This is useful for certain stateful transducers which need to deal with any left-over state. We'll look at some examples later.

So to document our improved understanding in comments:

(defn map ;; The transducer factory...
  ([f] ;; ...accepts a single argument 'f', the
  ;; transforming function
  (fn [rf] ;; The transducer function accepts a
    ;; reducing function 'rf'
    (fn ;; This is the reducing function returned
      ;; by the transducer

      ([] (rf)) ;; 0-arity : Return a 'seed' value
      ;; obtained from 'rf'

      ([result] (rf result)) ;; 1-arity : Obtain final result from 'rf'
      ;; and clean-up

      ([result input] ;; 2-arity : Perform the reduction with
        ;; one arg to 'f'
        (rf result (f input)))

      ([result input & inputs] ;; n-arity : Perform the reduction with
        ;; multiple args to 'f'
        (rf result (apply f input inputs))))))

In fact, to fully implement the concepts inherent in Clojure reducers and transducers we need to do more work in our Python version to support:

  1. Explicit (although optional) association of the seed value with the reduction operation
  2. Early termination of reduction processes. For example, a search can terminate early without needing to reduce a whole series
  3. Reduction to a final value and opportunity to clean-up left-over state

Clojure supports these distinct behaviours through different arity versions of the same anonymous reducing function. Python doesn't support overloading on arity, and in any case, overloading on arity in order to support different operations can seem obtuse. [5] We have a perfectly good tool for bundling related named functions together in Python, and that tool is the class.
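
To see why a class is appealing, here is a sketch of what emulating arity overloading inside a single Python function might look like. Both _MISSING and adding are hypothetical names invented for this illustration; they are not part of the code in this series:

```python
_MISSING = object()  # hypothetical sentinel meaning "argument not supplied"


def adding(result=_MISSING, item=_MISSING):
    """Emulate the three Clojure arities in one function, for addition."""
    if result is _MISSING:
        return 0              # 0-arity: produce the seed
    if item is _MISSING:
        return result         # 1-arity: produce the final result
    return result + item      # 2-arity: perform one reduction step


print(adding())       # 0
print(adding(10))     # 10
print(adding(10, 5))  # 15
```

It works, but the three distinct operations are anonymous and can only be told apart by counting arguments, which is exactly what named methods on a class avoid.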

In the next phase, we'll convert our reducing functions into classes and necessarily replace our use of the Python Standard Library reduce() function with something a little more sophisticated which can support our new class-based reducers.

In Python, the conceptual interface to a reducer will look like this:

class Reducer:

    def __init__(self, reducer): # Construct from reducing function
        pass

    def initial(self): # Return the initial seed value
        pass # 0-arity

    def step(self, result, item): # Next step in the reduction
        pass # 2-arity

    def complete(self, result): # Produce a final result and clean up
        pass # 1-arity

Notice that the __init__() function – and therefore the class – is a transducer. It accepts a reducer and returns a reducer!

new_reducer = Reducer(reducer)

It takes a particularly clear-minded and highly-caffeinated state to appreciate that the class is a transducer but instances of the class are reducers! In fact, we've found it so confusing that we generally wrap the constructor call in another function with a more appropriate name:

def transducer(reducer):
    return Reducer(reducer)

More concretely, here is our mapping() transducer factory, the transducing function and the reducer it creates:

def mapping(transform):

    def mapping_transducer(reducer):
        return Mapping(reducer, transform)

    return mapping_transducer

Let's implement our Mapping reducer cum transducer class:

class Mapping:

    def __init__(self, reducer, transform):
        self._reducer = reducer
        self._transform = transform

    def initial(self):
        return self._reducer.initial()

    def step(self, result, item):
        return self._reducer.step(result, self._transform(item))

    def complete(self, result):
        return self._reducer.complete(result)

In the absence of any necessary behaviours specific to a particular reduction algorithm, the initial(), step() and complete() methods simply forward to the next reducer in the chain (self._reducer). The only behaviour here specialised for Mapping is to apply self._transform() to the item before passing the result down the chain.

And here's our filtering transducer-factory together with the Filtering reducer cum transducer:

class Filtering:

    def __init__(self, reducer, predicate):
        self._reducer = reducer
        self._predicate = predicate

    def initial(self):
        return self._reducer.initial()

    def step(self, result, item):
        return (self._reducer.step(result, item)
                if self._predicate(item)
                else result)

    def complete(self, result):
        return self._reducer.complete(result)

def filtering(predicate):

    def filtering_transducer(reducer):
        return Filtering(reducer, predicate)

    return filtering_transducer

To allow the chain of reducers produced by our transducers to terminate in a regular reducer, such as appending, we'll replace our appending and conjoining reducing functions with classes which sport the same interface as our other reducers:

class Appending:

    def initial(self):
        return []

    def step(self, result, item):
        result.append(item)
        return result

    def complete(self, result):
        return result

class Conjoining:

    def initial(self):
        return tuple()

    def step(self, result, item):
        return result + type(result)((item,))

    def complete(self, result):
        return result

These two reducing classes have no internal state, and hence no need for initialisation functions, but crucially, we use the ability afforded by the initial() method to associate a seed value with the reducing operation. (Being stateless, we could have decorated the methods of these reducers with @staticmethod; we haven't done so, though, to avoid detracting from the important similarity between our reducer and transducer classes.)
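
Before automating anything, it may help to drive a chain of these reducers by hand. The following sketch repeats compact versions of the classes above so that it is self-contained. The is_even predicate is a hypothetical stand-in for is_prime, and square is assumed to be the obvious squaring function from the earlier articles:

```python
class Appending:
    def initial(self):
        return []

    def step(self, result, item):
        result.append(item)
        return result

    def complete(self, result):
        return result


class Mapping:
    def __init__(self, reducer, transform):
        self._reducer = reducer
        self._transform = transform

    def initial(self):
        return self._reducer.initial()

    def step(self, result, item):
        return self._reducer.step(result, self._transform(item))

    def complete(self, result):
        return self._reducer.complete(result)


class Filtering:
    def __init__(self, reducer, predicate):
        self._reducer = reducer
        self._predicate = predicate

    def initial(self):
        return self._reducer.initial()

    def step(self, result, item):
        return (self._reducer.step(result, item)
                if self._predicate(item)
                else result)

    def complete(self, result):
        return self._reducer.complete(result)


def is_even(n):  # hypothetical predicate, used only in this sketch
    return n % 2 == 0


def square(x):   # assumed definition of square
    return x * x


# Filter first, then map, then append: the outermost reducer sees each item first.
chain = Filtering(Mapping(Appending(), square), is_even)

result = chain.initial()          # the seed comes from Appending, via forwarding
for item in range(6):
    result = chain.step(result, item)
result = chain.complete(result)

print(result)  # [0, 4, 16]
```

Notice that nowhere in the driving loop do we mention a list: the seed is obtained from the innermost reducer through the chain of initial() calls.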

To make use of our class-based transducers, we need an alternative to reduce() which understands our new transducer/reducer protocol. Following Clojure's lead, we will call it transduce():

UNSET = object()

def transduce(transducer, reducer, iterable, init=UNSET):
    r = transducer(reducer)
    accumulator = init if (init is not UNSET) else r.initial()
    for item in iterable:
        accumulator = r.step(accumulator, item)
    return r.complete(accumulator)

We supply the reducer separately, rather than bundling it up inside the transducer object, because it contains the knowledge of how to accumulate the final result. Excluding that from our transducer definition allows us to keep our transducer more general and reusable without committing to a particular result representation. For example, we might compose a complex transducer and want to keep that separate from whether the final result is accumulated in a list or in a tuple.
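
The optional init argument deserves a quick illustration. The sketch below repeats transduce() with compact copies of Mapping, mapping() and Appending so that it runs on its own; square is assumed to be the obvious squaring function. One plausible reason for using a dedicated UNSET sentinel rather than None is that None might itself be a legitimate seed, although that is our reading rather than something stated above:

```python
UNSET = object()


def transduce(transducer, reducer, iterable, init=UNSET):
    r = transducer(reducer)
    accumulator = init if (init is not UNSET) else r.initial()
    for item in iterable:
        accumulator = r.step(accumulator, item)
    return r.complete(accumulator)


class Appending:
    def initial(self):
        return []

    def step(self, result, item):
        result.append(item)
        return result

    def complete(self, result):
        return result


class Mapping:
    def __init__(self, reducer, transform):
        self._reducer = reducer
        self._transform = transform

    def initial(self):
        return self._reducer.initial()

    def step(self, result, item):
        return self._reducer.step(result, self._transform(item))

    def complete(self, result):
        return self._reducer.complete(result)


def mapping(transform):
    def mapping_transducer(reducer):
        return Mapping(reducer, transform)
    return mapping_transducer


def square(x):  # assumed definition of square
    return x * x


# No init supplied: the seed comes from Appending.initial().
default_seed = transduce(mapping(square), Appending(), range(5))

# An explicit init overrides the reducer's own seed.
explicit_seed = transduce(mapping(square), Appending(), range(5), init=[-1])

print(default_seed)   # [0, 1, 4, 9, 16]
print(explicit_seed)  # [-1, 0, 1, 4, 9, 16]
```

Bear in mind that Appending mutates whatever list it is given, so a list passed as init will be modified in place.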

Let's try to use our new transduce() function to apply a transducer to a range of numbers. We'll do this step-by-step to keep things clear. First we'll compose the transducer from a filtering and a mapping:

>>> square_primes_transducer = compose(
...     filtering(is_prime),
...     mapping(square))

Then we'll construct the reducer which will accumulate the final result. We want a list, so we'll use Appending:

>>> appending_reducer = Appending()

Now we'll pass these to transduce():

>>> transduce(square_primes_transducer, appending_reducer, range(100))
[4, 9, 25, 49, 121, 169, 289, 361, 529, 841, 961, 1369, 1681, 1849,
2209, 2809, 3481, 3721, 4489, 5041, 5329, 6241, 6889, 7921, 9409]

Et voilà!

By using transduce() and enriching our notion of what a reducer looks like, we no longer need to separately specify the seed value. If we want a tuple, we can use a different reducer:

>>> conjoining_reducer = Conjoining()
>>> transduce(square_primes_transducer, conjoining_reducer, range(100))
(4, 9, 25, 49, 121, 169, 289, 361, 529, 841, 961, 1369, 1681, 1849,
2209, 2809, 3481, 3721, 4489, 5041, 5329, 6241, 6889, 7921, 9409)

This decoupling of the transducer processing pipeline from the result type may not seem important in this example, but as we'll see later, it buys us a great deal of flexibility and re-use potential.
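
As a taste of that flexibility, here is a sketch in which the same composed transducer feeds a reducer that does not build a sequence at all. Summing is a hypothetical reducer invented for this illustration. The two-function compose() and the trial-division is_prime() are simplified stand-ins for the versions used in this series, and square is assumed to be the obvious squaring function:

```python
def compose(f, g):
    # Assumption: a two-function stand-in for the series' compose().
    return lambda x: f(g(x))


def is_prime(n):
    # Assumption: simple trial division, adequate for small inputs.
    return n >= 2 and all(n % d != 0 for d in range(2, int(n ** 0.5) + 1))


def square(x):
    return x * x


class Forwarding:
    """Shared forwarding behaviour, factored out here only to keep the sketch short."""

    def __init__(self, reducer):
        self._reducer = reducer

    def initial(self):
        return self._reducer.initial()

    def complete(self, result):
        return self._reducer.complete(result)


class Mapping(Forwarding):
    def __init__(self, reducer, transform):
        super().__init__(reducer)
        self._transform = transform

    def step(self, result, item):
        return self._reducer.step(result, self._transform(item))


class Filtering(Forwarding):
    def __init__(self, reducer, predicate):
        super().__init__(reducer)
        self._predicate = predicate

    def step(self, result, item):
        return (self._reducer.step(result, item)
                if self._predicate(item)
                else result)


def mapping(transform):
    return lambda reducer: Mapping(reducer, transform)


def filtering(predicate):
    return lambda reducer: Filtering(reducer, predicate)


class Conjoining:
    def initial(self):
        return tuple()

    def step(self, result, item):
        return result + type(result)((item,))

    def complete(self, result):
        return result


class Summing:  # hypothetical reducer: accumulates a running total
    def initial(self):
        return 0

    def step(self, result, item):
        return result + item

    def complete(self, result):
        return result


UNSET = object()


def transduce(transducer, reducer, iterable, init=UNSET):
    r = transducer(reducer)
    accumulator = init if (init is not UNSET) else r.initial()
    for item in iterable:
        accumulator = r.step(accumulator, item)
    return r.complete(accumulator)


square_primes = compose(filtering(is_prime), mapping(square))

as_tuple = transduce(square_primes, Conjoining(), range(20))
total = transduce(square_primes, Summing(), range(20))

print(as_tuple)  # (4, 9, 25, 49, 121, 169, 289, 361)
print(total)     # 1027
```

The transducer is built once and never needs to know whether its output ends up in a tuple or in a single number.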

In the next article, we'll look at stateful transducers, and how having our transducers implemented as classes makes this particularly straightforward.

[1] conjoin (verb): to join together. There is also an equivalent Clojure function conj, and Clojure/conj is a Clojure programming conference.
[2] The definition of the mapping transducer factory is in the source code on GitHub.
[3] We say the additive identity is zero.
[4] We say the multiplicative identity is one.
[5] It seems I'm not the only person who found Clojure's use of overloading by arity an impediment to understanding transducers. In fact, overloading by arity is incidental to the concept of transducers, and a curiosity of the Clojure archetype.