Source code for thepipe.core

# -*- coding: utf-8 -*-
# Filename: core.py
"""
The core of thepipe framework.

"""
from collections import deque, OrderedDict
import inspect
import gzip
import signal
import os
import time
from timeit import default_timer as timer
import types

import toml
import numpy as np

from .tools import peak_memory_usage, ignored, Timer
from .logger import get_logger, get_printer
from .provenance import Provenance

__author__ = "Tamas Gal"
__credits__ = ["Moritz Lotze", "Thomas Heid", "Johannes Schumann"]
__license__ = "MIT"
__email__ = "tgal@km3net.de"

STATS_LIMIT = 100000
MODULE_CONFIGURATION = 'pipeline.toml'
# Keyword arguments which are consumed by the pipeline itself (handled in
# `Pipeline.attach`) and thus never reported as unused module parameters.
RESERVED_ARGS = {'every', 'only_if', 'blob_keys', 'timeit'}
class Blob(OrderedDict):
    """A simple (ordered) dict with a fancy name. This should hold the data."""
    def __init__(self, *args, **kwargs):
        OrderedDict.__init__(self, *args, **kwargs)
        self.log = get_logger("Blob")

    def __str__(self):
        if not self:
            return "Empty blob"
        padding = max(len(k) for k in self.keys()) + 3
        output = ["Blob ({} entries):".format(len(self))]
        for key, value in self.items():
            output.append("  '{}'".format(key).ljust(padding) +
                          " => {}".format(repr(value)))
        return "\n".join(output)

    def __getitem__(self, key):
        try:
            val = OrderedDict.__getitem__(self, key)
        except KeyError:
            self.log.error("No key named '%s' found in Blob.\n"
                           "Available keys: %s", key, ', '.join(self.keys()))
            raise
        return val
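
# A short usage sketch for `Blob` (illustrative only, not part of the module):
#
#     blob = Blob()
#     blob["hits"] = [1, 2, 3]
#     blob["header"] = "some metadata"
#     print(blob)          # prints a padded listing of all entries
#     blob["missing_key"]  # logs the available keys, then raises KeyError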
class Module:
    """The module which can be attached to the pipeline"""
    def __init__(self, name=None, **parameters):
        if name is None:
            name = self.__class__.__name__
        self._name = name
        self.services = ServiceManager()
        self.provided_services = {}
        self.required_services = {}
        self._parameters = parameters
        self._processed_parameters = []
        self.only_if = set()
        self.every = 1

        if self.__module__ == '__main__':
            self.logger_name = self.__class__.__name__
        else:
            self.logger_name = self.__module__ + '.' + self.__class__.__name__
        if name != self.logger_name:
            self.logger_name += '.{}'.format(name)

        self.log = get_logger(self.logger_name)
        self.log.debug("Initialising %s", name)
        self.log.debug("The logger is called '%s'", self.logger_name)
        self.cprint = get_printer(self.logger_name)

        self.timeit = self.get('timeit') or False
        self._timeit = {
            'process': deque(maxlen=STATS_LIMIT),
            'process_cpu': deque(maxlen=STATS_LIMIT),
            'finish': 0,
            'finish_cpu': 0
        }
        self.configure()
        self._check_unused_parameters()
    def configure(self):
        """Configure module, like instance variables etc."""
    def expose(self, obj, name):
        """Expose an object as a service to the Pipeline"""
        self.provided_services[name] = obj
    def print(self, *args, **kwargs):
        self.log.deprecation(
            "`Module.print` has been deprecated, please use `cprint` instead!")
        self.cprint(*args, **kwargs)
    @property
    def name(self):
        """The name of the module"""
        return self._name
    @property
    def parameters(self):
        """The parameters passed to the module"""
        return self._parameters
    @property
    def processed_parameters(self):
        """The parameters which have been accessed via `get` or `require`"""
        return self._processed_parameters
    def add(self, name, value):
        """Add the parameter with the desired value to the dict"""
        self.parameters[name] = value
    def get(self, name, default=None):
        """Return the value of the requested parameter or `default` if None."""
        value = self.parameters.get(name)
        self.processed_parameters.append(name)
        if value is None:
            return default
        return value
    def require(self, name):
        """Return the value of the requested parameter or raise an error."""
        value = self.get(name)
        if value is None:
            raise TypeError("{0} requires the parameter '{1}'.".format(
                self.__class__, name))
        return value
    def require_service(self, name, why=''):
        """Add a service requirement with an optional reason"""
        self.required_services[name] = why
    def prepare(self):
        """Prepare! Executed between configure and the first process"""
        return
    def process(self, blob):  # pylint: disable=R0201
        """Knead the blob and return it"""
        return blob
    def finish(self):
        """Clean everything up."""
        return
    def pre_finish(self):
        """Do the last few things before calling finish()"""
        return self.finish()
    def open_file(self, filename, gzipped=False):
        """Open the file with filename"""
        try:
            if gzipped or filename.endswith('.gz'):
                return gzip.open(filename, 'rb')
            else:
                return open(filename, 'rb')
        except TypeError:
            self.log.error("Please specify a valid filename.")
            raise SystemExit
        except IOError as error_message:
            self.log.error(error_message)
            raise SystemExit
    def _check_unused_parameters(self):
        """Check if any of the parameters passed in are ignored"""
        all_params = set(self.parameters.keys())
        processed_params = set(self.processed_parameters)
        unused_params = all_params - processed_params - RESERVED_ARGS
        if unused_params:
            self.log.warning("The following parameters were ignored: %s",
                             ', '.join(sorted(unused_params)))

    def __call__(self, *args, **kwargs):
        """Run process if directly called."""
        self.log.info("Calling process")
        return self.process(*args, **kwargs)
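
# A minimal sketch of a custom module. `Scaler` and its `factor`/`key`
# parameters are hypothetical, not part of thepipe; they only illustrate the
# configure/require/get/process contract defined above:
#
#     class Scaler(Module):
#         def configure(self):
#             self.factor = self.require("factor")      # mandatory parameter
#             self.key = self.get("key", default="x")   # optional parameter
#
#         def process(self, blob):
#             blob[self.key] = blob.get(self.key, 1) * self.factor
#             return blob
#
#     pipe = Pipeline()
#     pipe.attach(Scaler, factor=2)
#     pipe.drain(3)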
class Pipeline:
    """The holy pipeline which holds everything together.

    If initialised with timeit=True, all modules will be monitored, otherwise
    only the overall statistics and modules with `timeit=True` will be shown.

    Parameters
    ----------
    timeit: bool, optional [default=False]
        Display time profiling statistics for the pipeline?
    configfile: str, optional [default='pipeline.toml']
        Path to a configuration file (TOML format) which contains parameters
        for attached modules.
    stats_limit: int, optional [default=100000]
        The number of cycles to keep track of when using `timeit=True`
    """
    def __init__(self, blob=None, timeit=False, configfile=None,
                 stats_limit=STATS_LIMIT):
        self.log = get_logger(self.__class__.__name__)
        self.cprint = get_printer(self.__class__.__name__)

        self.provenance = Provenance()
        self._activity_uuid = self.provenance.start_activity("pipeline")

        if configfile is None and os.path.exists(MODULE_CONFIGURATION):
            configfile = MODULE_CONFIGURATION
        self.load_configuration(configfile)

        self.init_timer = Timer("Pipeline and module initialisation")
        self.init_timer.start()

        self.modules = []
        self.services = ServiceManager()
        self.required_services = {}
        self.blob = blob or Blob()
        self.timeit = timeit
        self._timeit = {
            'init': timer(),
            'init_cpu': time.process_time(),
            'cycles': deque(maxlen=stats_limit),
            'cycles_cpu': deque(maxlen=stats_limit)
        }
        self._cycle_count = 0
        self._stop = False
        self._finished = False
        self.was_interrupted = False
    def load_configuration(self, configfile):
        """Load the module configuration from a TOML file"""
        if configfile is not None:
            self.cprint(
                "Reading module configuration from '{}'".format(configfile))
            self.log.warning(
                "Keep in mind that the module configuration file has "
                "precedence over keyword arguments in the attach method!")
            with open(configfile, 'r') as fobj:
                config = toml.load(fobj)
            variables = config.pop('VARIABLES', None)
            if variables is not None:
                for _, entries in config.items():
                    for key, value in entries.items():
                        if value in variables:
                            entries[key] = variables[value]
        else:
            config = {}
        self.module_configuration = config
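
    # An example 'pipeline.toml' (the module and parameter names are made
    # up). Section names must match the names of attached modules; any
    # parameter value which equals a key of the [VARIABLES] table is
    # substituted by the corresponding variable value:
    #
    #     [VARIABLES]
    #     datadir = "/data/2024"
    #
    #     [AFilterModule]
    #     threshold = 42
    #     infile = "datadir"    # becomes "/data/2024"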
    def attach(self, module_factory, name=None, **kwargs):
        """Attach a module to the pipeline system"""
        fac = module_factory

        if name is None:
            name = fac.__name__

        self.log.info("Attaching module '{0}'".format(name))

        if (inspect.isclass(fac) and issubclass(fac, Module)) or \
                name == 'GenericPump':
            self.log.debug("Attaching as regular module")
            if name in self.module_configuration:
                self.log.debug(
                    "Applying pipeline configuration file for module '%s'" %
                    name)
                for key, value in self.module_configuration[name].items():
                    if key in kwargs:
                        self.log.info(
                            "Overwriting parameter '%s' in module '%s' from "
                            "the pipeline configuration file." % (key, name))
                    kwargs[key] = value
            module = fac(name=name, **kwargs)
            if hasattr(module, "provided_services"):
                for service_name, obj in module.provided_services.items():
                    self.services.register(service_name, obj)
            if hasattr(module, "required_services"):
                updated_required_services = {}
                updated_required_services.update(self.required_services)
                updated_required_services.update(module.required_services)
                self.required_services = updated_required_services
            module.services = self.services
        else:
            if isinstance(fac, types.FunctionType):
                self.log.debug("Attaching as function module")
            else:
                self.log.critical("Don't know how to attach module '{0}'!\n"
                                  "But I'll do my best".format(name))
            module = fac
            module.name = name
            module.timeit = self.timeit

        # Special parameters
        if 'only_if' in kwargs:
            required_keys = kwargs['only_if']
            if isinstance(required_keys, str):
                required_keys = [required_keys]
            module.only_if = set(required_keys)
        else:
            module.only_if = set()

        if 'blob_keys' in kwargs:
            module.blob_keys = kwargs['blob_keys']
        else:
            module.blob_keys = None

        if 'every' in kwargs:
            module.every = kwargs['every']
        else:
            module.every = 1

        self._timeit[module] = {
            'process': deque(maxlen=STATS_LIMIT),
            'process_cpu': deque(maxlen=STATS_LIMIT),
            'finish': 0,
            'finish_cpu': 0
        }
        self.modules.append(module)
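
    # Sketch of the special keyword arguments understood by `attach`. The
    # function module `print_hits` is hypothetical and assumes an earlier
    # module fills blob["hits"]:
    #
    #     def print_hits(blob):
    #         print(blob["hits"])
    #         return blob
    #
    #     pipe.attach(print_hits,
    #                 only_if="hits",      # skip cycles without a 'hits' key
    #                 every=10,            # run on every 10th cycle only
    #                 blob_keys=["hits"])  # restrict the keys the module sees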
    def _drain(self, cycles=None):
        """Activate the pump and let the flow go.

        This will call the process() method on each attached module until
        a StopIteration is raised, usually by a pump when it reaches EOF.

        A StopIteration is also raised when `cycles` was set and the number
        of cycles has reached that limit.

        """
        self.log.info("Now draining...")
        if not cycles:
            self.log.info(
                "No cycle count, the pipeline may be drained forever.")

        try:
            while not self._stop:
                cycle_start = timer()
                cycle_start_cpu = time.process_time()

                self.log.debug("Pumping blob #%d", self._cycle_count)
                self.blob = Blob()

                for module in self.modules:
                    if self.blob is None:
                        self.log.debug("Skipping %s, due to empty blob.",
                                       module.name)
                        continue
                    if module.only_if and not module.only_if.issubset(
                            set(self.blob.keys())):
                        self.log.debug(
                            "Skipping %s, due to missing required key(s) "
                            "'%s'.", module.name, module.only_if)
                        continue

                    if (self._cycle_count + 1) % module.every != 0:
                        self.log.debug("Skipping %s (every %s iterations).",
                                       module.name, module.every)
                        continue

                    if module.blob_keys is not None:
                        blob_to_send = Blob({
                            k: self.blob[k]
                            for k in module.blob_keys if k in self.blob
                        })
                    else:
                        blob_to_send = self.blob

                    self.log.debug("Processing %s", module.name)

                    start = timer()
                    start_cpu = time.process_time()
                    new_blob = module(blob_to_send)
                    if self.timeit or module.timeit:
                        self._timeit[module]['process'] \
                            .append(timer() - start)
                        self._timeit[module]['process_cpu'] \
                            .append(time.process_time() - start_cpu)

                    if module.blob_keys is not None:
                        if new_blob is not None:
                            for key in new_blob.keys():
                                self.blob[key] = new_blob[key]
                    else:
                        self.blob = new_blob

                self._timeit['cycles'].append(timer() - cycle_start)
                self._timeit['cycles_cpu'].append(time.process_time() -
                                                  cycle_start_cpu)
                self._cycle_count += 1

                if cycles and self._cycle_count >= cycles:
                    raise StopIteration
        except StopIteration:
            self.log.info("Nothing left to pump through.")
        return self.finish()

    def _check_service_requirements(self):
        """Final comparison of provided and required services"""
        missing = self.services.get_missing_services(
            self.required_services.keys())
        if missing:
            self.log.critical(
                "The following services are required but missing: %s",
                ', '.join(missing))
            return False
        return True
    def drain(self, cycles=None):
        """Execute _drain while trapping KeyboardInterrupt"""
        if self._finished:
            self.log.error("The pipeline has already been drained...")
            return

        self.provenance.current_activity.record_configuration(
            {"planned_cycles": cycles})

        module_parameters = []
        for module in self.modules:
            try:
                parameters = module.parameters
            except AttributeError:
                parameters = None
            module_parameters.append(
                dict(name=module.name, parameters=parameters))
        self.provenance.current_activity.record_configuration(
            {"modules": module_parameters})

        if not self._check_service_requirements():
            self.init_timer.stop()
            self.provenance.finish_activity(self._activity_uuid, "error")
            return self.finish()

        self.log.info("Preparing modules to process")
        for module in self.modules:
            if hasattr(module, 'prepare'):
                self.log.info("Preparing %s" % module.name)
                module.prepare()

        self.init_timer.stop()
        self.log.info("Trapping CTRL+C and starting to drain.")
        signal.signal(signal.SIGINT, self._handle_ctrl_c)

        with ignored(KeyboardInterrupt):
            results = self._drain(cycles)

        self.provenance.current_activity.record_configuration(
            {"cycles": self._cycle_count})
        self.provenance.finish_activity(self._activity_uuid)
        return results
    def finish(self):
        """Call finish() on each attached module"""
        finish_blob = Blob()
        for module in self.modules:
            if hasattr(module, 'pre_finish'):
                self.log.info("Finishing %s" % module.name)
                start_time = timer()
                start_time_cpu = time.process_time()
                finish_blob[module.name] = module.pre_finish()
                self._timeit[module]['finish'] = timer() - start_time
                self._timeit[module]['finish_cpu'] = \
                    time.process_time() - start_time_cpu
            else:
                self.log.info("Skipping function module %s" % module.name)
        self._timeit['finish'] = timer()
        self._timeit['finish_cpu'] = time.process_time()
        self._print_timeit_statistics()
        self._finished = True
        return finish_blob
    def _handle_ctrl_c(self, *_):
        """Handle the keyboard interrupts."""
        if self._stop:
            print("\nForced shutdown...")
            self.provenance.current_activity.record_configuration(
                {"forced_shutdown": True})
            raise SystemExit
        if not self._stop:
            hline = 42 * '='
            print('\n' + hline +
                  "\nGot CTRL+C, waiting for current cycle...\n"
                  "Press CTRL+C again if you're in a hurry!\n" + hline)
            self.was_interrupted = True
            self._stop = True
            self.provenance.current_activity.record_configuration(
                {"interrupted": True})

    def _print_timeit_statistics(self):
        if self._cycle_count < 1:
            return

        def calc_stats(values):
            """Return a list of statistical values"""
            return [f(values) for f in (np.mean, np.median, min, max, np.std)]

        def timef(seconds):
            """Return a formatted time string for the given seconds"""
            elapsed_time = seconds
            if elapsed_time > 180:
                elapsed_time /= 60
                unit = 'min'
            else:
                unit = 's'
            return "{0:.6f}{1}".format(elapsed_time, unit)

        def statsf(prefix, values):
            stats = "  mean: {0}  medi: {1}  min: {2}  max: {3}  std: {4}"
            values = [timef(v) for v in values]
            return "  " + prefix + stats.format(*values)

        cycles = self._timeit['cycles']
        n_cycles = len(cycles)
        cycles_cpu = self._timeit['cycles_cpu']
        overall = self._timeit['finish'] - self._timeit['init']
        overall_cpu = self._timeit['finish_cpu'] - self._timeit['init_cpu']
        memory = peak_memory_usage()

        print(60 * '=')
        print("{0} cycles drained in {1} (CPU {2}). Memory peak: {3:.2f} MB".
              format(self._cycle_count, timef(overall), timef(overall_cpu),
                     memory))
        if self._cycle_count > n_cycles:
            print("Statistics are based on the last {0} cycles.".format(
                n_cycles))
        if cycles:
            print(statsf('wall', calc_stats(cycles)))
        if cycles_cpu:
            print(statsf('CPU ', calc_stats(cycles_cpu)))

        for module in self.modules:
            if not module.timeit and not self.timeit:
                continue
            finish_time = self._timeit[module]['finish']
            finish_time_cpu = self._timeit[module]['finish_cpu']
            process_times = self._timeit[module]['process']
            process_times_cpu = self._timeit[module]['process_cpu']
            print(module.name +
                  " - process: {0:.3f}s (CPU {1:.3f}s)"
                  " - finish: {2:.3f}s (CPU {3:.3f}s)".format(
                      sum(process_times), sum(process_times_cpu),
                      finish_time, finish_time_cpu))
            if process_times:
                print(statsf('wall', calc_stats(process_times)))
            if process_times_cpu:
                print(statsf('CPU ', calc_stats(process_times_cpu)))
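
# End-to-end usage sketch for the pipeline (the pump function `generate` is
# hypothetical). Since `_drain` creates a fresh Blob each cycle, a plain
# function module is enough to feed data through:
#
#     def generate(blob):
#         blob["n"] = np.random.randint(10)
#         return blob
#
#     pipe = Pipeline(timeit=True)   # collect timing stats for all modules
#     pipe.attach(generate)
#     pipe.drain(100)                # drain 100 cycles, then call finish()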
class ServiceManager:
    """Takes care of pipeline services."""
    def __init__(self):
        self._services = {}
        self.log = get_logger(self.__class__.__name__)
    def register(self, name, service):
        """Service registration

        Args:
            name: Name of the provided service
            service: Reference to the service
        """
        self._services[name] = service
    def get_missing_services(self, services):
        """Check if all required services are provided

        Args:
            services: List of service names which are required

        Returns:
            Sorted list of missing services
        """
        required_services = set(services)
        provided_services = set(self._services.keys())
        missing_services = required_services.difference(provided_services)
        return sorted(missing_services)
    def __getitem__(self, name):
        return self._services[name]

    def __getattr__(self, name):
        return self._services[name]

    def __contains__(self, name):
        return name in self._services
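
# Sketch of the service mechanism (both modules are hypothetical). A module
# exposes a callable under a name, another module declares it as required;
# `Pipeline.drain` aborts early if any required service is missing:
#
#     class Database(Module):
#         def configure(self):
#             self.expose(lambda key: {"foo": 42}[key], "db_lookup")
#
#     class Analyser(Module):
#         def configure(self):
#             self.require_service("db_lookup", why="to fetch calibration")
#
#         def process(self, blob):
#             blob["calib"] = self.services["db_lookup"]("foo")
#             return blob
#
#     pipe = Pipeline()
#     pipe.attach(Database)   # registers the 'db_lookup' service
#     pipe.attach(Analyser)
#     pipe.drain(1)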