Pypes: Adding Pipes to Python

Written on August 2, 2019

Pipes are a mainstay of inter-process communication in Unix, and pipelines are also a hugely powerful feature in functional programming languages. They also map well to data science and machine learning applications: many models can be thought of as chains of transformations applied to inputs. Defining models as pipelines of function-like objects reflects this view and encourages good habits from functional programming (functions without side effects, modular and testable code, avoiding mutation, etc.). It turns out that adding pipelines to Python is pretty straightforward, and it also provides a mechanism to include some “leaky” type checking along the way.

First, let’s look at what we want a pipeline to look like. As a very simple example, consider two functions that we want to compose:

def f(x: str) -> int:
    return len(x)

def g(x: int) -> int:
    return x + 5

Notice the type hints (function annotations arrived in Python 3.0, and the typing module that standardized them as type hints arrived in Python 3.5). We want to define an easy way to do composition of these two functions, as well as raise an error if they have incompatible type signatures. Something like:

pipe = f | g
pipe("abc") # returns 8

I’ve chosen the “|” operator to represent composition by analogy with Unix, and in this pipeline data flows from left to right.

The easiest way to add a pipe operator is to define a class to wrap our function objects. Class instances can be made callable (by defining the __call__ special method), and we can define our own operators on them as well. Let’s define a Pipe class:

class Pipe(object):
    def __init__(self, func):
        self.func = func
    
    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)
    
    def __or__(self, x):
        if not callable(x):
            raise TypeError("target must be callable")
        
        if self.func is None:
            return Pipe(func=x)
        
        def compose(*args, **kwargs):
            return x(self.func(*args, **kwargs))
        
        return Pipe(func=compose)

Recall that the __or__ special method defines the bitwise-OR operator (|), so this function is where the magic happens. We can now do something like the following:

pipe = Pipe(f) | g

(and we can generate pipelines of functions on the fly as needed.)
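
To make the “on the fly” part concrete, here is a minimal sketch that repeats the first Pipe class so it runs on its own. The double function and the build_pipeline helper are names made up for illustration; they are not part of the code above.

```python
from functools import reduce

class Pipe(object):
    def __init__(self, func):
        self.func = func

    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)

    def __or__(self, x):
        if not callable(x):
            raise TypeError("target must be callable")
        if self.func is None:
            return Pipe(func=x)

        def compose(*args, **kwargs):
            return x(self.func(*args, **kwargs))

        return Pipe(func=compose)

def f(x: str) -> int:
    return len(x)

def g(x: int) -> int:
    return x + 5

def double(x: int) -> int:  # hypothetical extra stage
    return x * 2

# Each | returns a new Pipe, so the chain can keep growing.
pipe = Pipe(f) | g | double
result = pipe("abc")  # len("abc") = 3, then 3 + 5 = 8, then 8 * 2 = 16

def build_pipeline(funcs):
    # Hypothetical helper: fold a list of callables into a single Pipe.
    return reduce(lambda acc, fn: acc | fn, funcs[1:], Pipe(funcs[0]))

built = build_pipeline([f, g, double])
```

Since only the first function needs wrapping, a list of callables chosen at runtime can be folded into a pipeline with a single reduce call.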

This is a good start, but there’s a little more to be done: we’re not taking advantage of the type hints functionality to verify the integrity of our pipeline, and having to wrap the first function in a Pipe object is still a little clunky.

Let’s define a utility function to retrieve a callable object’s signature:

import typing

def _get_signature(funclike):
    if isinstance(funclike, Pipe):
        return funclike.signature
    if hasattr(funclike, "signature"):
        return funclike.signature
    if hasattr(funclike, "__call__"):
        sig = typing.get_type_hints(funclike.__call__)
        if len(sig) != 0:
            return sig
    return typing.get_type_hints(funclike)

There are a few things going on in this function. First, I’ve added an additional attribute to our Pipe class called signature. This is where we’ll store the signature of the functions we wrap (since our Pipe class is generic, we can’t define good type hints for its __call__ method a priori). We’re also allowing for the possibility that some other callable object defines a signature attribute. If the object defines a __call__ method with non-empty type hints, we grab the type hints from there. If none of those conditions is satisfied, we fall back on grabbing the type hints from the object itself. In action, this function will produce a dictionary defining the signature, like so:

def f(x: int) -> str:
    return "{}".format(x)

_get_signature(f) # returns {"x": int, "return": str}
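
The lookups that _get_signature relies on can be tried in isolation. This is a small sketch that calls typing.get_type_hints directly; the Scale class is a hypothetical callable object invented for the example.

```python
import typing

def f(x: int) -> str:
    return "{}".format(x)

class Scale(object):  # hypothetical callable class, for illustration only
    def __init__(self, factor: int):
        self.factor = factor

    def __call__(self, x: int) -> int:
        return x * self.factor

# A plain function: the hints come from the function itself.
func_hints = typing.get_type_hints(f)

# A callable object: the useful hints live on its __call__ method.
call_hints = typing.get_type_hints(Scale(3).__call__)

# An unannotated callable yields an empty dictionary.
no_hints = typing.get_type_hints(lambda x: x)
```

Here func_hints maps "x" to int and "return" to str, call_hints describes Scale.__call__ rather than Scale.__init__, and no_hints is empty. An empty result means that a function without hints has nothing the composition check can match against.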

Checking that two signatures are at least theoretically composable is also straightforward:

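def _check_composition(sig1, sig2):
    # sig1 is the upstream signature, sig2 the downstream one.
    for k, v in sig2.items():
        if k == "return":
            continue  # skip the downstream return type; only inputs matter
        if v == sig1["return"]:
            return True  # some downstream input accepts the upstream output
    return False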

This composition-compatibility check is leaky in that it only verifies that at least one input to the downstream function matches the return type of the upstream function. Thus, it’s possible to “fool” the check if the downstream function isn’t unary. (And, of course, type hints are not really the same as type signatures, so a careless programmer could always write incorrect type hints!)
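
To see the leak concretely, here is a small sketch. It uses the name _check_composition, which is what the Pipe class calls, and the pad and shout functions are hypothetical examples, not part of the original code.

```python
import typing

def _check_composition(sig1, sig2):
    for k, v in sig2.items():
        if k == "return":
            continue
        if v == sig1["return"]:
            return True
    return False

def f(x: str) -> int:
    return len(x)

def g(x: int) -> int:
    return x + 5

def pad(text: str, width: int) -> str:  # hypothetical binary function
    return text.rjust(width)

def shout(s: str) -> str:  # hypothetical function with no int input
    return s.upper()

sig_f = typing.get_type_hints(f)

ok = _check_composition(sig_f, typing.get_type_hints(g))            # unary match
leaky = _check_composition(sig_f, typing.get_type_hints(pad))       # passes on "width" alone
rejected = _check_composition(sig_f, typing.get_type_hints(shout))  # no int input

# The check let pad through, but the composed call passes one argument only.
try:
    pad(f("abc"))
    failed_at_runtime = False
except TypeError:
    failed_at_runtime = True
```

The check accepts pad because its width parameter is an int, even though the pipeline would call it with a single positional argument, so the failure only shows up at call time.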

Here’s our new version of the Pipe class, with signatures and some additional error handling:

class Pipe(object):
    def __init__(self, func=None, signature=None):
        if func is not None and not callable(func):
            raise TypeError("func argument must be callable")
        
        self.func = func
        self.signature = signature
        if signature is None and self.func is not None:
            self.signature = _get_signature(self.func)

    
    def __call__(self, *args, **kwargs):
        if self.func is None:
            return
        return self.func(*args, **kwargs)
    
    def __or__(self, x):
        if not callable(x):
            raise TypeError("target must be callable")
        
        if self.func is None:
            return Pipe(func=x)
        
        sig1 = self.signature
        sig2 = _get_signature(x)

        if not _check_composition(sig1, sig2):
            raise ValueError("Incompatible signatures: {}, {}".format(sig1, sig2))
        
        def compose(*args, **kwargs):
            return x(self.func(*args, **kwargs))

        newsig = self.signature.copy()
        newsig["return"] = sig2["return"]
        
        return Pipe(func=compose, signature=newsig)
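
The payoff is that a mismatch is reported when the pipeline is built, not when it is run. The sketch below uses a condensed copy of the class and helpers (the func=None handling and the callable checks are dropped for brevity), and the shout function is a hypothetical example.

```python
import typing

def _check_composition(sig1, sig2):
    # True if any downstream parameter matches the upstream return type.
    return any(v == sig1["return"] for k, v in sig2.items() if k != "return")

def _get_signature(funclike):
    if hasattr(funclike, "signature"):
        return funclike.signature
    sig = typing.get_type_hints(funclike.__call__)
    return sig if sig else typing.get_type_hints(funclike)

class Pipe(object):
    def __init__(self, func, signature=None):
        self.func = func
        self.signature = signature or _get_signature(func)

    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)

    def __or__(self, x):
        sig1, sig2 = self.signature, _get_signature(x)
        if not _check_composition(sig1, sig2):
            raise ValueError("Incompatible signatures: {}, {}".format(sig1, sig2))

        def compose(*args, **kwargs):
            return x(self.func(*args, **kwargs))

        newsig = sig1.copy()
        newsig["return"] = sig2["return"]
        return Pipe(func=compose, signature=newsig)

def f(x: str) -> int:
    return len(x)

def g(x: int) -> int:
    return x + 5

def shout(s: str) -> str:  # hypothetical: expects a str, but f returns an int
    return s.upper()

good = Pipe(f) | g  # int output feeds an int input

error_message = None
try:
    Pipe(f) | shout  # rejected while building, before any data flows
except ValueError as err:
    error_message = str(err)
```

The composed pipe keeps the inputs of the first stage and the return type of the last one, so good.signature describes a function from str to int.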

For extra credit, we can also define a special Pipe subclass to indicate the start of the pipeline. Let’s call it Input:

class Input(Pipe):
    def __init__(self, signature):
        self.signature = {"input": signature, "return": signature}
        self.func = lambda x: x

One reason I like doing this is that it makes the pipeline definition a little bit more self-documenting. Here’s a full example:

def f(x: str) -> int:
    return len(x)

def g(x: int) -> int:
    return x + 5

def adder(x):
    def h(y: int) -> int:
        return x + y
    return h

class Caller(object):
    def __init__(self, key: str):
        self.key = key
    def __call__(self, x: int) -> dict:
        return {self.key: x}

pipeline = ( Input(str)
           | f
           | adder(-4)
           | g
           | Caller("hello!")
           )

pipeline("abc") # returns {"hello!": 4}

Many of the examples in Python lean heavily into object-oriented programming, but I hope this post shows that Python can actually be very functional if you want it to be!

One additional advantage of defining pipelines in this way is that, although our implementation of the Pipe object is quite rudimentary, we’ve really separated the definition of the pipeline from its execution. It’s easy to imagine leveraging some more advanced tools (e.g. Dask or the new-ish futures module) to make each component in a pipeline execute concurrently and asynchronously (and perhaps even distributed).
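
As one possible direction, and assuming the “futures module” refers to the standard library’s concurrent.futures, here is a rough sketch. It is simpler than what is described above: it runs the whole pipeline concurrently over several inputs, not each component concurrently. It uses the first, unchecked Pipe class to stay short.

```python
from concurrent.futures import ThreadPoolExecutor

class Pipe(object):
    def __init__(self, func):
        self.func = func

    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)

    def __or__(self, x):
        if not callable(x):
            raise TypeError("target must be callable")

        def compose(*args, **kwargs):
            return x(self.func(*args, **kwargs))

        return Pipe(func=compose)

def f(x: str) -> int:
    return len(x)

def g(x: int) -> int:
    return x + 5

pipeline = Pipe(f) | g
inputs = ["a", "abc", "hello"]

# The pipeline is just a callable, so an executor can map it over inputs.
# executor.map yields results in the same order as the inputs.
with ThreadPoolExecutor(max_workers=2) as executor:
    results = list(executor.map(pipeline, inputs))
```

Because the pipeline definition is only a callable object, the choice of executor stays outside the pipeline itself, which is the separation of definition from execution mentioned above.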