Source code for shapepipe.run

"""SHAPEPIPE RUN.

This module sets up a given run of the shape measurement pipeline.

:Author: Samuel Farrens <samuel.farrens@cea.fr>

"""

import sys
from datetime import datetime

from joblib import cpu_count
from modopt.interface.errors import catch_error
from modopt.interface.log import close_log, set_up_log

from shapepipe.info import __installs__, line, shapepipe_logo
from shapepipe.pipeline.args import create_arg_parser
from shapepipe.pipeline.config import create_config_parser
from shapepipe.pipeline.dependency_handler import DependencyHandler
from shapepipe.pipeline.file_handler import FileHandler
from shapepipe.pipeline.job_handler import JobHandler
from shapepipe.pipeline.mpi_run import split_mpi_jobs, submit_mpi_jobs

try:
    from mpi4py import MPI
except ImportError:  # pragma: no cover
    import_mpi = False
else:
    import_mpi = True


class ShapePipe():
    """ShapePipe.

    ShapePipe runner class. An instance reads the command line arguments
    and configuration file, creates the file handler and pipeline log and
    performs the pre-run checks (dependencies, module versions, system
    set up).

    """

    def __init__(self):

        # The log stays ``None`` until ``_create_pipeline_log`` has run so
        # that error handlers can safely read the attribute at any time.
        self.log = None

    def set_up(self):
        """Set Up.

        Set up ShapePipe properties.

        """
        self._args = create_arg_parser()
        self.config = create_config_parser(self._args.config)
        self._set_run_name()
        self.modules = self.config.getlist('EXECUTION', 'MODULE')
        self.mode = self.config.get('EXECUTION', 'MODE').lower()
        self.verbose = self.config.getboolean('DEFAULT', 'VERBOSE')
        self.filehd = FileHandler(
            self._run_name,
            self.modules,
            self.config,
            self.verbose,
        )
        self.error_count = 0
        self._prep_run()

    def _set_run_name(self):
        """Set Run Name.

        Set the name of the current pipeline run. The date and time are
        appended to the configured name when the ``RUN_DATETIME`` option
        is enabled.

        """
        self._run_name = self.config.get('DEFAULT', 'RUN_NAME')

        if self.config.getboolean('DEFAULT', 'RUN_DATETIME'):
            self._run_name += datetime.now().strftime('_%Y-%m-%d_%H-%M-%S')

    def _report(self, text):
        """Report.

        Write a message to the pipeline log and, in verbose mode, echo the
        same message to standard output.

        Parameters
        ----------
        text : str
            Message to be recorded

        """
        self.log.info(text)

        if self.verbose:
            print(text)

    def _create_pipeline_log(self):
        """Create Pipeline Log.

        Create a general logging instance for the pipeline run.

        """
        self.log = set_up_log(self.filehd.log_name, verbose=False)

        # The logo is requested in colour for the terminal only, so it
        # cannot go through ``_report``.
        self.log.info(shapepipe_logo())
        if self.verbose:
            print(shapepipe_logo(colour=True))

        self._report(f'Starting ShapePipe Run: {self._run_name}')
        self._report('')

        # Temporary fix to give file handler access to the log. This should
        # be improved at some point.
        self.filehd.log = self.log

    def close_pipeline_log(self):
        """Close Pipeline Log.

        Close general logging instance for the pipeline run.

        Raises
        ------
        RuntimeError
            If one or more errors were recorded during the pipeline run

        """
        plur = ' was' if self.error_count == 1 else 's were'
        final_error_count = (
            f'A total of {self.error_count} error{plur} recorded.'
        )
        end_text = 'Finishing ShapePipe Run'
        end_line = line()

        self.log.info(final_error_count)
        self.log.info(end_text)
        self.log.info(end_line)

        close_log(self.log, verbose=False)

        if self.verbose:
            print(final_error_count)
            print(end_text)
            print(end_line)

        if self.error_count > 0:
            raise RuntimeError(final_error_count)

    def _get_module_depends(self, property):
        """Get Module Dependencies.

        Collect the values of a given property (e.g. Python packages or
        system executables) over all of the module runners. A value set in
        the configuration file section of a module takes precedence over
        the value declared by the module runner; any additional values
        provided by the file handler are appended afterwards.

        Parameters
        ----------
        property : str
            Module property to be checked

        Returns
        -------
        list
            Values of the requested property for all of the modules

        """
        prop_list = []
        module_runners = self.filehd.module_runners
        option = property.upper()

        for module, runner in module_runners.items():

            section = module.upper()

            if self.config.has_option(section, option):
                prop_list += self.config.getlist(section, option)
            else:
                prop_list += getattr(runner, property)

            # Query the file handler once and reuse the answer
            additional = self.filehd.get_add_module_property(
                module,
                property,
            )
            if additional:
                prop_list += additional

        return prop_list

    def _check_dependencies(self):
        """Check Dependencies.

        Check that all pipeline dependencies have been installed.

        """
        module_dep = self._get_module_depends('depends') + __installs__
        module_exe = self._get_module_depends('executes')

        # Only request mpi4py when it could be imported. When it could not,
        # the list must be left as it is (not extended with itself).
        if import_mpi:
            module_dep.append('mpi4py')

        dh = DependencyHandler(module_dep, module_exe)

        self._report('Checking Python Dependencies:')
        for dep in dh.check_dependencies():
            self._report(dep)
        self._report('')

        self._report('Checking System Executables:')
        for exe in dh.check_executables():
            self._report(exe)
        self._report('')

    def _check_module_versions(self):
        """Check Module Version.

        Check versions of the modules. Each module is reported once, in
        the order of its first appearance in the module list.

        """
        self._report('Checking Module Versions:')

        # ``dict.fromkeys`` removes duplicates while keeping the configured
        # order, which makes the report reproducible between runs.
        for module in dict.fromkeys(self.modules):
            module_txt = (
                f' - {module} {self.filehd.module_runners[module].version}'
            )
            self._report(module_txt)

        self._report('')

    def _check_system_setup(self):
        """Check System Set Up.

        Check the set up of the machine on which the pipeline is running.

        """
        self._report('Checking System Set Up:')
        self._report(f' - Number of available CPUs: {cpu_count()}')
        self._report('')

    def _get_module_run_methods(self):
        """Get Module Run Method.

        Create a dictionary of modules with corresponding run methods.

        """
        self.run_method = {
            module: self.filehd.module_runners[module].run_method
            for module in self.modules
        }

    def _prep_run(self):
        """Prepare Run.

        Prepare to run the pipeline.

        """
        # Make output directories for the pipeline run
        self.filehd.create_global_run_dirs()

        # Make a log for the pipeline run
        self._create_pipeline_log()

        # Check the pipeline dependencies
        self._check_dependencies()

        # Check the versions of the modules
        self._check_module_versions()

        # Check the system set up
        self._check_system_setup()

        # Get run method for each module
        self._get_module_run_methods()

    def record_mode(self):
        """Record Mode.

        Log mode in which ShapePipe is running.

        """
        self._report(f'Running ShapePipe using {self.mode}')
        self._report('')
def run_smp(pipe):
    """Run SMP.

    Run ShapePipe using SMP. The modules are processed one after the
    other; the errors reported by each job handler are added to the error
    count of the pipeline before the pipeline log is closed.

    Parameters
    ----------
    pipe : ShapePipe
        ShapePipe instance

    """
    for module_name in pipe.modules:

        # One job handler per module
        handler = JobHandler(
            module_name,
            filehd=pipe.filehd,
            config=pipe.config,
            log=pipe.log,
            job_type=pipe.run_method[module_name],
            verbose=pipe.verbose,
        )

        handler.submit_jobs()

        # Accumulate the errors recorded for this module
        pipe.error_count += handler.error_count

        # Drop the handler before the next one is created
        del handler

    # Finish and close the pipeline log
    pipe.close_pipeline_log()
def run_mpi(pipe, comm):
    """Run MPI.

    Run ShapePipe using MPI. The process of rank 0 prepares each module
    and shares what the other processes need; ``pipe`` is only accessed on
    that process.

    Parameters
    ----------
    pipe : ShapePipe
        ShapePipe instance
    comm : MPI.COMM_WORLD
        MPI common world instance

    """
    # The process of rank 0 drives the run
    is_root = comm.rank == 0

    # Share the list of modules to be run
    modules = comm.bcast(pipe.modules if is_root else None, root=0)

    # Share the ShapePipe objects needed by every process
    config, verbose = (pipe.config, pipe.verbose) if is_root else (None, None)
    config = comm.bcast(config, root=0)
    verbose = comm.bcast(verbose, root=0)

    for module in modules:

        if not is_root:
            # Placeholders, replaced by the broadcast/scatter calls below
            job_type = module_runner = worker_log = None
            timeout = jobs = run_dirs = None

        else:
            # Create a job handler for the current module
            jh = JobHandler(
                module,
                filehd=pipe.filehd,
                config=config,
                log=pipe.log,
                job_type=pipe.run_method[module],
                parallel_mode='mpi',
                verbose=verbose,
            )
            job_type = jh.job_type

            if job_type != 'serial':
                # Collect what the worker processes need
                timeout = jh.timeout
                run_dirs = jh.filehd.module_run_dirs
                module_runner = jh.filehd.module_runners[module]
                # The method itself is passed on, it is not called here
                worker_log = jh.filehd.get_worker_log_name
                # Split the process list into one job list per process
                jobs = split_mpi_jobs(jh.filehd.process_list, comm.size)
            else:
                # Serial jobs are handled on this process alone
                jh.submit_jobs()

        # Every process needs to know the job type
        job_type = comm.bcast(job_type, root=0)

        if job_type == 'parallel':

            # Distribute the objects; the order of these collective calls
            # is the same on every process
            run_dirs = comm.bcast(run_dirs, root=0)
            module_runner = comm.bcast(module_runner, root=0)
            worker_log = comm.bcast(worker_log, root=0)
            timeout = comm.bcast(timeout, root=0)
            jobs = comm.scatter(jobs, root=0)

            # Run the jobs of this process, then gather all of the outputs
            worker_output = submit_mpi_jobs(
                jobs,
                config,
                timeout,
                run_dirs,
                module_runner,
                worker_log,
                verbose
            )
            results = comm.gather(worker_output, root=0)

            # Delete broadcast objects
            del module_runner, worker_log, timeout, jobs, worker_output

            if is_root:
                # Hand the worker dictionaries to the job handler and
                # finish up the job handler session
                jh.worker_dicts = jh.filehd.flatten_list(results)
                jh.finish_up()
                del results

        if is_root:
            # Update error count and discard the job handler
            pipe.error_count += jh.error_count
            del jh

    # Finish and close the pipeline log
    if is_root:
        pipe.close_pipeline_log()
def run(*args):
    """Run ShapePipe.

    This function runs ShapePipe. When ``mpi4py`` could be imported the
    process of rank 0 acts as master: it sets up the pipeline and shares
    the run mode with the other processes. Otherwise the pipeline is run
    using SMP.

    Parameters
    ----------
    *args
        Accepted for call compatibility; not used by this function

    Returns
    -------
    int or None
        ``1`` if an exception was caught on the master process, otherwise
        ``None``

    """
    # Bind these before the ``try`` so that the exception handler can use
    # them even if the failure happens before they are normally assigned.
    master = True
    pipe = None

    try:

        if import_mpi:
            comm = MPI.COMM_WORLD
            master = comm.rank == 0

        if master:
            pipe = ShapePipe()
            pipe.set_up()
            mode = pipe.mode
        else:
            mode = None

        # Without mpi4py the pipeline always falls back to SMP
        mode = comm.bcast(mode, root=0) if import_mpi else 'smp'

        if master:
            pipe.mode = mode
            pipe.record_mode()

        if mode == 'mpi':
            run_mpi(pipe, comm)
        elif master:
            # Only the master holds a ShapePipe instance, so the other
            # processes have nothing to run outside of MPI mode.
            run_smp(pipe)

    except Exception as err:
        if master:
            # ``pipe`` is ``None`` if the failure happened before the
            # instance was created; no log is available in that case.
            catch_error(err, log=getattr(pipe, 'log', None))
            return 1