Pypes: Adding Pipes to Python
Pipes are a mainstay of inter-process communication in Unix, and pipelines are also a hugely powerful feature in functional programming languages. They also map well to data science and machine learning applications: many models can be thought of as chains of transformations applied to inputs. Defining models as pipelines of function-like objects reflects this view and encourages good habits from functional programming (functions without side effects, modular and testable code, avoiding mutation, etc.). It turns out that adding pipelines to Python is pretty straightforward, and doing so also provides a mechanism to include some “leaky” type checking along the way.
First, let’s look at what we want a pipeline to look like. As a very simple example, consider two functions that we want to compose:
def f(x: str) -> int:
    return len(x)

def g(x: int) -> int:
    return x + 5
Notice the type hints (function annotations arrived in Python 3.0, and the typing module followed in Python 3.5). We want an easy way to compose these two functions, and to raise an error if their type signatures are incompatible. Something like:
pipe = f | g
pipe("abc") # returns 8
I’ve chosen a “|” operator to represent composition by analogy with Unix, and in this pipeline data flows from left to right.
The easiest way to add a pipe operator is to define a class to wrap our function objects. Class instances can be made callable (through the __call__ special method), and we can define our own operators on them as well. Let’s define a Pipe class:
class Pipe(object):
    def __init__(self, func):
        self.func = func

    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)

    def __or__(self, x):
        if not callable(x):
            raise TypeError("target must be callable")
        if self.func is None:
            return Pipe(func=x)

        def compose(*args, **kwargs):
            return x(self.func(*args, **kwargs))

        return Pipe(func=compose)
Recall that the __or__ special method defines the bitwise-OR operator (|), so this method is where the magic happens. We can now do something like the following:
pipe = Pipe(f) | g
(And we can generate pipelines of functions on the fly as needed.)
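To make the "on the fly" point concrete, here is a minimal, self-contained sketch that repeats a trimmed version of the Pipe class and builds a pipeline from a list of steps at runtime. The `double` function and the `steps` list are hypothetical names added for illustration; they are not part of the original example.

```python
class Pipe(object):
    """Minimal wrapper that adds a | operator to a callable."""

    def __init__(self, func):
        self.func = func

    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)

    def __or__(self, x):
        if not callable(x):
            raise TypeError("target must be callable")

        def compose(*args, **kwargs):
            # Run the upstream function first, then feed its result to x.
            return x(self.func(*args, **kwargs))

        return Pipe(func=compose)


def f(x: str) -> int:
    return len(x)


def g(x: int) -> int:
    return x + 5


def double(x: int) -> int:  # hypothetical extra step, for illustration only
    return 2 * x


# Extend the pipeline one step at a time from a list chosen at runtime.
steps = [g, double]
pipe = Pipe(f)
for step in steps:
    pipe = pipe | step

result = pipe("abc")  # len("abc") = 3, then 3 + 5 = 8, then 2 * 8 = 16
```

Each use of | returns a new Pipe, so intermediate pipelines stay usable on their own.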
This is a good start, but there’s a little more to be done: we’re not taking advantage of the type hints functionality to verify the integrity of our pipeline, and having to wrap the first function in a Pipe object is still a little clunky.
Let’s define a utility function to retrieve a callable object’s signature:
import typing

def _get_signature(funclike):
    if isinstance(funclike, Pipe):
        return funclike.signature
    if hasattr(funclike, "signature"):
        return funclike.signature
    if hasattr(funclike, "__call__"):
        sig = typing.get_type_hints(funclike.__call__)
        if len(sig) != 0:
            return sig
    return typing.get_type_hints(funclike)
There are a few things going on in this function. First, I’ve added an additional attribute to our Pipe class called signature. This is where we’ll store the signature of the functions we wrap (since our Pipe class is generic, we can’t a priori define good type hints for its __call__ method). We’re also allowing for the possibility that some other callable object defines a signature field, and if the object defines a __call__ method with non-empty type hints, we grab the type hints from there. If none of those conditions are satisfied, we fall back on grabbing the type hints from the object itself.
In action, this function will produce a dictionary defining the signature, like so:
def f(x: int) -> str:
    return "{}".format(x)

_get_signature(f) # returns {"x": int, "return": str}
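The __call__ branch matters for callable class instances, whose hints live on the method rather than on the instance. The sketch below calls typing.get_type_hints directly to show the two cases; `Scaler` and `untyped` are hypothetical names used only for illustration.

```python
import typing


class Scaler(object):  # hypothetical callable class, for illustration only
    def __init__(self, factor: float):
        self.factor = factor

    def __call__(self, x: float) -> float:
        return self.factor * x


def untyped(x):
    return x


scaler = Scaler(2.0)

# Hints on the instance's __call__ method describe how the object is called.
call_hints = typing.get_type_hints(scaler.__call__)

# A function without annotations yields an empty dictionary.
empty_hints = typing.get_type_hints(untyped)
```

Here `call_hints` maps "x" and "return" to float, while `empty_hints` is empty, which is why the length check is needed before trusting the hints from __call__.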
Checking that two signatures are at least theoretically composable is also straightforward:
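def _check_composition(sig1, sig2):
    # True if any input of the downstream function (sig2) has the same
    # type as the upstream function's (sig1) return value.
    for k, v in sig2.items():
        if k == "return":
            continue
        if v == sig1["return"]:
            return True
    return False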
This composition-compatibility check is leaky in that it only verifies that at least one input to the downstream function matches the return type of the upstream function. Thus, it’s possible to “fool” the check if the downstream function isn’t unary. (And, of course, type hints are not really the same as type signatures: Python does not enforce them at runtime, so a careless programmer could always write incorrect type hints!)
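A small sketch of the leak, using the same matching rule as the check above. The two-argument function `binary` is a hypothetical example added for illustration: it passes the check because one of its inputs is an int, yet it cannot actually be called with a single upstream value.

```python
import typing


def _check_composition(sig1, sig2):
    # Same rule as the check above: one matching input type is enough.
    for k, v in sig2.items():
        if k == "return":
            continue
        if v == sig1["return"]:
            return True
    return False


def f(x: str) -> int:
    return len(x)


def g(x: int) -> int:
    return x + 5


def binary(x: int, y: str) -> str:  # hypothetical non-unary function
    return y * x


sig_f = typing.get_type_hints(f)
sig_g = typing.get_type_hints(g)
sig_binary = typing.get_type_hints(binary)

ok_unary = _check_composition(sig_f, sig_g)       # int -> int: accepted
fooled = _check_composition(sig_f, sig_binary)    # accepted, but y is never supplied
rejected = _check_composition(sig_binary, sig_g)  # str does not match int: rejected
```

Calling `binary(f("abc"))` would still fail with a TypeError for the missing `y` argument, so the check catches mismatched types but not mismatched arity.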
Here’s our new version of the Pipe class, with signatures and some additional error handling:
class Pipe(object):
    def __init__(self, func=None, signature=None):
        if func is not None and not callable(func):
            raise TypeError("func argument must be callable")
        self.func = func
        self.signature = signature
        # Infer the signature from type hints unless one was given explicitly.
        if signature is None and self.func is not None:
            self.signature = _get_signature(self.func)

    def __call__(self, *args, **kwargs):
        if self.func is None:
            return
        return self.func(*args, **kwargs)

    def __or__(self, x):
        if not callable(x):
            raise TypeError("target must be callable")
        if self.func is None:
            return Pipe(func=x)
        # Refuse to build the pipeline if the hints cannot line up.
        sig1 = self.signature
        sig2 = _get_signature(x)
        if not _check_composition(sig1, sig2):
            raise ValueError("Incompatible signatures: {}, {}".format(sig1, sig2))

        def compose(*args, **kwargs):
            return x(self.func(*args, **kwargs))

        # The composed pipe keeps our inputs and takes x's return type.
        newsig = self.signature.copy()
        newsig["return"] = sig2["return"]
        return Pipe(func=compose, signature=newsig)
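To see the signature check reject a bad pipeline, here is a condensed, self-contained sketch. It simplifies the helpers (no __call__ lookup, no handling of an empty Pipe), and `to_upper` is a hypothetical function added for illustration, so treat it as an approximation of the class above rather than a drop-in replacement.

```python
import typing


def _get_signature(funclike):
    # Simplified for this sketch: a stored signature wins, else plain hints.
    if hasattr(funclike, "signature"):
        return funclike.signature
    return typing.get_type_hints(funclike)


def _check_composition(sig1, sig2):
    return any(v == sig1["return"] for k, v in sig2.items() if k != "return")


class Pipe(object):
    def __init__(self, func, signature=None):
        self.func = func
        self.signature = signature if signature is not None else _get_signature(func)

    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)

    def __or__(self, x):
        sig1, sig2 = self.signature, _get_signature(x)
        if not _check_composition(sig1, sig2):
            raise ValueError("Incompatible signatures: {}, {}".format(sig1, sig2))

        def compose(*args, **kwargs):
            return x(self.func(*args, **kwargs))

        newsig = sig1.copy()
        newsig["return"] = sig2["return"]
        return Pipe(func=compose, signature=newsig)


def f(x: str) -> int:
    return len(x)


def to_upper(s: str) -> str:  # hypothetical function, for illustration only
    return s.upper()


good = Pipe(to_upper) | f  # str -> str, then str -> int: accepted

try:
    Pipe(f) | to_upper  # f returns int, but to_upper expects str
    error_message = None
except ValueError as err:
    error_message = str(err)
```

The error is raised when the pipeline is defined, before any data flows through it.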
For extra credit, we can also define a special Pipe subclass to indicate the start of the pipeline. Let’s call it Input:
class Input(Pipe):
    def __init__(self, signature):
        self.signature = {"input": signature, "return": signature}
        self.func = lambda x: x
One reason I like doing this is that it makes the pipeline definition a little bit more self-documenting. Here’s a full example:
def f(x: str) -> int:
    return len(x)

def g(x: int) -> int:
    return x + 5

def adder(x):
    def h(y: int) -> int:
        return x + y
    return h

class Caller(object):
    def __init__(self, key: str):
        self.key = key

    def __call__(self, x: int) -> dict:
        # Use the stored key; a bare `key` is undefined in this scope.
        return {self.key: x}
pipeline = (
    Input(str)
    | f
    | adder(-4)
    | g
    | Caller("hello!")
)
pipeline("abc") # returns {"hello!": 4}
Many Python examples lean heavily on object-oriented programming, but I hope this post shows that Python can actually be very functional if you want it to be!
One additional advantage of defining pipelines in this way is that, although our implementation of the Pipe object is quite rudimentary, we’ve really separated the definition of the pipeline from its execution. It’s easy to imagine leveraging some more advanced tools (e.g. Dask or the new-ish futures module) to make each component in a pipeline execute concurrently and asynchronously (and perhaps even in a distributed fashion).
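As a rough sketch of that separation, and assuming the futures module mentioned above refers to the standard library's concurrent.futures, a pipeline can be handed to an executor unchanged because it is just a callable. This runs the whole pipeline over many inputs concurrently, which is a simpler goal than running each stage concurrently. The `compose` helper is a hypothetical stand-in for a Pipe built with |, used here to keep the example short.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def compose(*funcs):
    """Left-to-right composition; a stand-in for a Pipe built with |."""
    def run(x):
        return reduce(lambda acc, fn: fn(acc), funcs, x)
    return run


def f(x: str) -> int:
    return len(x)


def g(x: int) -> int:
    return x + 5


pipeline = compose(f, g)
inputs = ["a", "abc", "hello"]

# The pipeline definition is unchanged; only the execution strategy differs.
with ThreadPoolExecutor(max_workers=2) as executor:
    results = list(executor.map(pipeline, inputs))  # results keep input order
```

Swapping the executor for another one with a compatible map method would change how the work is scheduled without touching the pipeline itself.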