Package Contents

class thepipe.Blob(*args, **kwargs)

Bases: collections.OrderedDict

A simple (ordered) dict with a fancy name. It holds the data that is passed from module to module.
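Because Blob subclasses collections.OrderedDict, it behaves like any ordered dict. The sketch below does not import thepipe. It defines a local stand-in class with the same base, and the key names are hypothetical:

```python
from collections import OrderedDict


# Hypothetical stand-in with the same base class as thepipe.Blob
class Blob(OrderedDict):
    """A simple ordered dict holding the data passed between modules."""


blob = Blob()
blob["event_id"] = 42            # hypothetical keys
blob["hits"] = [1.5, 2.5, 3.0]
blob.move_to_end("event_id")     # OrderedDict methods are available too
print(list(blob))                # ['hits', 'event_id']
```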

class thepipe.Module(name=None, **parameters)

A module which can be attached to the pipeline.


Configure the module, e.g. set up instance variables.

expose(self, obj, name)

Expose an object as a service to the Pipeline

print(self, *args, **kwargs)

property name(self)

The name of the module

property parameters(self)

property processed_parameters(self)

add(self, name, value)

Add the parameter with the desired value to the dict

get(self, name, default=None)

Return the value of the requested parameter, or default if its value is None (for example because it was not set).

require(self, name)

Return the value of the requested parameter, or raise an error if it was not provided.
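A minimal sketch of the get/require semantics described above. It uses a hypothetical ParameterHolder stand-in instead of thepipe.Module. This reference does not state which error require raises, so KeyError is an assumption:

```python
class ParameterHolder:
    """Hypothetical stand-in for the get/require part of thepipe.Module."""

    def __init__(self, **parameters):
        self.parameters = parameters

    def get(self, name, default=None):
        # Fall back to `default` when the parameter is missing or None
        value = self.parameters.get(name)
        return default if value is None else value

    def require(self, name):
        # Assumption: a missing parameter raises KeyError in this sketch
        if name not in self.parameters:
            raise KeyError(f"Required parameter '{name}' is missing")
        return self.parameters[name]


m = ParameterHolder(threshold=5)
print(m.get("threshold"))        # 5
print(m.get("verbose", False))   # False
```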

require_service(self, name, why='')

Add a service requirement, with an optional reason given as why.
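The service mechanism can be pictured as a shared table. expose registers an object under a name, and require_service records a name together with the reason it is needed. The sketch below is a hypothetical stand-in: ServiceRegistry and missing_services are illustrative names, not thepipe API. It does not show how thepipe itself resolves or reports missing services:

```python
class ServiceRegistry:
    """Hypothetical stand-in collecting exposed services and requirements."""

    def __init__(self):
        self.services = {}   # service name -> exposed object
        self.required = {}   # service name -> reason it is needed

    def expose(self, obj, name):
        self.services[name] = obj

    def require_service(self, name, why=""):
        self.required[name] = why

    def missing_services(self):
        # Requirements for which no object has been exposed (yet)
        return {n: why for n, why in self.required.items() if n not in self.services}


registry = ServiceRegistry()
registry.expose(lambda x: 2 * x, "double")
registry.require_service("double", why="scale the hit times")
registry.require_service("calibrate", why="needed for energy reconstruction")
print(registry.missing_services())
```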


Prepare! Executed after configure and before the first call to process.

process(self, blob)

Knead the blob and return it


Clean everything up.


Do the last few things before calling finish()
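The order of the hooks documented above can be traced with a stand-in base class that records each call. This is a sketch, not thepipe.Module. The hook names prepare and pre_finish are assumptions, because this reference omits the signatures of those entries. Counter is a hypothetical user module:

```python
class LifecycleModule:
    """Hypothetical stand-in base class recording the hook order."""

    def __init__(self):
        self.calls = []
        self.configure()

    def configure(self):
        self.calls.append("configure")

    def prepare(self):  # assumed name
        self.calls.append("prepare")

    def process(self, blob):
        self.calls.append("process")
        return blob

    def finish(self):
        self.calls.append("finish")

    def pre_finish(self):  # assumed name
        # Do the last few things, then hand over to finish()
        self.calls.append("pre_finish")
        return self.finish()


class Counter(LifecycleModule):
    def configure(self):
        super().configure()
        self.count = 0

    def process(self, blob):
        self.count += 1
        blob["count"] = self.count  # "knead" the blob and pass it on
        return super().process(blob)


module = Counter()
module.prepare()
for _ in range(2):
    blob = module.process({})
module.pre_finish()
print(module.calls)
```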

open_file(self, filename, gzipped=False)

Open the file with the given filename; set gzipped=True if the file is gzip-compressed.
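A sketch of what the gzipped switch could map to, built on the standard-library gzip module. The helper below is hypothetical. Opening in text mode is an assumption, because the reference does not say which mode open_file uses:

```python
import gzip
import os
import tempfile


def open_file(filename, gzipped=False):
    """Hypothetical helper: open a text file, decompressing it if gzipped."""
    if gzipped:
        return gzip.open(filename, "rt")  # text-mode read of gzip data
    return open(filename, "r")


# Usage: write a small gzip-compressed file, then read it back
path = os.path.join(tempfile.mkdtemp(), "hits.txt.gz")
with gzip.open(path, "wt") as f:
    f.write("1.5 2.5\n")

with open_file(path, gzipped=True) as f:
    content = f.read()
print(content)
```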

class thepipe.Pipeline(blob=None, timeit=False, configfile=None, stats_limit=100000)

The holy pipeline which holds everything together.

If initialised with timeit=True, all modules are monitored; otherwise, only the overall statistics and the modules with timeit=True are shown.

timeit: bool, optional [default=False]

Display time profiling statistics for the pipeline?

configfile: str, optional [default='pipeline.toml']

Path to a configuration file (TOML format) which contains parameters for attached modules.

stats_limit: int, optional [default=100000]

The number of cycles to keep track of when using timeit=True
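One way to honour a cap such as stats_limit is a bounded collections.deque, which silently discards the oldest entries once it is full. The CycleTimer class is a hypothetical illustration, not the profiling code thepipe ships:

```python
import time
from collections import deque


class CycleTimer:
    """Hypothetical sketch: keep only the last `stats_limit` cycle durations."""

    def __init__(self, stats_limit=100000):
        self.durations = deque(maxlen=stats_limit)  # oldest entries drop out

    def time_cycle(self, func, *args):
        start = time.perf_counter()
        result = func(*args)
        self.durations.append(time.perf_counter() - start)
        return result

    def mean(self):
        return sum(self.durations) / len(self.durations)


timer = CycleTimer(stats_limit=3)
for _ in range(5):
    timer.time_cycle(sum, range(1000))
print(len(timer.durations))  # 3: only the most recent cycles are kept
```

The memory used for timing data stays bounded no matter how many cycles the pipeline runs.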

load_configuration(self, configfile)

attach(self, module_factory, name=None, **kwargs)

Attach a module to the pipeline system

drain(self, cycles=None)

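Execute _drain while trapping KeyboardInterrupt, so that a running pipeline can be stopped with Ctrl+C. If cycles is given, only that many cycles are run.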


Call finish() on each attached module
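The attach/drain/finish flow can be sketched with a hypothetical MiniPipeline. The sketch makes these assumptions:

* attach calls the factory with the name and keyword parameters.
* Each cycle passes a fresh dict through every module's process.
* cycles=None loops until Ctrl+C.
* drain calls finish() once draining stops.

thepipe's actual behaviour may differ on any of these points:

```python
class MiniPipeline:
    """Hypothetical stand-in for the attach/drain/finish flow."""

    def __init__(self):
        self.modules = []

    def attach(self, module_factory, name=None, **kwargs):
        # Assumption: the factory receives the name and the parameters
        self.modules.append(module_factory(name=name, **kwargs))

    def _drain(self, cycles=None):
        cycle = 0
        while cycles is None or cycle < cycles:
            blob = {}
            for module in self.modules:
                blob = module.process(blob)
            cycle += 1

    def drain(self, cycles=None):
        try:
            self._drain(cycles)
        except KeyboardInterrupt:  # Ctrl+C ends draining gracefully
            print("Interrupted, finishing up...")
        self.finish()

    def finish(self):
        for module in self.modules:
            module.finish()


class Tagger:
    """Hypothetical module: tags every blob and counts the cycles it saw."""

    def __init__(self, name=None, tag="x"):
        self.name, self.tag, self.seen, self.finished = name, tag, 0, False

    def process(self, blob):
        self.seen += 1
        blob[self.name] = self.tag
        return blob

    def finish(self):
        self.finished = True


pipe = MiniPipeline()
pipe.attach(Tagger, name="tagger", tag="muon")
pipe.drain(3)
tagger = pipe.modules[0]
print(tagger.seen, tagger.finished)  # 3 True
```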

class thepipe.Provenance

The provenance manager.

property outfile(self)

start_activity(self, name)

Starts a new activity and returns its UUID for future reference

finish_activity(self, uuid, status='completed')

Finishes the activity with the given UUID and sets its status (default 'completed').
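A sketch of the UUID bookkeeping behind start_activity and finish_activity. It uses a hypothetical MiniProvenance stand-in. The intermediate "running" status, the "failed" status value and the dict layout are assumptions for illustration:

```python
from uuid import uuid4


class MiniProvenance:
    """Hypothetical stand-in for the activity bookkeeping of Provenance."""

    def __init__(self):
        self.activities = {}

    def start_activity(self, name):
        activity_id = str(uuid4())
        self.activities[activity_id] = {"name": name, "status": "running"}
        return activity_id  # keep it to finish this activity later

    def finish_activity(self, uuid, status="completed"):
        # `uuid` is the identifier returned by start_activity
        self.activities[uuid]["status"] = status


prov = MiniProvenance()
run_id = prov.start_activity("reconstruction")
failed_id = prov.start_activity("calibration")
prov.finish_activity(run_id)
prov.finish_activity(failed_id, status="failed")  # hypothetical status value
print(prov.activities[run_id]["status"])  # completed
```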

record_configuration(self, configuration)

Record configuration parameters (e.g. of the pipeline)

record_input(self, url, comment='')

record_output(self, url, comment='')

property current_activity(self)

activity(self, name)

property provenance(self)

property backlog(self)

as_json(self, **kwargs)

Dump the provenance as a JSON string; kwargs are passed on to json.dumps.
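Since as_json forwards its keyword arguments to json.dumps, standard options such as indent or sort_keys control the output. The stand-in below models only that pass-through, and its record layout is hypothetical:

```python
import json


class JsonProvenance:
    """Hypothetical stand-in: only the as_json pass-through is modelled."""

    def __init__(self, records):
        self.records = records

    def as_json(self, **kwargs):
        # Every keyword argument goes straight through to json.dumps
        return json.dumps(self.records, **kwargs)


prov = JsonProvenance([{"name": "reconstruction", "status": "completed"}])
print(prov.as_json(indent=2, sort_keys=True))
```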