Good with Computers

The Sixty North Blog

Terminating Transducers

In the previous article in this series on transducers, we showed how to implement stateful transducers, and how to deal with any left-over state or other clean-up operations when the reduction operation is complete. Sometimes, however, there is no need to process a whole series of items in order to produce the final result. For example, if we just want the first item from a series, we only need to process the first item. Our existing implementation of transduce() looks like this:

UNSET = object()

def transduce(transducer, reducer, iterable, init=UNSET):
    r = transducer(reducer)
    accumulator = init if (init is not UNSET) else reducer.initial()
    for item in iterable:
        accumulator = r.step(accumulator, item)
    return r.complete(accumulator)

It uses a for-loop to iteratively apply the reducing function, but there is no way of exiting the loop early.

To accommodate early termination, we’ll allow our step() method to return a sentinel value indicating that reduction is complete. This sentinel will actually be a ‘box’ called Reduced, which we can detect by type, and which will contain the final value:

class Reduced:
    """A sentinel 'box' used to return the final value of a reduction."""

    def __init__(self, value):
        self._value = value

    @property
    def value(self):
        return self._value

It has only an initialiser which accepts a single value, and a property to provide read-only access to that value.

Our updated transduce() function, which detects Reduced, looks like this:

def transduce(transducer, reducer, iterable, init=UNSET):
    r = transducer(reducer)
    accumulator = init if (init is not UNSET) else reducer.initial()
    for item in iterable:
        accumulator = r.step(accumulator, item)
        if isinstance(accumulator, Reduced):
            accumulator = accumulator.value
            break
    return r.complete(accumulator)

When we detect Reduced we simply un-box its value and then break from the loop.
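As a rough illustration of why the early exit matters, the sketch below runs the updated transduce() over an infinite iterable. The SummingUntil reducer and the identity transducer are hypothetical helpers invented for this example, and Reduced is condensed to a plain attribute; none of them are part of the series' code.

```python
from itertools import count

UNSET = object()


class Reduced:
    """Condensed version of the sentinel box: holds the final value."""

    def __init__(self, value):
        self.value = value


def transduce(transducer, reducer, iterable, init=UNSET):
    r = transducer(reducer)
    accumulator = init if (init is not UNSET) else reducer.initial()
    for item in iterable:
        accumulator = r.step(accumulator, item)
        if isinstance(accumulator, Reduced):
            accumulator = accumulator.value
            break
    return r.complete(accumulator)


class SummingUntil:
    """Hypothetical reducer: sums items and stops once the total passes a limit."""

    def __init__(self, limit):
        self._limit = limit

    def initial(self):
        return 0

    def step(self, result, item):
        total = result + item
        # Boxing the total tells transduce() that no more items are needed.
        return Reduced(total) if total > self._limit else total

    def complete(self, result):
        return result


def identity(reducer):
    """Hypothetical do-nothing transducer: returns the reducer unchanged."""
    return reducer


# count(1) never ends, so this call only returns because of the early exit.
total = transduce(identity, SummingUntil(10), count(1))
print(total)  # 1 + 2 + 3 + 4 + 5 = 15
```

Without the isinstance() check, the same call would loop forever, since the for-loop has no other way to stop consuming count(1).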

Our First transducer, which accepts an optional predicate, looks like this:

class First:

    def __init__(self, reducer, predicate):
        self._reducer = reducer
        self._predicate = predicate

    def initial(self):
        return self._reducer.initial()

    def step(self, result, item):
        return Reduced(self._reducer.step(result, item)) if self._predicate(item) else result

    def complete(self, result):
        return self._reducer.complete(result)


def first(predicate=None):
    predicate = true if predicate is None else predicate

    def first_transducer(reducer):
        return First(reducer, predicate)

    return first_transducer

Notice that when no predicate is supplied, true is passed to the First constructor in its place; this isn’t the Python constant True but a function which always returns True, irrespective of what it is passed. We need to define it ourselves:

def true(*args, **kwargs):
    return True
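To see the default predicate at work in isolation, here is a small self-contained sketch. The Appending reducer is an assumed stand-in for the list-building reducer used in this series (its body here is a guess at a minimal version), and Reduced is condensed to a plain attribute for brevity.

```python
from itertools import count

UNSET = object()


class Reduced:
    """Condensed version of the sentinel box: holds the final value."""

    def __init__(self, value):
        self.value = value


def transduce(transducer, reducer, iterable, init=UNSET):
    r = transducer(reducer)
    accumulator = init if (init is not UNSET) else reducer.initial()
    for item in iterable:
        accumulator = r.step(accumulator, item)
        if isinstance(accumulator, Reduced):
            accumulator = accumulator.value
            break
    return r.complete(accumulator)


class Appending:
    """Assumed minimal list-building reducer (a stand-in, not the series' code)."""

    def initial(self):
        return []

    def step(self, result, item):
        result.append(item)
        return result

    def complete(self, result):
        return result


def true(*args, **kwargs):
    return True


class First:

    def __init__(self, reducer, predicate):
        self._reducer = reducer
        self._predicate = predicate

    def initial(self):
        return self._reducer.initial()

    def step(self, result, item):
        return Reduced(self._reducer.step(result, item)) if self._predicate(item) else result

    def complete(self, result):
        return self._reducer.complete(result)


def first(predicate=None):
    predicate = true if predicate is None else predicate

    def first_transducer(reducer):
        return First(reducer, predicate)

    return first_transducer


# No predicate: true() accepts every item, so the very first one is taken.
no_predicate = transduce(first(), Appending(), count(10))
print(no_predicate)  # [10]

# With a predicate: items are skipped until one satisfies it.
with_predicate = transduce(first(lambda x: x % 7 == 0), Appending(), count(10))
print(with_predicate)  # [14]
```

Both calls consume an infinite count(10), so they also show that First stops the reduction as soon as an item is accepted.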

Putting it all together, we get:

>>> transduce(
...     compose(
...         filtering(is_prime),
...         mapping(square),
...         first(lambda x: x > 1000)),
...     Appending(),
...     range(1000))
[1369]

Notice that the single result is returned as the only element of a list. This is because we used Appending as our reducer. If we’d prefer a scalar value to be returned, we can define a new reducer called ExpectingSingle that expects exactly one step() operation to be performed:

class ExpectingSingle:

    def __init__(self):
        self._num_steps = 0

    def initial(self):
        return None

    def step(self, result, item):
        assert result is None
        self._num_steps += 1
        if self._num_steps > 1:
            raise RuntimeError("Too many steps!")
        return item

    def complete(self, result):
        if self._num_steps < 1:
            raise RuntimeError("Too few steps!")
        return result

Reattempting our example, we now get a scalar value:

>>> transduce(
...     compose(
...         filtering(is_prime),
...         mapping(square),
...         first(lambda x: x > 1000)),
...     ExpectingSingle(),
...     range(1000))
1369
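It may also help to see what ExpectingSingle does when no item satisfies the predicate. The sketch below repeats the definitions from this article (with Reduced condensed to a plain attribute) so that it runs on its own; the inputs are chosen purely for illustration.

```python
UNSET = object()


class Reduced:
    """Condensed version of the sentinel box: holds the final value."""

    def __init__(self, value):
        self.value = value


def transduce(transducer, reducer, iterable, init=UNSET):
    r = transducer(reducer)
    accumulator = init if (init is not UNSET) else reducer.initial()
    for item in iterable:
        accumulator = r.step(accumulator, item)
        if isinstance(accumulator, Reduced):
            accumulator = accumulator.value
            break
    return r.complete(accumulator)


def true(*args, **kwargs):
    return True


class First:

    def __init__(self, reducer, predicate):
        self._reducer = reducer
        self._predicate = predicate

    def initial(self):
        return self._reducer.initial()

    def step(self, result, item):
        return Reduced(self._reducer.step(result, item)) if self._predicate(item) else result

    def complete(self, result):
        return self._reducer.complete(result)


def first(predicate=None):
    predicate = true if predicate is None else predicate

    def first_transducer(reducer):
        return First(reducer, predicate)

    return first_transducer


class ExpectingSingle:

    def __init__(self):
        self._num_steps = 0

    def initial(self):
        return None

    def step(self, result, item):
        assert result is None
        self._num_steps += 1
        if self._num_steps > 1:
            raise RuntimeError("Too many steps!")
        return item

    def complete(self, result):
        if self._num_steps < 1:
            raise RuntimeError("Too few steps!")
        return result


# A match exists: exactly one step reaches ExpectingSingle.
matched = transduce(first(lambda x: x > 5), ExpectingSingle(), range(10))
print(matched)  # 6

# No match: First never forwards an item, so complete() reports the problem.
try:
    transduce(first(lambda x: x > 100), ExpectingSingle(), range(10))
except RuntimeError as error:
    outcome = str(error)
print(outcome)  # Too few steps!
```

Because ExpectingSingle keeps its step count on the instance, each call to transduce() here is given a fresh instance.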

We’ve now exercised all the parts of the transducer protocol:

  • Association of the initial value through initial()
  • Incremental reduction through step()
  • Completion and clean-up of state through complete()
  • Signalling early completion with Reduced()
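The four parts listed above can be observed in order with a small tracing sketch. The Tracing reducer and identity transducer are hypothetical helpers written for this illustration only, and Reduced is condensed to a plain attribute.

```python
UNSET = object()


class Reduced:
    """Condensed version of the sentinel box: holds the final value."""

    def __init__(self, value):
        self.value = value


def transduce(transducer, reducer, iterable, init=UNSET):
    r = transducer(reducer)
    accumulator = init if (init is not UNSET) else reducer.initial()
    for item in iterable:
        accumulator = r.step(accumulator, item)
        if isinstance(accumulator, Reduced):
            accumulator = accumulator.value
            break
    return r.complete(accumulator)


class Tracing:
    """Hypothetical reducer that records each protocol call it receives."""

    def __init__(self, stop_at):
        self.calls = []
        self._stop_at = stop_at

    def initial(self):
        self.calls.append("initial")
        return []

    def step(self, result, item):
        self.calls.append(f"step({item})")
        result.append(item)
        # Signal early completion once the chosen item has been collected.
        return Reduced(result) if item == self._stop_at else result

    def complete(self, result):
        self.calls.append("complete")
        return result


def identity(reducer):
    """Hypothetical do-nothing transducer: returns the reducer unchanged."""
    return reducer


tracer = Tracing(stop_at=2)
result = transduce(identity, tracer, range(100))
print(result)        # [0, 1, 2]
print(tracer.calls)  # ['initial', 'step(0)', 'step(1)', 'step(2)', 'complete']
```

The trace shows that complete() still runs after a Reduced value ends the loop, so any clean-up of state happens even when the reduction terminates early.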

In the next article, we’ll show how this protocol allows transducers to produce more items than they consume. This may seem obvious, but it is an important case to handle when we implement lazy transduction in a future article.
