"""Source code for shapepipe.pipeline.worker_handler.

This module defines a class for handling pipeline workers.

:Author: Samuel Farrens

"""


import platform
from os import getpid
from threading import active_count

from modopt.interface.errors import catch_error, warn
from modopt.interface.log import close_log, set_up_log

from shapepipe.pipeline.timeout import with_timeout

class WorkerHandler(object):
    """Worker Handler.

    This class defines the worker to process a given job.

    Parameters
    ----------
    verbose : bool, optional
        If ``True`` print a short summary line to stdout for each worker,
        default is ``True``

    """

    def __init__(self, verbose=True):

        self.worker_dict = {}
        self._stdout = None
        self._stderr = None
        self._verbose = verbose

    def worker(
        self,
        process,
        job_name,
        w_log_name,
        run_dirs,
        config,
        module_config_sec,
        timeout,
        module_runner,
    ):
        """Worker.

        This method defines a worker: it records the worker parameters,
        opens a dedicated worker log, runs the module runner subject to a
        timeout and finally closes the worker log.

        Parameters
        ----------
        process : numpy.ndarray
            File(s) to be processed
        job_name : str
            Job identifier; passed to the module runner as the file number
            string and used to build the ``job_name`` entry
        w_log_name : str
            Worker log name
        run_dirs : dict
            Run directories
        config : CustomParser
            Configuration parser instance
        module_config_sec : str
            Configuration file section name
        timeout : int
            Timeout limit in seconds
        module_runner : function
            Module runner

        Returns
        -------
        dict
            Worker dictionary

        """
        self._w_log_name = w_log_name
        self._run_dirs = run_dirs
        self._config = config
        self._module_config_sec = module_config_sec
        self._module_runner = module_runner

        # Start from a fresh dictionary so that a handler reused for several
        # jobs does not return (and later overwrite) the same dict object.
        self.worker_dict = {}
        self._prepare_worker(
            process,
            job_name,
            timeout,
            module_runner.__name__,
        )
        self._create_worker_log()

        # Module errors are caught inside ``_run_worker``; the ``finally``
        # still releases the log if anything escapes (e.g. KeyboardInterrupt).
        try:
            self._run_worker()
        finally:
            close_log(self.w_log, verbose=False)

        return self.worker_dict

    @staticmethod
    def _set_job_name(num):
        """Set Job Name.

        This method creates a job name for a given process number.

        Parameters
        ----------
        num : int or str
            Process number or file number string; it is inserted verbatim

        Returns
        -------
        str
            Job name

        """
        return f'process{num}'

    def _prepare_worker(self, process, job_name, timeout, module):
        """Prepare Worker.

        This method populates the worker instance dictionary with details of
        the current process, the host machine and the job.

        Parameters
        ----------
        process : numpy.ndarray
            File(s) to be processed
        job_name : str
            Job identifier (file number string)
        timeout : int
            Timeout limit in seconds
        module : str
            Module runner name

        """
        self.worker_dict['pid'] = getpid()
        self.worker_dict['threads'] = active_count()
        self.worker_dict['node'] = platform.node()
        self.worker_dict['system'] = platform.system()
        self.worker_dict['machine'] = platform.machine()
        # Flags updated later by ``_run_worker`` and ``_log_stdout``
        self.worker_dict['exception'] = False
        self.worker_dict['stderr'] = False
        self.worker_dict['process'] = list(process)
        self.worker_dict['file_number_string'] = job_name
        self.worker_dict['job_name'] = self._set_job_name(job_name)
        self.worker_dict['timeout'] = timeout
        self.worker_dict['module'] = module

    def _create_worker_log(self):
        """Create Worker Log.

        This method optionally prints a one-line summary of the worker, then
        prepares a logging instance for the worker and logs the worker
        parameters.

        """
        if self._verbose:
            job_name = self.worker_dict['job_name']
            pid = self.worker_dict['pid']
            print(f' - {job_name} PID: {pid} ', end='')

            # Only echo the file list when its text form is short enough
            process_size = len(str(self.worker_dict['process']))
            if (
                process_size
                < self._config.getint('WORKER', 'PROCESS_PRINT_LIMIT')
            ):
                print(
                    f'processing {self.worker_dict["file_number_string"]} '
                    + f'{self.worker_dict["process"]}'
                )
            else:
                print()

        self.w_log = set_up_log(self._w_log_name, verbose=False)
        # Record the logger name so callers can tell where this worker logged
        self.worker_dict['log'] = self.w_log.name

        self.w_log.info('Worker process running with:')
        for label, key in (
            ('Job Name', 'job_name'),
            ('PID', 'pid'),
            ('Threads', 'threads'),
            ('Node', 'node'),
            ('System', 'system'),
            ('Machine', 'machine'),
            ('Timeout Limit', 'timeout'),
            ('Process', 'process'),
        ):
            self.w_log.info(f' - {label}: {self.worker_dict[key]}')

    def _run_worker(self):
        """Run Worker.

        This method runs the worker with a given timeout limit and catches
        the corresponding errors. Any exception is reported to the worker
        log and its class name is stored under ``worker_dict['exception']``.

        """
        try:
            with_timeout(
                self.worker_dict['timeout'],
                self._worker_execution,
            )()
        except Exception as err:
            catch_error(err, self.w_log)
            self.worker_dict['exception'] = type(err).__name__

    def _worker_execution(self):
        """Worker Execution.

        This method executes a worker job and logs the results.

        """
        self._run_module()
        self._log_stdout()

    def _run_module(self):
        """Run Module.

        This method calls the module runner with the worker's files, run
        directories, file number string, configuration, configuration
        section and log. It stores the returned stdout and stderr. Any
        exception raised by the module runner propagates to the caller.

        """
        self.w_log.info(
            f' - Running module: {self.worker_dict["module"]}'
        )

        file_number_string = self.worker_dict['file_number_string']
        input_file_list = self.worker_dict['process']

        self._stdout, self._stderr = self._module_runner(
            input_file_list,
            self._run_dirs,
            file_number_string,
            self._config,
            self._module_config_sec,
            self.w_log,
        )

    def _log_stdout(self):
        """Log STDOUT.

        This method logs the stdout and stderr output of the job. A truthy
        stderr also sets ``worker_dict['stderr']`` to ``True``.

        """
        self.w_log.info(
            f'Process produced the following output: {self._stdout}'
        )

        if self._stderr:
            self.w_log.info(
                f'Process produced the following error(s): {self._stderr}'
            )
            self.worker_dict['stderr'] = True