At the Software Architect 2015 conference in London I presented What if? Supporting decisions with software dynamics simulations.1 This talk introduces the idea of performing numerical simulations of software development teams and the products they build. The value in such simulations is to inform policy decisions and guide deliberate perturbations to the software development process, such as whether and when to add or remove personnel from a project. Simulations should not be used to make hard predictions about, for example, when a particular project will be finished.
At PyCon UK 2015 I led a very well attended workshop with the goal of introducing Python developers to the tried-and-tested techniques and patterns of Domain Driven Design (DDD), in particular when used as part of an event-sourced architecture.
The two-and-a-half hour workshop comprised excerpts from our training course DDD Patterns in Python. Although the workshop material was heavily edited and compressed from the course, I’m confident that the majority of attendees grasped the main principles.
Several attendees have since asked for the introductory slides, which preceded the exercises. Here they are:
Sixty North training materials are for individual use. For training in a commercial setting please contact us to book a training course or obtain a license for the materials.
In the previous article in this series on transducers we looked at lazily evaluating transducers. This time we’ll look not at pulling output through a transducer chain from downstream, but at pushing input items into the chain from upstream.
All of the uses of transducers we’ve demonstrated in Python so far are probably better handled by existing and well-established Python programming techniques, such as generator expressions and generator functions. At this point in the series, we move decisively beyond that into new territory where transducers bring completely new capabilities to Python.
One of the key selling points of transducers is that they abstract the essence of a transformation away from the details of the data series that is being transformed. We’ll show this in Python by using transducers to transform a series of events modelled using Python coroutines.
Coroutines in Python
Coroutines in Python are little-used, and their workings are not widely known, so their implementation bears repeating here. If you’re familiar with the notion of coroutines in general, and the specifics of how they’re implemented in Python, you can skim over this section.
Coroutines are like generator functions insofar as they are resumable functions. In fact, coroutines in Python are generator functions which use yield as an expression rather than a statement. What this means in practice is that generator objects sport a send() method which allows the client of the generator to transmit information to a running generator and for the generator to receive this data as the value of the yield expression. As usual, an example will serve to make things clearer.

We’ll start by defining a generator function which enters an infinite loop, waits at the yield expression for a value to be received, and then prints this value to the console.

>>> def event_receiver():
...     while True:
...         message = (yield)
...         print("Message:", message)
...
>>>
We create a generator object just the same as we would with any other generator:
>>> r = event_receiver()
>>> r
<generator object event_receiver at 0x100772828>
Now we’ll try to send it a message, using the send() method of the generator object:

>>> r.send("message")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: can't send non-None value to a just-started generator
>>>
This actually fails, because the generator code has not yet been executed at all. We need to prime the pump, so to speak, by advancing execution to the first occurrence of yield. We can do this by passing the generator to the built-in next() function:

>>> next(r)

We’ll fix this pump-priming annoyance of generator-based coroutines shortly.
Now we can send messages:
>>> r.send("message")
Message: message
>>> r.send("another message")
Message: another message
When we’re done, we terminate the coroutine by calling the close() method. (This actually raises a GeneratorExit exception at the site of the yield expression, which allows control flow to exit the otherwise infinite loop; this special exception is intercepted by the Python runtime system, so it isn’t seen by us at the console.)

>>> r.close()
>>>
Any further attempts to send() messages into the generator cause StopIteration to be raised. This, of course, is the normal means of indicating that a generator is exhausted:

>>> r.send("message")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration
>>>
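The whole lifecycle described above can also be exercised outside the REPL. The following is a minimal sketch rather than part of the original session: the collecting_receiver() coroutine and the flag variables are hypothetical names, and the coroutine appends to a list instead of printing so that the outcome can be inspected afterwards.

```python
def collecting_receiver(received):
    """Generator-based coroutine which appends each sent message to `received`.

    Hypothetical helper for illustration; it mirrors event_receiver() but
    records messages instead of printing them.
    """
    while True:
        message = (yield)
        received.append(message)


received = []
r = collecting_receiver(received)

# Sending before priming fails: execution has not reached the first yield.
failed_before_priming = False
try:
    r.send("too early")
except TypeError:
    failed_before_priming = True

next(r)  # Prime the coroutine by advancing to the first yield expression.
r.send("message")
r.send("another message")
r.close()  # Raises GeneratorExit at the yield, ending the infinite loop.

# After close(), any further send() raises StopIteration.
exhausted = False
try:
    r.send("too late")
except StopIteration:
    exhausted = True

print(received, failed_before_priming, exhausted)
```

Note that the failed first send() does not damage the generator; it can still be primed and used afterwards.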
Priming generator-based coroutines
Now to address the awkwardness of having to prime coroutine generator functions by initially passing them to next(). We can improve this with a function decorator which creates the generator object and calls next() on our behalf. We’ll call the decorator coroutine:

def coroutine(func):
    def start(*args, **kwargs):
        g = func(*args, **kwargs)
        next(g)
        return g
    return start
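To see the effect of the decorator in isolation, here is a small sketch. The use of functools.wraps is our own addition (an assumption that preserving the decorated function's name and docstring is desirable), and the appender() coroutine is a hypothetical example sink.

```python
from functools import wraps


def coroutine(func):
    """Decorator which creates and primes a generator-based coroutine."""
    @wraps(func)  # Assumption: keep the wrapped function's name and docstring.
    def start(*args, **kwargs):
        g = func(*args, **kwargs)
        next(g)  # Advance to the first yield so send() works immediately.
        return g
    return start


@coroutine
def appender(items):
    """Hypothetical sink which appends every received message to `items`."""
    while True:
        items.append((yield))


items = []
sink = appender(items)  # Already primed; no explicit next() call is needed.
sink.send("first")
sink.send("second")
sink.close()

print(items)
```

Because the decorated function returns an already-primed generator, client code can call send() straight away.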
We’ll use our new decorator to assist in defining a slightly more sophisticated coroutine for printing, called rprint:

import sys

@coroutine
def rprint(sep='\n', end=''):
    """A coroutine sink which prints received items to stdout

    Args:
        sep: Optional separator to be printed between received items.
        end: Optional terminator to be printed after the last item.
    """
    try:
        first_item = (yield)
        sys.stdout.write(str(first_item))
        sys.stdout.flush()
        while True:
            item = (yield)
            sys.stdout.write(sep)
            sys.stdout.write(str(item))
            sys.stdout.flush()
    except GeneratorExit:
        sys.stdout.write(end)
        sys.stdout.flush()
In this implementation, we intercept GeneratorExit explicitly to give us the opportunity to print a terminator. We also regularly flush the stream so we get immediate feedback for our following experiments.
The opposite of a sink is a source. Until now, we’ve been sourcing ‘events’ ourselves by sending them from the REPL, but to make this a little more interesting, we’ll cook up a function (just a plain old function, not a generator) which takes values from an iterable series and intermittently sends them, after a delay, to anything with a send() method such as our coroutine generators. For fun, the random delays are exponentially distributed, so the events form a so-called Poisson process which mimics a radioactive source; imagine a device with a Geiger counter which sends the next item from an iterable series each time an atom decays:

import random
from time import sleep

def poisson_source(rate, iterable, target):
    """Send events at random times with uniform probability.

    Args:
        rate: The average number of events to send per second.
        iterable: A series of items which will be sent to the target one by one.
        target: The target coroutine or sink.

    Returns:
        The completed value, or None if iterable was exhausted and the target
        was closed.
    """
    for item in iterable:
        # Exponentially distributed delays between events give a Poisson process.
        duration = random.expovariate(rate)
        sleep(duration)
        try:
            target.send(item)
        except StopIteration as e:
            # The target has terminated; pass on its completed value.
            return e.value
    target.close()
    return None
When the iterable series is exhausted we call close() on the target. If the target signals that it has terminated before then (by raising StopIteration), we return the target’s completed value instead. Note that by supplying an infinite iterable series we could make the source send events forever.
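As a quick sanity check on the delays drawn with random.expovariate(), the sketch below samples them directly, without sleeping, and compares their average with the expected mean delay of 1 / rate seconds. The seed and the sample size are arbitrary choices made only so that the illustration is repeatable.

```python
import random
from statistics import mean

rng = random.Random(42)  # Seeded only so that the sketch is repeatable.
rate = 0.5               # Average number of events per second.

# Draw the delays which would separate successive events.
delays = [rng.expovariate(rate) for _ in range(100_000)]
average_delay = mean(delays)

print(f"average delay: {average_delay:.2f} s; expected about {1 / rate:.2f} s")
```

With a rate of 0.5 events per second, then, we should expect to wait about two seconds per item on average when running the source for real.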
Let’s hook our source and sink together at the REPL:
>>> printer = rprint(sep=', ', end='\nDONE!\n')
>>> count_to_nine = range(10)
>>> poisson_source(rate=0.5, iterable=count_to_nine, target=printer)
0, 1, 2, 3, 4, 5, 6, 7, 8, 9
DONE!
>>>
Combined event sources and event sinks
Of course, we can build functions which act as both sinks and sources, transforming the messages they receive in some way and forwarding the processed results onwards to another sink. Here’s a combined source and sink function, which simply doubles the values it receives:
@coroutine
def doubler(target):
    while True:
        item = (yield)
        doubled_item = item * 2
        try:
            target.send(doubled_item)
        except StopIteration as e:
            return e.value
With doubler() we chain the components of the pipeline together:

>>> printer = rprint(sep=', ', end='\nDONE!\n')
>>> count_to_nine = range(10)
>>> poisson_source(rate=0.5,
...                iterable=count_to_nine,
...                target=doubler(target=printer))
0, 2, 4, 6, 8, 10, 12, 14, 16, 18
DONE!
From doubler() it’s but a short hop to a more general mapper() which accepts an arbitrary transforming function:

@coroutine
def mapper(transform, target):
    while True:
        item = (yield)
        transformed_item = transform(item)
        try:
            target.send(transformed_item)
        except StopIteration as e:
            return e.value
Used like so,
>>> printer = rprint(sep=', ', end='\nDONE!\n')
>>> count_to_nine = range(10)
>>> poisson_source(rate=0.5, iterable=count_to_nine, target=mapper(transform=square, target=printer))
0, 1, 4, 9, 16, 25, 36, 49, 64, 81
DONE!
From here, you can see how we could also implement equivalents of reduce() and so on to operate on the ‘push’ event stream modelled by Python coroutines.

The point here is that we can’t just re-use any existing functions which process ‘pull’ data series, such as the functions in itertools, with ‘push’ data series. Each and every function needs to be reimplemented to accept values pushed from upstream, and send processed results downstream.
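To make the cost of this reimplementation concrete, here is what a hand-written push-style counterpart of the built-in filter() might look like. This is only a sketch following the same pattern as mapper(); the names filterer() and collector() are hypothetical and do not appear elsewhere in this series.

```python
def coroutine(func):
    """Prime a generator-based coroutine so that send() works immediately."""
    def start(*args, **kwargs):
        g = func(*args, **kwargs)
        next(g)
        return g
    return start


@coroutine
def filterer(predicate, target):
    """Hypothetical push-style filter: forward items satisfying predicate."""
    while True:
        item = (yield)
        if predicate(item):
            try:
                target.send(item)
            except StopIteration as e:
                return e.value


@coroutine
def collector(items):
    """Hypothetical sink which records everything it receives."""
    while True:
        items.append((yield))


evens = []
pipeline = filterer(lambda x: x % 2 == 0, collector(evens))
for i in range(10):
    pipeline.send(i)
pipeline.close()

print(evens)
```

Every further operation we wanted (taking, batching, and so on) would need the same boilerplate around its own small piece of logic.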
Transducers provide a way out of this quandary. We’ve demonstrated earlier in this series that ‘reduce’ is a fundamental operation, and by reimagining reduce() into a more general transduce() we were able to use the same transducers to operate on both eager and lazy data series. We can do the same with coroutine-based push events, by implementing a version of transduce() which allows us to use any transducer to process a stream of such events.
reactive_transduce() is a coroutine which accepts two arguments: a transducer and a target sink to which the transduced results will be sent:
@coroutine
def reactive_transduce(transducer, target=None):
    # Hard-wire Sending as the 'bottom' reducer of the chain.
    reducer = transducer(Sending())
    accumulator = target if (target is not None) else reducer.initial()
    try:
        while True:
            item = (yield)
            accumulator = reducer.step(accumulator, item)
            if isinstance(accumulator, Reduced):
                accumulator = accumulator.value
                break
    except GeneratorExit:
        pass
    return reducer.complete(accumulator)
The reactive_transduce() function connects to the upstream end of a transducer chain, adapting from the coroutine protocol to the reducer interface. At the downstream end of the transducer chain, we need to adapt the other way, from the reducer interface to the coroutine protocol. To do this we use a reducer called Sending, which we hard-wire as the ‘bottom’ reducer on the first line of reactive_transduce(). The Sending reducer looks like this:

class Sending:

    def initial(self):
        return null_sink()

    def step(self, result, item):
        try:
            result.send(item)
        except StopIteration:
            return Reduced(result)
        else:
            return result

    def complete(self, result):
        result.close()
        return result
The step() method literally sends the next item to the result, which must therefore be a legitimate event sink. Should the sink indicate that it can’t accept a further item, by raising StopIteration, we return the result wrapped in the Reduced sentinel. The initial() method provides a legitimate sink, just a simple do-nothing sink defined as:

@coroutine
def null_sink():
    while True:
        _ = (yield)
Going back to reactive_transduce(), the main loop continues to iterate, receiving new values via a yield expression, until such time as GeneratorExit is signalled by the client or the reducer signals termination by returning Reduced. When the main loop is exited by whatever means, we give the reducer the opportunity to complete(), and the Sending.complete() method ensures that close() is called on the target.
With these pieces in place, let’s look at how to use reactive_transduce(). We’ll reproduce our previous example where we squared the output from poisson_source(), but this time using the mapping() transducer to do the work:

>>> printer = rprint(sep=', ', end='\nDONE!\n')
>>> poisson_source(rate=0.5,
...                iterable=range(10),
...                target=reactive_transduce(transducer=mapping(square),
...                                          target=printer))
...
0, 1, 4, 9, 16, 25, 36, 49, 64, 81
DONE!
The key point here is that we can now take an arbitrary transducer and reuse it with eager collections, lazy iterables, and push-events! In fact, simply by devising an appropriate transduce function we can re-use our transducers in an arbitrary data-series processing context.
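The reuse claimed here can be checked with a self-contained sketch in which one and the same transducer object drives both an eager reduction and a push pipeline. Everything below is a simplified stand-in for the code in this series: the Mapping and Appending classes are assumed shapes, early termination via Reduced is deliberately omitted, and collector() is a hypothetical sink.

```python
def coroutine(func):
    def start(*args, **kwargs):
        g = func(*args, **kwargs)
        next(g)
        return g
    return start


class Mapping:
    """Assumed shape of the mapping transducer's reducer wrapper."""
    def __init__(self, reducer, transform):
        self._reducer = reducer
        self._transform = transform

    def step(self, result, item):
        return self._reducer.step(result, self._transform(item))

    def complete(self, result):
        return self._reducer.complete(result)


def mapping(transform):
    return lambda reducer: Mapping(reducer, transform)


class Appending:
    """Bottom reducer for eager use: collects items into a list."""
    def step(self, result, item):
        result.append(item)
        return result

    def complete(self, result):
        return result


class Sending:
    """Bottom reducer for push use: forwards items to a coroutine sink."""
    def step(self, result, item):
        result.send(item)
        return result

    def complete(self, result):
        result.close()
        return result


def transduce(transducer, reducer, iterable, init):
    """Simplified eager driver (no Reduced handling)."""
    r = transducer(reducer)
    accumulator = init
    for item in iterable:
        accumulator = r.step(accumulator, item)
    return r.complete(accumulator)


@coroutine
def reactive_transduce(transducer, target):
    """Simplified push driver (no Reduced handling)."""
    r = transducer(Sending())
    accumulator = target
    try:
        while True:
            accumulator = r.step(accumulator, (yield))
    except GeneratorExit:
        pass
    return r.complete(accumulator)


@coroutine
def collector(items):
    while True:
        items.append((yield))


squaring = mapping(lambda x: x * x)  # One transducer, used twice below.

eager_result = transduce(squaring, Appending(), range(5), init=[])

pushed = []
pipeline = reactive_transduce(squaring, collector(pushed))
for i in range(5):
    pipeline.send(i)
pipeline.close()

print(eager_result, pushed)
```

Only the driver and the bottom reducer differ between the two contexts; the squaring transducer itself is untouched.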
This is the true power of transducers: Data processing components completely abstracted away from how the input data arrives, or to where the output results are sent.
In the previous article in this series on transducers we looked at transducers which push more items downstream through the reducer chain than they receive from upstream. We promised that this would make lazy evaluation of transducer chains quite interesting.
When used with our transduce() function, our mapping and filtering transducers are in some ways less flexible than the map() and filter() functions built into Python 3 because our transduce() eagerly evaluates the reduction operation, whereas the built-in map() and filter() are lazy.1
The eagerness of our mapping and filtering transducers is not inherent in their implementation though. The eagerness is a result of the for-loop in transduce() which must run to completion before returning. Thankfully, due to the clear separation of concerns between the reduction algorithm embodied in the transducers and the transducer “driver”, we can design an alternative transducible process which is lazy.
Here’s a reminder of our non-lazy transduce() function:

UNSET = object()

def transduce(transducer, reducer, iterable, init=UNSET):
    r = transducer(reducer)
    accumulator = init if (init is not UNSET) else reducer.initial()
    for item in iterable:
        accumulator = r.step(accumulator, item)
        if isinstance(accumulator, Reduced):
            accumulator = accumulator.value
            break
    return r.complete(accumulator)
Recall that our non-lazy transduce() function accepts, in addition to the transducer, a separate reducer argument which is used to collect the results of applying the transducer into, say, a list. Our lazy transduction function will be implemented as a Python generator function which yields each result as it becomes available, returning control to the caller, and then resumes execution when the next value is requested.
In order to handle early terminating transducers such as First, stateful transducers which emit left-over state such as Batching, and transducers which emit more elements than they consume such as Repeating, our lazy_transduce() function is necessarily quite complex:

from collections import deque

def lazy_transduce(transducer, iterable):
    """Lazy application of a transducer to an iterable."""
    r = transducer(Appending())
    accumulator = deque()
    reduced = False
    for item in iterable:
        accumulator = r.step(accumulator, item)
        if isinstance(accumulator, Reduced):
            accumulator = accumulator.value
            reduced = True

        yield from all_pending_items_in(accumulator)

        if reduced:
            break

    left_overs = r.complete(accumulator)
    assert left_overs is accumulator
    yield from all_pending_items_in(left_overs)


def all_pending_items_in(queue):
    while queue:
        yield queue.popleft()
Our function accepts only a transducer and the iterable series of source items. There’s no need to provide a reducer, because this function hardwires its own on the first line, where we provide an Appending reducer. Notice that unlike the eager transduce() we never call the Appending.initial() method to retrieve the seed value for the reduction, so we must provide a legitimate mutable sequence type. For reasons that will become clear shortly, we provide a deque from the Python Standard Library collections module2 (a double-ended queue), which supports append() to push items into the right-hand end.
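The queue discipline relied upon by lazy_transduce() can be seen in isolation in the following small sketch: items are appended at the right-hand end and drained from the left-hand end with popleft(), so they emerge in the order in which they were produced. The variable names are ours, chosen for illustration.

```python
from collections import deque

queue = deque()
queue.append("a")  # Push items onto the right-hand end...
queue.append("b")
queue.append("c")

drained = []
while queue:
    drained.append(queue.popleft())  # ...and pop them from the left-hand end.

print(drained)
```

A list would also support append(), but removing items from the front of a list requires shifting all the remaining elements, whereas a deque is designed for efficient operations at both ends.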
We also set a flag reduced so we know when we’re finished.
The first part of the body of the for-loop is the same as for eager transduce(): we step the transducer, accumulating each item, looking for the sentinel Reduced value as we go. If we encounter Reduced we un-box its contents and set the reduced flag to signal that we’re (nearly) done.
The next part of the for-loop body is where things really diverge from the eager transduce() version. Bearing in mind that the call to step() may have appended multiple items to the accumulator, we now need to yield them one by one to the client. We do this using the yield from statement which delegates to another generator function all_pending_items_in() which simply keeps yielding items from the queue until it is empty.
At the end of the for-loop, we check the reduced flag, and break out of the loop if we’re done.
After the loop, with all the input items dealt with, we make the necessary call to complete(), bearing in mind that this may append further results to the accumulator queue. After a sanity check that the return value from complete() is indeed the queue (which we know it should be, because Appending.complete() simply returns its argument) we use the yield from all_pending_items_in(left_overs) statement one last time to yield any lingering results to the client.
In order to demonstrate the laziness in action, we’ll create a little wrapper around the built-in range() sequence that logs the yielded integers to the console:

def logging_range(n):
    for i in range(n):
        print("i =", i)
        yield i
Here it is in action, demonstrating its laziness:

>>> primes_repeating = compose(filtering(is_prime), repeating(3))
>>> repeated_primes = lazy_transduce(primes_repeating, logging_range(100))
>>> repeated_primes
<generator object lazy_transduce at 0x103027708>
>>> next(repeated_primes)
i = 0
i = 1
i = 2
2
>>> next(repeated_primes)
2
>>> next(repeated_primes)
2
>>> next(repeated_primes)
i = 3
3
>>> next(repeated_primes)
3
>>> next(repeated_primes)
3
>>> next(repeated_primes)
i = 4
i = 5
5
>>> next(repeated_primes)
5
>>> next(repeated_primes)
5
>>> next(repeated_primes)
i = 6
i = 7
7
>>> next(repeated_primes)
7
>>> next(repeated_primes)
7
>>> next(repeated_primes)
i = 8
i = 9
i = 10
i = 11
11
>>> next(repeated_primes)
11
>>> next(repeated_primes)
11
>>> next(repeated_primes)
i = 12
i = 13
13
So we see that transducers allow orthogonal specification of the reducing operation, the result collection and whether to evaluate eagerly or lazily. Neat!
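One practical consequence of laziness is that the source no longer needs to be finite. The sketch below is a simplified, self-contained variant of the lazy driver (it omits the Reduced handling, and the Appending and Repeating classes are pared-down assumptions based on the versions in this series) applied to the infinite itertools.count() series, from which we take only the first few results.

```python
from collections import deque
from itertools import count, islice


class Appending:
    """Pared-down bottom reducer: appends items to the accumulator."""
    def step(self, result, item):
        result.append(item)
        return result

    def complete(self, result):
        return result


class Repeating:
    """Pared-down item-injecting reducer wrapper."""
    def __init__(self, reducer, num_times):
        self._reducer = reducer
        self._num_times = num_times

    def step(self, result, item):
        for _ in range(self._num_times):
            result = self._reducer.step(result, item)
        return result

    def complete(self, result):
        return self._reducer.complete(result)


def repeating(num_times):
    return lambda reducer: Repeating(reducer, num_times)


def lazy_transduce(transducer, iterable):
    """Simplified lazy driver without early-termination support."""
    r = transducer(Appending())
    accumulator = deque()
    for item in iterable:
        accumulator = r.step(accumulator, item)
        while accumulator:  # Drain everything produced by this step.
            yield accumulator.popleft()
    accumulator = r.complete(accumulator)
    while accumulator:
        yield accumulator.popleft()


# count() never ends, so an eager driver would never return here.
first_seven = list(islice(lazy_transduce(repeating(2), count()), 7))
print(first_seven)
```

Only as many source items are consumed as are needed to satisfy the requests made by the client.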
In a future article we’ll look at using transducers to process ‘push’ events modelled by Python coroutines.
In the previous article in our series on understanding transducers through Python we showed how to support early termination of a reduction operation. This time, we’ll demonstrate how transducers can produce more items than they consume. Although this may seem obvious, it leads to some important consequences for implementing lazy evaluation of transducers, which is what we’ll look at next time.
Consider a transducer Repeating which repeats each source item multiple times into the output:

class Repeating:

    def __init__(self, reducer, num_times):
        self._reducer = reducer
        self._num_times = num_times

    def initial(self):
        return self._reducer.initial()

    def step(self, result, item):
        for _ in range(self._num_times):
            result = self._reducer.step(result, item)
        return result

    def complete(self, result):
        return self._reducer.complete(result)


def repeating(num_times):

    if num_times < 0:
        raise ValueError("num_times cannot be negative")

    def repeating_transducer(reducer):
        return Repeating(reducer, num_times)

    return repeating_transducer

The key point to notice here is that each call to Repeating.step() results in multiple calls to the underlying reducer’s self._reducer.step(), thereby injecting more items into the output series than are received in the input series.

By composing it with our filtering primality checking predicate, we can use it to repeat each prime number three times:

>>> primes_repeating = compose(filtering(is_prime), repeating(3))
>>> transduce(primes_repeating, Appending(), range(100))
[2, 2, 2, 3, 3, 3, 5, 5, 5, 7, 7, 7, 11, 11, 11, 13, 13, 13, 17, 17, 17, 19, 19, 19, 23, 23, 23, 29, 29, 29, 31, 31, 31, 37, 37, 37, 41, 41, 41, 43, 43, 43, 47, 47, 47, 53, 53, 53, 59, 59, 59, 61, 61, 61, 67, 67, 67, 71, 71, 71, 73, 73, 73, 79, 79, 79, 83, 83, 83, 89, 89, 89, 97, 97, 97]
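The helpers used in this example (compose(), filtering(), is_prime(), Appending and transduce()) come from earlier articles in the series. For readers who want to run the composition on its own, the following sketch supplies minimal stand-ins for them; these are our assumptions about their shape rather than the original definitions, and early termination via Reduced is omitted.

```python
class Appending:
    def initial(self):
        return []

    def step(self, result, item):
        result.append(item)
        return result

    def complete(self, result):
        return result


class Filtering:
    def __init__(self, reducer, predicate):
        self._reducer = reducer
        self._predicate = predicate

    def step(self, result, item):
        # Pass the item downstream only if it satisfies the predicate.
        return self._reducer.step(result, item) if self._predicate(item) else result

    def complete(self, result):
        return self._reducer.complete(result)


class Repeating:
    def __init__(self, reducer, num_times):
        self._reducer = reducer
        self._num_times = num_times

    def step(self, result, item):
        for _ in range(self._num_times):
            result = self._reducer.step(result, item)
        return result

    def complete(self, result):
        return self._reducer.complete(result)


def filtering(predicate):
    return lambda reducer: Filtering(reducer, predicate)


def repeating(num_times):
    return lambda reducer: Repeating(reducer, num_times)


def compose(*transducers):
    """Stand-in: items flow through the transducers from left to right."""
    def composed(reducer):
        for transducer in reversed(transducers):
            reducer = transducer(reducer)
        return reducer
    return composed


def transduce(transducer, reducer, iterable):
    r = transducer(reducer)
    accumulator = reducer.initial()
    for item in iterable:
        accumulator = r.step(accumulator, item)
    return r.complete(accumulator)


def is_prime(n):
    return n >= 2 and all(n % d for d in range(2, int(n ** 0.5) + 1))


primes_repeating = compose(filtering(is_prime), repeating(3))
result = transduce(primes_repeating, Appending(), range(8))
print(result)
```

Eight input items produce twelve output items here, since each of the four primes below eight is emitted three times.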
In the next article, we’ll see that although seemingly fairly innocuous, support for item injecting transducers such as Repeating complicates lazy evaluation quite a bit!