In the previous article in this series on transducers we looked at transducers which push more items downstream through the reducer chain than they receive from upstream. We promised that this would make lazy evaluation of transducer chains quite interesting.
When used with our transduce() function, our mapping and filtering transducers are in some ways less flexible than the map() and filter() functions built into Python 3, because our transduce() eagerly evaluates the reduction operation, whereas the built-in map() and filter() are lazy.
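To make the built-ins’ laziness concrete, here’s a small illustration of our own (not from the original series), using a predicate that logs when it’s called:

def noisy_is_even(x):
    print("testing", x)
    return x % 2 == 0

evens = filter(noisy_is_even, range(4))  # nothing printed yet - filter is lazy
print(next(evens))  # prints "testing 0", then 0
print(next(evens))  # prints "testing 1" and "testing 2", then 2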
The eagerness of our mapping and filtering transducers is not inherent in their implementation though. The eagerness is a result of the for-loop in
transduce() which must run to completion before returning. Thankfully, due to the clear separation of concerns between the reduction algorithm embodied in the transducers and the transducer “driver”, we can design an alternative transducible process which is lazy.
Here’s a reminder of our non-lazy transduce() function:
UNSET = object()

def transduce(transducer, reducer, iterable, init=UNSET):
    r = transducer(reducer)
    accumulator = init if (init is not UNSET) else reducer.initial()
    for item in iterable:
        accumulator = r.step(accumulator, item)
        if isinstance(accumulator, Reduced):
            accumulator = accumulator.value
            break
    return r.complete(accumulator)
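This function relies on the Reduced sentinel box and, later in this article, the Appending reducer from earlier in the series. If you don’t have them to hand, minimal sketches consistent with how they’re used here might look like this:

class Reduced:
    """A sentinel box signalling early termination of a reduction."""
    def __init__(self, value):
        self._value = value

    @property
    def value(self):
        return self._value


class Appending:
    """A reducer which collects results by appending to a mutable sequence."""
    def initial(self):
        return []

    def step(self, result, item):
        result.append(item)
        return result

    def complete(self, result):
        return result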
Recall that our non-lazy
transduce() function accepts, in addition to the transducer, a separate
reducer argument which is used to collect the results of applying the transducer into, say, a
list. Our lazy transduction function will be implemented as a Python generator function which yields each result as it becomes available, returning control to the caller, and then resumes execution when the next value is requested.
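If the suspend-and-resume behaviour of generator functions is unfamiliar, this tiny example of ours shows the mechanism we’ll rely on:

def countdown(n):
    while n > 0:
        yield n   # suspend here, handing n back to the caller
        n -= 1    # resume here when the next value is requested

g = countdown(3)
print(next(g))  # 3
print(next(g))  # 2 - execution resumed just after the yield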
In order to handle early-terminating transducers such as First, stateful transducers which emit left-over state such as Batching, and transducers which emit more elements than they consume such as Repeating, our lazy_transduce() function is necessarily quite complex:
from collections import deque

def lazy_transduce(transducer, iterable):
    """Lazy application of a transducer to an iterable."""
    r = transducer(Appending())
    accumulator = deque()
    reduced = False
    for item in iterable:
        accumulator = r.step(accumulator, item)
        if isinstance(accumulator, Reduced):
            accumulator = accumulator.value
            reduced = True

        # step() may have enqueued zero, one or many items
        yield from all_pending_items_in(accumulator)

        if reduced:
            break

    left_overs = r.complete(accumulator)
    assert left_overs is accumulator

    # complete() may have enqueued left-over results
    yield from all_pending_items_in(left_overs)


def all_pending_items_in(queue):
    while queue:
        yield queue.popleft()
Our function accepts only a transducer and the iterable series of source items. There’s no need to provide a reducer, because this function hardwires its own on the first line, where we provide an
Appending reducer. Notice that, unlike the eager
transduce(), we never call the
Appending.initial() method to retrieve the seed value for the reduction, so we must provide a legitimate mutable sequence type. For reasons that will become clear shortly, we provide a
deque from the Python Standard Library
collections module – a double-ended queue, which supports
append() to push items into the right-hand end.
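For example, here’s a quick illustration of the deque operations we rely on:

from collections import deque

d = deque()
d.append(1)         # push onto the right-hand end
d.append(2)
print(d.popleft())  # 1 - items come off the left-hand end in order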
We also set a flag
reduced so we know when we’re finished.
The first part of the body of the for-loop is the same as for eager
transduce(): we step the transducer, accumulating each item, looking for the sentinel
Reduced value as we go. If we encounter
Reduced we un-box its contents and set the
reduced flag to signal that we’re (nearly) done.
The next part of the for-loop body is where things really diverge from the eager
transduce() version. Bearing in mind that the call to
step() may have appended multiple items to the accumulator, we now need to yield them one by one to the client. We do this using the yield from statement, which delegates to another generator function, all_pending_items_in(), which simply keeps yielding items from the queue until it is empty.
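As a reminder of how yield from delegation behaves, here’s a small example of ours:

def inner():
    yield 1
    yield 2

def outer():
    yield 0
    yield from inner()  # inner's items flow straight through to outer's caller
    yield 3

assert list(outer()) == [0, 1, 2, 3]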
At the end of the for-loop, we check the
reduced flag, and
break out of the loop if we’re done.
After the loop, with all the input items dealt with, we make the necessary call to
complete(), bearing in mind that this may append further results to the accumulator queue. After a sanity check that the return value from
complete() is indeed the queue (which we know it should be, because
Appending.complete() simply returns its argument) we use the
yield from all_pending_items_in(left_overs) statement one last time to yield any lingering results to the client.
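To see why this final call matters, consider a stateful transducer like Batching. The sketch below is our own reconstruction, consistent with the protocol used in this series rather than copied from the earlier article; note how the left-over partial batch is only emitted when complete() is called:

class Batching:
    """Batches items into lists of up to size elements (sketch)."""
    def __init__(self, reducer, size):
        self._reducer = reducer
        self._size = size
        self._pending = []

    def initial(self):
        return self._reducer.initial()

    def step(self, result, item):
        self._pending.append(item)
        if len(self._pending) == self._size:
            batch = self._pending
            self._pending = []
            return self._reducer.step(result, batch)
        return result

    def complete(self, result):
        if self._pending:  # emit the left-over partial batch
            result = self._reducer.step(result, self._pending)
        return self._reducer.complete(result)


def batching(size):
    return lambda reducer: Batching(reducer, size)

With this sketch, list(lazy_transduce(batching(3), range(8))) yields [0, 1, 2] and [3, 4, 5] during the loop, and then the left-over [6, 7] only after complete() has run.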
In order to demonstrate the laziness in action, we’ll create a little wrapper around the built-in
range() sequence that logs the yielded integers to the console:
def logging_range(n):
    for i in range(n):
        print("i =", i)
        yield i
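The session below also uses compose(), an is_prime() predicate with filtering(), and repeating() from earlier articles in the series. In case you don’t have those to hand, here are minimal sketches of ours, consistent with the protocol used here though not necessarily identical to the originals:

from functools import reduce

def compose(*fs):
    """Right-to-left composition: compose(f, g)(x) == f(g(x))."""
    return reduce(lambda f, g: lambda x: f(g(x)), fs)

def is_prime(n):
    return n >= 2 and all(n % d != 0 for d in range(2, int(n ** 0.5) + 1))

class Filtering:
    def __init__(self, reducer, predicate):
        self._reducer = reducer
        self._predicate = predicate

    def initial(self):
        return self._reducer.initial()

    def step(self, result, item):
        if self._predicate(item):
            return self._reducer.step(result, item)
        return result

    def complete(self, result):
        return self._reducer.complete(result)

def filtering(predicate):
    return lambda reducer: Filtering(reducer, predicate)

class Repeating:
    def __init__(self, reducer, num_times):
        self._reducer = reducer
        self._num_times = num_times

    def initial(self):
        return self._reducer.initial()

    def step(self, result, item):
        for _ in range(self._num_times):
            result = self._reducer.step(result, item)
        return result

    def complete(self, result):
        return self._reducer.complete(result)

def repeating(num_times):
    return lambda reducer: Repeating(reducer, num_times)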
Here’s lazy_transduce() in action, demonstrating its laziness:
>>> primes_repeating = compose(filtering(is_prime), repeating(3))
>>> repeated_primes = lazy_transduce(primes_repeating, logging_range(100))
>>> repeated_primes
<generator object lazy_transduce at 0x103027708>
>>> next(repeated_primes)
i = 0
i = 1
i = 2
2
>>> next(repeated_primes)
2
>>> next(repeated_primes)
2
>>> next(repeated_primes)
i = 3
3
>>> next(repeated_primes)
3
>>> next(repeated_primes)
3
>>> next(repeated_primes)
i = 4
i = 5
5
>>> next(repeated_primes)
5
>>> next(repeated_primes)
5
>>> next(repeated_primes)
i = 6
i = 7
7
>>> next(repeated_primes)
7
>>> next(repeated_primes)
7
>>> next(repeated_primes)
i = 8
i = 9
i = 10
i = 11
11
>>> next(repeated_primes)
11
>>> next(repeated_primes)
11
>>> next(repeated_primes)
i = 12
i = 13
13
So we see that transducers allow orthogonal specification of the reducing operation, the result collection and whether to evaluate eagerly or lazily. Neat!
In a future article we’ll look at using transducers to process ‘push’ events modelled by Python coroutines.