In the previous article in this series on transducers, we showed how to implement stateful transducers, and how to deal with any left-over state or other clean-up operations when the reduction operation is complete. Sometimes, however, there is no need to process a whole series of items in order to produce the final result. For example, if we just want the first item from a series, we only need to process the first item. Our existing implementation of transduce() looks like this:
    UNSET = object()

    def transduce(transducer, reducer, iterable, init=UNSET):
        r = transducer(reducer)
        accumulator = init if (init is not UNSET) else reducer.initial()
        for item in iterable:
            accumulator = r.step(accumulator, item)
        return r.complete(accumulator)
It uses a for-loop to iteratively apply the reducing function, but there is no way of exiting the loop early.
To accommodate early termination, we'll allow our step() method to return a sentinel value indicating that reduction is complete. This sentinel will actually be a 'box' called Reduced, which we can detect by type and which will contain the final value:
    class Reduced:
        """A sentinel 'box' used to return the final value of a reduction."""

        def __init__(self, value):
            self._value = value

        @property
        def value(self):
            return self._value
It has only an initialiser which accepts a single value, and a property to provide read-only access to that value.
Our updated, Reduced-detecting transduce() function looks like this:

    def transduce(transducer, reducer, iterable, init=UNSET):
        r = transducer(reducer)
        accumulator = init if (init is not UNSET) else reducer.initial()
        for item in iterable:
            accumulator = r.step(accumulator, item)
            if isinstance(accumulator, Reduced):
                accumulator = accumulator.value
                break
        return r.complete(accumulator)
When we detect Reduced we simply un-box its value and then break from the loop.
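To see why the early exit matters, here is a minimal, self-contained sketch. SummingUntil and identity are hypothetical names invented for this illustration and are not part of the series: the reducer wraps its running total in Reduced once a limit is reached, and the transducer simply passes the reducer through unchanged. Because transduce() breaks out of its loop, it can even be fed an infinite iterable.

```python
from itertools import count

UNSET = object()


class Reduced:
    """A sentinel 'box' used to return the final value of a reduction."""

    def __init__(self, value):
        self._value = value

    @property
    def value(self):
        return self._value


def transduce(transducer, reducer, iterable, init=UNSET):
    r = transducer(reducer)
    accumulator = init if (init is not UNSET) else reducer.initial()
    for item in iterable:
        accumulator = r.step(accumulator, item)
        if isinstance(accumulator, Reduced):
            accumulator = accumulator.value
            break
    return r.complete(accumulator)


class SummingUntil:
    """Hypothetical reducer: sums items, stopping once the total reaches a limit."""

    def __init__(self, limit):
        self._limit = limit

    def initial(self):
        return 0

    def step(self, result, item):
        total = result + item
        # Box the total to signal that no further items are needed.
        return Reduced(total) if total >= self._limit else total

    def complete(self, result):
        return result


def identity(reducer):
    """Hypothetical stand-in transducer that leaves the reducer unchanged."""
    return reducer


# count(1) never ends, so this call only returns because of Reduced.
total = transduce(identity, SummingUntil(10), count(1))
print(total)  # 10, since 1 + 2 + 3 + 4 reaches the limit
```

Without the isinstance() check, the same call would loop forever.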
Our First transducer, which accepts an optional predicate, looks like this:

    class First:

        def __init__(self, reducer, predicate):
            self._reducer = reducer
            self._predicate = predicate

        def initial(self):
            return self._reducer.initial()

        def step(self, result, item):
            return Reduced(self._reducer.step(result, item)) if self._predicate(item) else result

        def complete(self, result):
            return self._reducer.complete(result)


    def first(predicate=None):
        predicate = true if predicate is None else predicate

        def first_transducer(reducer):
            return First(reducer, predicate)

        return first_transducer
Notice that true is passed to the transducer constructor when no predicate function is supplied; this isn't the Python constant True but a function which always returns True, irrespective of what it is passed. We need to define it ourselves:
    def true(*args, **kwargs):
        return True
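As a quick check of first() on its own, the following sketch runs it with and without a predicate over an endless counter. The Appending class here is an assumed minimal stand-in for the list-building reducer from earlier articles in the series; the real one may differ in detail.

```python
from itertools import count

UNSET = object()


class Reduced:
    def __init__(self, value):
        self._value = value

    @property
    def value(self):
        return self._value


def transduce(transducer, reducer, iterable, init=UNSET):
    r = transducer(reducer)
    accumulator = init if (init is not UNSET) else reducer.initial()
    for item in iterable:
        accumulator = r.step(accumulator, item)
        if isinstance(accumulator, Reduced):
            accumulator = accumulator.value
            break
    return r.complete(accumulator)


def true(*args, **kwargs):
    return True


class First:
    def __init__(self, reducer, predicate):
        self._reducer = reducer
        self._predicate = predicate

    def initial(self):
        return self._reducer.initial()

    def step(self, result, item):
        return Reduced(self._reducer.step(result, item)) if self._predicate(item) else result

    def complete(self, result):
        return self._reducer.complete(result)


def first(predicate=None):
    predicate = true if predicate is None else predicate

    def first_transducer(reducer):
        return First(reducer, predicate)

    return first_transducer


class Appending:
    """Assumed stand-in for the list-building reducer used earlier in the series."""

    def initial(self):
        return []

    def step(self, result, item):
        result.append(item)
        return result

    def complete(self, result):
        return result


# With no predicate, the very first item ends the reduction.
no_predicate = transduce(first(), Appending(), count(7))
print(no_predicate)  # [7]

# With a predicate, items are skipped until one matches.
multiple_of_five = transduce(first(lambda x: x % 5 == 0), Appending(), count(7))
print(multiple_of_five)  # [10]
```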
Putting it all together, we get:
    >>> transduce(
    ...     compose(
    ...         filtering(is_prime),
    ...         mapping(square),
    ...         first(lambda x: x > 1000)),
    ...     Appending(),
    ...     range(1000))
    [1369]
Notice that the single result is returned as the only element of a list. This is because we used Appending as our reducer. If we'd prefer a scalar value to be returned, we can simply define a new reducer called ExpectingSingle that expects exactly one step() operation to be performed:

    class ExpectingSingle:

        def __init__(self):
            self._num_steps = 0

        def initial(self):
            return None

        def step(self, result, item):
            # Check the step count before the assertion, so that a second step
            # raises RuntimeError rather than failing the assert.
            self._num_steps += 1
            if self._num_steps > 1:
                raise RuntimeError("Too many steps!")
            assert result is None
            return item

        def complete(self, result):
            if self._num_steps < 1:
                raise RuntimeError("Too few steps!")
            return result
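ExpectingSingle rejects both too few and too many steps. The sketch below exercises each path by driving the reducer directly. Here reduce_all and outcome are hypothetical helpers introduced only for this illustration, and the step-count check is placed ahead of the assertion so that a surplus step surfaces as "Too many steps!".

```python
class ExpectingSingle:
    def __init__(self):
        self._num_steps = 0

    def initial(self):
        return None

    def step(self, result, item):
        # Count first, so a surplus step is reported as "Too many steps!".
        self._num_steps += 1
        if self._num_steps > 1:
            raise RuntimeError("Too many steps!")
        assert result is None
        return item

    def complete(self, result):
        if self._num_steps < 1:
            raise RuntimeError("Too few steps!")
        return result


def reduce_all(reducer, iterable):
    """Hypothetical helper: a bare reduction with no transducer and no early exit."""
    accumulator = reducer.initial()
    for item in iterable:
        accumulator = reducer.step(accumulator, item)
    return reducer.complete(accumulator)


def outcome(items):
    """Return the reduced value, or the error message if the reducer objects."""
    try:
        # A fresh reducer each time, because ExpectingSingle counts its steps.
        return reduce_all(ExpectingSingle(), items)
    except RuntimeError as error:
        return str(error)


print(outcome([42]))    # 42
print(outcome([]))      # Too few steps!
print(outcome([1, 2]))  # Too many steps!
```

Because the reducer keeps a step count, each instance is good for a single reduction only.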
Reattempting our example, we now get a scalar value:
.. code-block:: python

    >>> transduce(
    ...     compose(
    ...         filtering(is_prime),
    ...         mapping(square),
    ...         first(lambda x: x > 1000)),
    ...     ExpectingSingle(),
    ...     range(1000))
    1369
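For comparison, the same computation can be sketched with plain generator expressions, which gives an independent check on the result. The is_prime function below is an assumed trial-division stand-in for the helper used earlier in the series, not the original implementation.

```python
from math import isqrt


def is_prime(n):
    """Trial-division primality test (assumed stand-in for the series' helper)."""
    if n < 2:
        return False
    return all(n % d != 0 for d in range(2, isqrt(n) + 1))


squares_of_primes = (n * n for n in range(1000) if is_prime(n))

# next() stops pulling items as soon as one matches, much as Reduced
# stops the loop inside transduce().
result = next(x for x in squares_of_primes if x > 1000)
print(result)  # 1369, the square of 37
```

The generator version is lazy by construction; the transducer version gets the same early exit from Reduced while keeping the choice of reducer separate from the pipeline.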
We've now exercised all the parts of the transducer protocol:
- Association of the initial value through initial()
- Incremental reduction through step()
- Completion and clean-up of state through complete()
- Signalling early completion with Reduced()
In the next article, we'll show how this protocol allows transducers to produce more items than they consume. This may seem obvious, but it is an important case to handle when we implement lazy transduction in a future article.