In early April I added an “update” command to Singularity Registry HPC (see the pull request here) and needed to start with a list of Docker tags, parse them into version strings to sort, and still return the original tag for later use. I wound up creating a custom class and set of functions that use distutils.version.LooseVersion to support that, but in creating this “hard coded thing” I stepped back and had a question.

Can we more intelligently compose custom parsing pipelines?

Specifically I wanted to:

  1. Start with a list of container tags for an image from a registry
  2. Filter out anything that looks like a commit, but keep tags that are plain strings (e.g., latest)
  3. Derive a major, minor, and patch version for each, and filter to newest
  4. Sort!

For step 3, as an example if there was a 1.2.3-commitA and 1.2.3-commitB I’d only want to keep one, and the newer one of the two, so I could ask for “unique by patch” and filter the older one out. Ultimately of course I dove right in, and this led to the creation of Pipelib, which was an itch I terribly wanted to scratch! In this quick post, I want to share the overall design, because it was really fun to make.

Design

Before we talk about the design, let me show it to you.


import pipelib.steps as step
import pipelib.pipeline as pipeline

# A pipeline to process a list of strings
steps = (

   # convert everything to lowercase
   step.transform.ToLowercase(),

   # don't include anything with "two"
   ~step.filters.HasPatterns(filters=["two"])
)

# Strings to process
items = ['item-ONE', 'item-TWO', 'item-two-THREE']

p = pipeline.Pipeline(steps)

# The updated and transformed items
updated = p.run(items)
# ['item-one']

In the above, we take a pipeline object and add steps to it. That design is fairly simple, as the Pipeline class takes an optional iterable of things to process. I say “things” because we can give it steps, composed steps, or even entire other pipelines. Here is an example of adding an entire other Pipeline!


import pipelib.steps as step
import pipelib.pipeline as pipeline

fruits = ["Orange", "Melon", "Watermelon", "Fruit23"]
preprocess = pipeline.Pipeline(
    steps = (
        # Example of chaining steps together
        step.filters.HasMaxLength(length=8) & step.filters.HasAllLetters(),
    )
)

# Add this preprocess step alongside other steps (make lowercase)
steps = (
   step.transform.ToLowercase(),
   preprocess,
)

# Create a new pipeline and run
p = pipeline.Pipeline(steps)

# We should expect orange and melon!
updated = p.run(fruits)
['orange', 'melon']

Implementation-wise, this is also fairly simple. We can check the underlying class of the provided object and either add a single step, or insert a set of steps given another pipeline. In fact, pipelib comes with a small set of “pipelines” that are ready for you to use. For example, here is one to filter out “things that look like complete or partial git commits”


import pipelib.steps as step
import pipelib.pipeline as pipeline

# Pre-generated sets of steps we can use
import pipelib.pipelines as pipelines

pipeline.Pipeline(
    pipelines.git.RemoveCommits
).run(["832b1c", "832b1c645e562d5cc6e376e5a3e058c02a40d92a", "123-abcd"])
["123-abcd"]

This is something I found useful because people sometimes use commits as Docker tags, and I don’t find this incredibly meaningful as a version to compare to (and want to remove them). Under the hood, it looks like this:


RemoveCommits = pipeline.Pipeline(
    steps=(
        step.filters.HasMinLength(length=8) & ~step.filters.HasAllLowerLettersNumbers(),
    )
)

Do you also notice something interesting in the above? We are actually combining steps akin to logical operations. The above “pipeline” is actually just one step that combined other steps!


pipelines.git.RemoveCommits.steps
[HasMinLength_AND_NotHasAllLowerLettersNumbers]

Let’s step back and talk about some concepts that allow this.

Concepts

Pipeline

As we’ve seen above, a pipeline is a collection of steps that take, as input, a listing of items and return a parsed and filtered list.
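As a rough mental model (not pipelib's actual source), a pipeline can be sketched as a class that keeps a flat list of steps and pushes each item through them in order. Everything named here (SketchPipeline, lower, no_two) is hypothetical, and the steps are plain functions that return a new value, or None to drop the item:

```python
class SketchPipeline:
    """Toy pipeline: hypothetical, not pipelib's implementation."""

    def __init__(self, steps=None):
        self.steps = []
        for step in steps or []:
            self.add(step)

    def add(self, step):
        # Another pipeline contributes all of its steps; anything else is one step
        if isinstance(step, SketchPipeline):
            self.steps.extend(step.steps)
        else:
            self.steps.append(step)

    def run(self, items):
        results = []
        for item in items:
            for step in self.steps:
                item = step(item)
                if item is None:
                    break  # a step dropped this item
            if item is not None:
                results.append(item)
        return results


def lower(item):
    return item.lower()


def no_two(item):
    # Assumed convention for this sketch: returning None means "drop"
    return None if "two" in item else item


inner = SketchPipeline([no_two])
outer = SketchPipeline([lower, inner])
print(outer.run(["item-ONE", "item-TWO", "item-two-THREE"]))
# prints ['item-one']
```

Flattening nested pipelines at construction time keeps the run loop simple, since it only ever sees a single list of steps.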

Step

A step is some action in a pipeline. The way this works is that we have different kinds of steps, and this makes them easy to implement and even test. A boolean step is akin to a filter, and is expected to return True or False to indicate if the item passes, e.g., False means it’s filtered out. Boolean steps are neat because they afford different kinds of logic and combination.

Logical Operations

Let’s say that we have a step that checks that an input is all letters:

step.filters.HasAllLetters()

For the above, anything that had a number (e.g., orange123) would be filtered out. But what if we wanted to invert that, and pass only inputs that don’t have all letters (meaning they contain numbers or special characters)? We can simply do that:

~step.filters.HasAllLetters()

Implementation-wise, this was really fun to do! For Python to respect the ~ operator, I simply define the __invert__ method on the BooleanStep class.

def __invert__(self):
    """
    We can say "~step" and reverse the logic.
    """
    self.reverse = True
    return self

It sets an attribute “reverse” to True and returns itself, so that we use the same step but with this variable set to True. What does that do? In the “run” function of the BooleanStep we retrieve an outcome from the underlying step (True or False) and reverse it if that attribute is True! Again, it’s very simple, and allows for doing things like this:


from pipelib.pipeline import Pipeline
import pipelib.steps as steps

Pipeline(~steps.filters.HasAllLetters()).run(["I-have-special-characters", "Idonot"])
['I-have-special-characters']

Pipeline(steps.filters.HasAllLetters()).run(["I-have-special-characters", "Idonot"])
['Idonot']
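To make the role of the reverse flag concrete, here is a minimal, self-contained sketch of how a run function might consult it. The class names SketchBooleanStep and AllLetters and the check method are hypothetical stand-ins, not pipelib's own classes:

```python
class SketchBooleanStep:
    """Hypothetical stand-in for a boolean (filter) step."""

    reverse = False

    def check(self, item):
        raise NotImplementedError

    def __invert__(self):
        # Same idea as in the post: set a flag and return the same object
        self.reverse = True
        return self

    def run(self, item):
        outcome = self.check(item)
        # Flip the outcome only when the step was inverted
        return (not outcome) if self.reverse else outcome


class AllLetters(SketchBooleanStep):
    def check(self, item):
        return item.isalpha()


items = ["I-have-special-characters", "Idonot"]
plain = AllLetters()
inverted = ~AllLetters()

print([x for x in items if plain.run(x)])     # prints ['Idonot']
print([x for x in items if inverted.run(x)])  # prints ['I-have-special-characters']
```

Because the flag is set on the object itself, applying ~ to a step that is stored in a variable changes that same step everywhere it is used, so in this sketch a fresh instance is created for each variant.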

What if we wanted to combine steps? E.g., what if I want to say “has all letters” OR “has minimum length 10?” If we put the steps side by side we would only be able to support an AND - allowing passing through of entries that have all letters and the minimum length of 10. Pipelib supports both those operators - AND and OR as follows:


> step = steps.filters.HasAllLetters() & steps.filters.HasMinLength(length=10)
> step
HasAllLetters_AND_HasMinLength

Pipeline(step).run(["thisonewillpass", "thisoneno", "notthisone2"])
['thisonewillpass']

For both operators, we are using the __and__ and __or__ functions, respectively, and:

  1. Checking for class compatibility (both must be BooleanStep)
  2. Creating a list of composed steps to add to a class attribute "composed"
  3. Adding the previous run functions too, named after the step class name
  4. Defining a new run function that loops through the composed set, runs each, and updates and returns a shared result
  5. Naming the class based on the combined names of the composed classes

For step 4 above, the operation (AND or OR) will vary depending on if the initial call was to __and__ or __or__. The main difference between the two is that “OR” starts with a default of False (otherwise it would always return True) and AND starts with a default of True (otherwise it would always return False). And since we are always taking the first class “composed” attribute, this means that you can compose steps with other steps as many times as you like - a new check is simply added to the front or back of the list. The result (returned) is the new class that is ready to run. Here is what an OR looks like:


> step = steps.filters.HasAllLetters() | steps.filters.HasMinLength(length=10)
> step
HasAllLetters_OR_HasMinLength

Pipeline(step).run(["thisonewillpass", "veryshort", "12345"])
['thisonewillpass', 'veryshort']

If you are interested in this function, you can see the entire thing here.
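As a rough illustration of the five steps listed above, here is a simplified sketch of composing boolean steps with & and |. It is not pipelib's function: SketchStep and _compose are hypothetical names, and instead of extending a shared "composed" attribute it builds a new step whose check closes over the two run functions.

```python
class SketchStep:
    """Hypothetical boolean step that supports & and | composition."""

    def __init__(self, name, check):
        self.name = name
        self.check = check

    def run(self, item):
        return self.check(item)

    def _compose(self, other, operator):
        # 1. Both sides must be boolean steps
        if not isinstance(other, SketchStep):
            return NotImplemented
        # 2./3. Collect the run functions of both sides
        composed = [self.run, other.run]

        # 4. New check: AND starts at True, OR starts at False
        def check(item):
            result = operator == "AND"
            for func in composed:
                if operator == "AND":
                    result = result and func(item)
                else:
                    result = result or func(item)
            return result

        # 5. Name the result after its parts
        return SketchStep(f"{self.name}_{operator}_{other.name}", check)

    def __and__(self, other):
        return self._compose(other, "AND")

    def __or__(self, other):
        return self._compose(other, "OR")


all_letters = SketchStep("HasAllLetters", str.isalpha)
min_length = SketchStep("HasMinLength", lambda item: len(item) >= 10)

both = all_letters & min_length
either = all_letters | min_length

print(both.name)  # prints HasAllLetters_AND_HasMinLength
print([x for x in ["thisonewillpass", "thisoneno", "notthisone2"] if both.run(x)])
# prints ['thisonewillpass']
print([x for x in ["thisonewillpass", "veryshort", "12345"] if either.run(x)])
# prints ['thisonewillpass', 'veryshort']
```

Since the composed result is itself a SketchStep, it can be combined again with further steps, which mirrors the "compose as many times as you like" property described above.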

Transformation Operations

A base step can be thought of as a transformation. Instead of expecting a boolean to be returned, we are instead expecting a new value or None. In this respect a transform step can also act as a filter, since an item for which it returns None is removed from the list; in most cases, however, a transform is intended to perform an operation on the item passed. Here is an example of a transformation operation:

Pipeline(steps.transform.ToLowercase()).run(["AHHHH"])
['ahhhh']
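The "new value or None" contract can be illustrated without pipelib at all. In this sketch, extract_version is a hypothetical transform (the regular expression is an assumption about what a version looks like), and items for which it returns None are dropped:

```python
import re


def extract_version(item):
    """Hypothetical transform: return the leading x.y.z version, or None."""
    match = re.match(r"v?(\d+\.\d+\.\d+)", item)
    return match.group(1) if match else None


tags = ["v1.2.3", "latest", "0.9.0-rc1"]

# Apply the transform, then remove anything it turned into None
transformed = [extract_version(tag) for tag in tags]
kept = [value for value in transformed if value is not None]

print(transformed)  # prints ['1.2.3', None, '0.9.0']
print(kept)         # prints ['1.2.3', '0.9.0']
```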

Sort Operations

A sort operation is a step that is one level up. Instead of operating on individual items, the step redefines the higher-level “run” function and does operations across the iterable. A good example from Pipelib is the use case that originally inspired me - to start with a messy list of Docker tags, do some parsing to derive versions, and return back a sorted list.


import pipelib.pipeline as pipeline
import pipelib.steps as steps

pipeline.Pipeline(steps.container.ContainerTagSort(ascending=False)).run(["1.2.3", "0.1.0", "8.3.2"])
['8.3.2', '1.2.3', '0.1.0']

pipeline.Pipeline(steps.container.ContainerTagSort(ascending=True)).run(["1.2.3", "0.1.0", "8.3.2"])
['0.1.0', '1.2.3', '8.3.2']

In the above we also demonstrate that steps can take parameters, such as the order of a sort! This particular sorting step also allows you to say you want to return unique major, minor, or patch versions.


pipeline.Pipeline(steps.container.ContainerTagSort(unique_major=True)).run(["1.2.3", "1.1.0", "8.3.2"])
['8.3.2', '1.2.3']
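For intuition about what a step that works across the whole iterable has to do, here is a standalone sketch of sorting tags and keeping the newest tag per major version. It does not use pipelib: parse_version and sort_tags are hypothetical helpers, and the regular expression is a simplifying assumption (pipelib's own parsing may differ).

```python
import re


def parse_version(tag):
    """Return (major, minor, patch) as ints, or None if no version is found."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", tag)
    return tuple(int(part) for part in match.groups()) if match else None


def sort_tags(tags, ascending=False, unique_major=False):
    # Pair each parsed version with its original tag, dropping unparsable tags
    parsed = [(parse_version(tag), tag) for tag in tags]
    parsed = [pair for pair in parsed if pair[0] is not None]
    # Newest first, so the first tag seen per major version is the newest
    parsed.sort(key=lambda pair: pair[0], reverse=True)
    if unique_major:
        seen = set()
        unique = []
        for version, tag in parsed:
            if version[0] not in seen:
                seen.add(version[0])
                unique.append((version, tag))
        parsed = unique
    if ascending:
        parsed.reverse()
    return [tag for _, tag in parsed]


print(sort_tags(["1.2.3", "0.1.0", "8.3.2"]))
# prints ['8.3.2', '1.2.3', '0.1.0']
print(sort_tags(["1.2.3", "0.1.0", "8.3.2"], ascending=True))
# prints ['0.1.0', '1.2.3', '8.3.2']
print(sort_tags(["1.2.3", "1.1.0", "8.3.2"], unique_major=True))
# prints ['8.3.2', '1.2.3']
```

Comparing integer tuples rather than raw strings matters here: as strings, "10.0.0" would sort before "9.0.0".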

And if you wanted to do a more comprehensive clean up and sort, you could do something like this.

Wrapper

Pipelib needed a way to be able to pass around some parsed version of an item, but still maintain the original. For example, let’s say I’m parsing Docker tags into something that resembles a loose semantic version, I might have filtered 1.2.3-boop to be just 1.2.3, but at the end of the day I need the original tag to pull. Pipelib accomplishes this via wrappers.

A wrapper is conceptually that - an internal wrapper class to an item that allows for storing an original value, and still doing operations to change a current state. Wrappers are used inside steps and allow for things like sorting and comparison. You probably don’t need to worry about wrappers unless you want to develop for pipelib. By default, wrappers are “extracted away” to return the basic types. However, you can ask Pipelib to not do this unwrapping, and then you can get back the derived and original values:


tags = ["1.2.3", "1.1.0", "8.3.2"]
updated = pipeline.Pipeline(steps.container.ContainerTagSort()).run(tags, unwrap=False)

# Notice that this just looks like a set of strings...
updated
['8.3.2', '1.2.3']

# But actually we have wrappers, that each have an _original attribute
type(updated[0])
pipelib.wrappers.version.VersionWrapper
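The wrapper idea can be sketched in a few lines. This is not pipelib's VersionWrapper: SketchVersionWrapper, its version attribute, and the fallback to 0.0.0 for unparsable tags are assumptions made for illustration. Only the _original attribute name is taken from the listing above.

```python
import re


class SketchVersionWrapper:
    """Hypothetical wrapper: keeps the original tag next to a parsed version."""

    def __init__(self, original):
        self._original = original
        match = re.match(r"(\d+)\.(\d+)\.(\d+)", original)
        # Assumption for this sketch: unparsable tags are treated as 0.0.0
        self.version = tuple(int(p) for p in match.groups()) if match else (0, 0, 0)

    def __lt__(self, other):
        # Sorting only needs "less than", compared on the parsed version
        return self.version < other.version

    def __repr__(self):
        # Display the derived value so the wrapper looks like a plain string
        return repr(".".join(str(p) for p in self.version))


wrapped = sorted((SketchVersionWrapper(t) for t in ["1.2.3-boop", "8.3.2"]), reverse=True)

print(wrapped)                         # prints ['8.3.2', '1.2.3']
print([w._original for w in wrapped])  # prints ['8.3.2', '1.2.3-boop']
```

The sort operates on the derived 1.2.3, while the original 1.2.3-boop stays available for the eventual pull.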

Conclusion

I’ve had so much fun making this library! Like many of my projects it’s probably not super useful, but if you see a cool use case please let me know! I’m also happy to develop custom pipelines or steps for a use case that you might be interested in. Please don’t hesitate to ask me for help, I’m always running out of fun things to do :)

Why should I care?

Arguably you could just hard code this kind of filtering and sorting, but I think the idea of being able to customize and assemble steps is a cool one. If the steps are provided in a library it might make it slightly easier, or your work more reproducible because someone else can use the steps. And if you don’t care? That’s okay too. I recognize this was mostly a fun project, and yet-another-itch I really wanted to scratch because I’ve never made a design like this before, either in terms of the idea or underlying testing and automation.




Suggested Citation:
Sochat, Vanessa. "Pipelib: Simple Library to Parse, Filter, and Sort Things." @vsoch (blog), 07 May 2022, https://vsoch.github.io/2022/pipelib/ (accessed 28 Nov 22).