Good with Computers

The Sixty North Blog

Stateful Transducers

In the previous article in this series on transducers, we saw how to develop the notion of a transducer from a single function which literally transforms reducers into a more capable protocol supporting two further capabilities: first, the association of initial ‘seed’ values with a reduction operation; and second, the opportunity for stateful transducers to clean up. So far, we’ve exercised the first capability but not the second. To demonstrate clean-up, we need to introduce stateful transducers.

The mapping and filtering transducers we have seen so far are stateless. That is, the outcome of each step depends only on the result accumulated so far and the new item. We can, however, make stateful transducers, and the fact that our Python transducers are classes makes this particularly easy, because instances of those classes give us an obvious place to store the state. Perhaps the simplest example is an enumerating transducer which keeps track of item indexes and accumulates (index, item) tuples into the result:

class Enumerating:

    def __init__(self, reducer, start):
        self._reducer = reducer
        self._counter = start

    def initial(self):
        return self._reducer.initial()

    def step(self, result, item):
        index = self._counter
        self._counter += 1
        return self._reducer.step(result, (index, item))

    def complete(self, result):
        return self._reducer.complete(result)


def enumerating(start=0):
    """Create a transducer which enumerates items."""

    def enumerating_transducer(reducer):
        return Enumerating(reducer, start)

    return enumerating_transducer

We’ll use this by composing it onto the end of our existing transducer chain:

>>> square_primes_transducer = compose(
...     filtering(is_prime),
...     mapping(square))
>>>
>>> enumerated_square_primes_transducer = compose(
...     square_primes_transducer,
...     enumerating())
>>>
>>> appending_reducer = Appending()
>>>
>>> transduce(enumerated_square_primes_transducer,
...           appending_reducer,
...           range(100))
[(0, 4), (1, 9), (2, 25), (3, 49), (4, 121), (5, 169), (6, 289),
(7, 361), (8, 529), (9, 841), (10, 961), (11, 1369), (12, 1681),
(13, 1849), (14, 2209), (15, 2809), (16, 3481), (17, 3721),
(18, 4489), (19, 5041), (20, 5329), (21, 6241), (22, 6889),
(23, 7921), (24, 9409)]
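
Because the counter lives on the Enumerating instance rather than on the transducer function, each application of the transducer to a reducer starts counting afresh, while re-running the same instance carries on from where it stopped. The following is a minimal sketch of that behaviour. The Appending class here is a stand-in assumed to match the reducer used earlier in the series, and run() is a hypothetical helper introduced only for this illustration.

```python
class Appending:
    """Assumed stand-in for the series' list-appending reducer."""

    def initial(self):
        return []

    def step(self, result, item):
        result.append(item)
        return result

    def complete(self, result):
        return result


class Enumerating:
    """Same logic as the Enumerating class in the article."""

    def __init__(self, reducer, start):
        self._reducer = reducer
        self._counter = start

    def initial(self):
        return self._reducer.initial()

    def step(self, result, item):
        index = self._counter
        self._counter += 1
        return self._reducer.step(result, (index, item))

    def complete(self, result):
        return self._reducer.complete(result)


def enumerating(start=0):
    def enumerating_transducer(reducer):
        return Enumerating(reducer, start)
    return enumerating_transducer


def run(reducer, iterable):
    """Hypothetical helper: drive a reducer by hand over an iterable."""
    result = reducer.initial()
    for item in iterable:
        result = reducer.step(result, item)
    return reducer.complete(result)


transducer = enumerating(start=1)

# Each application of the transducer builds a new Enumerating instance,
# so each one has its own counter.
first = transducer(Appending())
second = transducer(Appending())
first_result = run(first, "abc")    # [(1, 'a'), (2, 'b'), (3, 'c')]
second_result = run(second, "xy")   # [(1, 'x'), (2, 'y')]

# Re-running the *same* instance continues from its stored counter.
rerun_result = run(first, "de")     # [(4, 'd'), (5, 'e')]

print(first_result)
print(second_result)
print(rerun_result)
```

Seen this way, the factory-returns-a-function arrangement has a practical benefit: the state is created when the transducer is applied to a reducer, not when the transducer is defined, so a single transducer can safely be reused across separate reductions.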

Cleaning up left-over state

So far, the implementations of the complete() method in our transducers haven’t been very interesting. They’ve simply delegated the call to the next reducer in the chain. At the end of the chain, the complete() implementations of the Appending or Conjoining reducers simply return whatever was passed to them.
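
To make that delegation concrete, here is a minimal sketch of a complete() call travelling down a chain to a terminal reducer. The Appending, Conjoining, Mapping and transduce() definitions below are assumptions reconstructed for illustration, and may differ in detail from those given in the earlier articles.

```python
class Appending:
    """Assumed terminal reducer: accumulates items into a list."""

    def initial(self):
        return []

    def step(self, result, item):
        result.append(item)
        return result

    def complete(self, result):
        return result  # nothing to clean up, so pass the result through


class Conjoining:
    """Assumed terminal reducer: accumulates items into a tuple."""

    def initial(self):
        return ()

    def step(self, result, item):
        return result + (item,)

    def complete(self, result):
        return result  # likewise a simple pass-through


class Mapping:
    """Assumed stateless transducer: transforms each item."""

    def __init__(self, reducer, transform):
        self._reducer = reducer
        self._transform = transform

    def initial(self):
        return self._reducer.initial()

    def step(self, result, item):
        return self._reducer.step(result, self._transform(item))

    def complete(self, result):
        # No state of its own, so it only delegates down the chain.
        return self._reducer.complete(result)


def mapping(transform):
    def mapping_transducer(reducer):
        return Mapping(reducer, transform)
    return mapping_transducer


def transduce(transducer, reducer, iterable):
    """Assumed driver: step through every item, then complete exactly once."""
    r = transducer(reducer)
    accumulator = r.initial()
    for item in iterable:
        accumulator = r.step(accumulator, item)
    return r.complete(accumulator)


def square(x):
    return x * x


as_list = transduce(mapping(square), Appending(), range(5))
as_tuple = transduce(mapping(square), Conjoining(), range(5))

print(as_list)   # [0, 1, 4, 9, 16]
print(as_tuple)  # (0, 1, 4, 9, 16)
```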

Sometimes, the state accumulated within the transducer needs to be returned as part of the final result. For example, consider a batching transducer which collects successive items together into non-overlapping groups of a specified size. The transducer maintains a pending batch as internal state, and when the batch has grown to the requisite size, accumulates it into the result. When we reach the end of the input data, there may be a partial batch. If our design calls for returning the partial batch, we need a way to detect the end of processing and deal with any internal state. This is where the complete() method comes into play. Here’s our batching transducer and its corresponding transducer factory:

class Batching:

    def __init__(self, reducer, size):
        self._reducer = reducer
        self._size = size
        self._pending = []

    def initial(self):
        return self._reducer.initial()

    def step(self, result, item):
        self._pending.append(item)
        if len(self._pending) == self._size:
            batch = self._pending
            self._pending = []
            return self._reducer.step(result, batch)
        return result

    def complete(self, result):
        # Flush any partial batch downstream before completing.
        if self._pending:
            result = self._reducer.step(result, self._pending)
        return self._reducer.complete(result)


def batching(size):
    """Create a transducer which produces non-overlapping batches."""

    if size < 1:
        raise ValueError("batching() size must be at least 1")

    def batching_transducer(reducer):
        return Batching(reducer, size)

    return batching_transducer

Here we see that the complete() method calls step() on the underlying reducer one more time to pass on any partial batch, before delegating to that reducer’s own complete(). Here it is in action:

>>> batched_primes_transducer = compose(filtering(is_prime), batching(3))
>>> transduce(batched_primes_transducer, Appending(), range(100))
[[2, 3, 5], [7, 11, 13], [17, 19, 23], [29, 31, 37], [41, 43, 47],
[53, 59, 61], [67, 71, 73], [79, 83, 89], [97]]

Notice in particular the partial batch included at the end.
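
The role of complete() is easiest to see by driving a Batching instance by hand and comparing the result before and after completion. This is a sketch only: Appending is an assumed stand-in for the series' reducer, and the Batching class repeats the logic shown above.

```python
class Appending:
    """Assumed stand-in for the series' list-appending reducer."""

    def initial(self):
        return []

    def step(self, result, item):
        result.append(item)
        return result

    def complete(self, result):
        return result


class Batching:
    """Same logic as the Batching class in the article."""

    def __init__(self, reducer, size):
        self._reducer = reducer
        self._size = size
        self._pending = []

    def initial(self):
        return self._reducer.initial()

    def step(self, result, item):
        self._pending.append(item)
        if len(self._pending) == self._size:
            batch = self._pending
            self._pending = []
            return self._reducer.step(result, batch)
        return result

    def complete(self, result):
        if self._pending:
            result = self._reducer.step(result, self._pending)
        return self._reducer.complete(result)


reducer = Batching(Appending(), size=3)
result = reducer.initial()
for item in range(7):
    result = reducer.step(result, item)

# Appending mutates its list in place, so copy it to keep a snapshot.
before_complete = list(result)
after_complete = reducer.complete(result)

print(before_complete)  # [[0, 1, 2], [3, 4, 5]]
print(after_complete)   # [[0, 1, 2], [3, 4, 5], [6]]
```

Without the call to complete(), the seventh item would remain stranded in the transducer's pending batch and never reach the result.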

With stateful transducers and special handling of result completion and clean-up in place, in the next article we’ll look at how to signal and detect early termination of a reduction operation, such as occurs when searching for and finding an item in a data series.
