The core of the thepipe framework.

Module Contents

thepipe.core.STATS_LIMIT = 100000
thepipe.core.MODULE_CONFIGURATION = 'pipeline.toml'
class thepipe.core.Blob(*args, **kwargs)

Bases: collections.OrderedDict

A simple (ordered) dict with a fancy name. This should hold the data.
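As a rough illustration of what "an ordered dict with a fancy name" means in practice, the sketch below defines a local stand-in class instead of importing thepipe.core.Blob. It assumes the class adds no behaviour on top of OrderedDict; the keys `event_id` and `hits` are made up for the example.

```python
from collections import OrderedDict


class Blob(OrderedDict):
    """Local stand-in for thepipe.core.Blob (assumed to add no behaviour)."""


blob = Blob()
blob["event_id"] = 1        # hypothetical key
blob["hits"] = [0.1, 0.7]   # hypothetical key

# Keys come back in insertion order, as with any OrderedDict.
keys = list(blob)
```

Modules would typically read from and write to such a blob by key as it travels through the pipeline.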

class thepipe.core.Module(name=None, **parameters)

The module which can be attached to the pipeline.

configure(self)

Configure the module, e.g. set up instance variables.

expose(self, obj, name)

Expose an object as a service to the Pipeline

print(self, *args, **kwargs)
property name(self)

The name of the module

property parameters(self)
property processed_parameters(self)
add(self, name, value)

Add the parameter with the desired value to the dict

get(self, name, default=None)

Return the value of the requested parameter, or default if the value is None.

require(self, name)

Return the value of the requested parameter or raise an error.
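The difference between get and require can be sketched with a small stand-in class. This is not thepipe's implementation: the class name `ParameterStore` is hypothetical, and the exception type is an assumption, since the description above only says "an error".

```python
class ParameterStore:
    """Hypothetical stand-in mimicking the documented add/get/require."""

    def __init__(self, **parameters):
        self.parameters = dict(parameters)

    def add(self, name, value):
        # Store the parameter under the given name.
        self.parameters[name] = value

    def get(self, name, default=None):
        # Fall back to `default` when the parameter is unset or None.
        value = self.parameters.get(name)
        return default if value is None else value

    def require(self, name):
        value = self.get(name)
        if value is None:
            # ValueError is an assumption; the docs do not name the error.
            raise ValueError(f"Missing required parameter: {name}")
        return value


store = ParameterStore(filename="data.h5")  # hypothetical parameter
store.add("threshold", 3)                   # hypothetical parameter
```

With this sketch, `store.get("missing", default=5)` falls back to the default, while `store.require("missing")` fails loudly, which is the behaviour one would want for mandatory parameters.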

require_service(self, name, why='')

Add a service requirement with an optional reason

prepare(self)

Prepare! Executed between configure and the first call to process.

process(self, blob)

Knead the blob and return it.

finish(self)

Clean everything up.


pre_finish(self)

Do the last few things before calling finish().
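The order in which these hooks run may be easier to follow in code. The sketch below uses a hypothetical stand-in class, not the real thepipe.core.Module. It assumes the hook names `configure`, `prepare`, `pre_finish` and `finish`, that `configure` runs at construction time, and that `pre_finish` hands over to `finish`; none of this is confirmed by the descriptions above beyond the stated ordering.

```python
class LifecycleModule:
    """Hypothetical stand-in recording the order of lifecycle calls."""

    def __init__(self):
        self.calls = []
        self.configure()  # assumption: configure runs on construction

    def configure(self):
        self.calls.append("configure")

    def prepare(self):
        self.calls.append("prepare")

    def process(self, blob):
        self.calls.append("process")
        blob["count"] = blob.get("count", 0) + 1
        return blob

    def pre_finish(self):
        self.calls.append("pre_finish")
        return self.finish()  # assumption: pre_finish delegates to finish

    def finish(self):
        self.calls.append("finish")


module = LifecycleModule()
module.prepare()
blob = {}
for _ in range(2):          # two pipeline cycles
    blob = module.process(blob)
module.pre_finish()
```

After this run, `module.calls` lists configure, prepare, two process calls, pre_finish and finish, in that order.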

open_file(self, filename, gzipped=False)

Open the file with filename
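One plausible reading of the gzipped flag is that it switches between the built-in open and gzip.open. The free function below is a hypothetical sketch under that assumption; the binary read mode in particular is a guess, since the description does not state a mode.

```python
import gzip
import os
import tempfile


def open_file(filename, gzipped=False):
    """Hypothetical helper: choose gzip.open or open based on the flag."""
    # Binary read mode is an assumption; the docs do not state the mode.
    if gzipped:
        return gzip.open(filename, "rb")
    return open(filename, "rb")


# Round trip through a temporary gzip file.
path = os.path.join(tempfile.mkdtemp(), "example.txt.gz")
with gzip.open(path, "wb") as fobj:
    fobj.write(b"hello")

with open_file(path, gzipped=True) as fobj:
    content = fobj.read()
```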

class thepipe.core.Pipeline(blob=None, timeit=False, configfile=None, stats_limit=100000)

The holy pipeline which holds everything together.

If initialised with timeit=True, all modules will be monitored, otherwise only the overall statistics and modules with timeit=True will be shown.

timeit: bool, optional [default=False]

Display time profiling statistics for the pipeline?

configfile: str, optional [default='pipeline.toml']

Path to a configuration file (TOML format) which contains parameters for attached modules.

stats_limit: int, optional [default=100000]

The number of cycles to keep track of when using timeit=True

load_configuration(self, configfile)
attach(self, module_factory, name=None, **kwargs)

Attach a module to the pipeline system

drain(self, cycles=None)

Execute _drain while trapping KeyboardInterrupt.

finish(self)

Call finish() on each attached module.
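How attach, drain and finish fit together can be sketched with a tiny stand-in pipeline. `MiniPipeline` and `Counter` are hypothetical names, and the details (the factory being called with the keyword arguments, a fresh dict per cycle, drain returning the finish results) are assumptions rather than thepipe's actual behaviour. Note that this sketch loops forever when cycles is None, so it is only called with an explicit cycle count here.

```python
class MiniPipeline:
    """Hypothetical stand-in illustrating attach/drain/finish."""

    def __init__(self):
        self.modules = []

    def attach(self, module_factory, name=None, **kwargs):
        # Assumption: the factory is called with the keyword parameters.
        module = module_factory(**kwargs)
        module.name = name or module_factory.__name__
        self.modules.append(module)

    def drain(self, cycles=None):
        done = 0
        try:
            while cycles is None or done < cycles:
                blob = {}  # fresh blob per cycle (assumption)
                for module in self.modules:
                    blob = module.process(blob)
                done += 1
        except KeyboardInterrupt:
            pass  # stop pumping, but still finish cleanly
        return self.finish()

    def finish(self):
        # Call finish() on each attached module and collect the results.
        return {m.name: m.finish() for m in self.modules}


class Counter:
    """Hypothetical module that accumulates a running total."""

    def __init__(self, step=1):
        self.step = step
        self.total = 0

    def process(self, blob):
        self.total += self.step
        blob["total"] = self.total
        return blob

    def finish(self):
        return self.total


pipe = MiniPipeline()
pipe.attach(Counter, "counter", step=2)
results = pipe.drain(cycles=3)
```

Trapping KeyboardInterrupt inside drain means an interrupted run still reaches finish, so modules get a chance to close files or write summaries.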

class thepipe.core.ServiceManager

Takes care of pipeline services.

register(self, name, service)

Service registration

name: Name of the provided service

service: Reference to the service

get_missing_services(self, services)

Check if all required services are provided

services: List of the service names which are required

Returns a list with the missing services.
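The two methods above amount to a name-to-object registry plus a membership check. A minimal sketch, using a hypothetical `MiniServiceManager` class and made-up service names rather than thepipe's own code, could look like this:

```python
class MiniServiceManager:
    """Hypothetical stand-in for a service registry."""

    def __init__(self):
        self._services = {}

    def register(self, name, service):
        # Map the service name to a reference to the service.
        self._services[name] = service

    def get_missing_services(self, services):
        # Return the requested names that were never registered.
        return [name for name in services if name not in self._services]


manager = MiniServiceManager()
manager.register("square", lambda x: x * x)  # hypothetical service
missing = manager.get_missing_services(["square", "cube"])
```

An empty list from get_missing_services would indicate that every required service has a provider.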