Source code for shapepipe.modules.merge_sep_cats_package.merge_sep_cats

"""MERGE SCRIPT.

Class to merge separate catalogues.

:Author: Martin Kilbinger <martin.kilbinger@cea.fr>

"""

import os
import re
import warnings

import numpy as np
from astropy.io import fits

from shapepipe.pipeline import file_io


[docs]class MergeSep(object): """Merge Sep. Merge separate catalogues. Parameters ---------- input_file_list : list Input file paths file_number_string : str File number following ShapePipe numbering scheme file_pattern : str File base name file_ext : str File extension output_dir : str Output directory n_split_max : int Number of separate input catalogues warning : str Action when warning occurs; options are ``error`` or ``warning`` w_log : logging.Logger Logging instance """ def __init__( self, input_file_list, file_number_string, file_pattern, file_ext, output_dir, n_split_max, warning, w_log ): self._input_file_list = input_file_list self._file_number_string = file_number_string self._file_pattern = file_pattern self._file_ext = file_ext self._output_dir = output_dir self._n_split_max = n_split_max self._warning = warning self._w_log = w_log
[docs] def process(self): """Process. Process merging of separate catalogues. """ # Set warning action warnings.simplefilter(self._warning, UserWarning) # Loop over input files = outputs from different modules for idx, input_file in enumerate(self._input_file_list): # Get all input directories input_path_n = [] input_path_n.append(input_file) for n in range(2, self._n_split_max + 1): res = re.sub('1', str(n), input_file, 1) input_path_n.append(res) # Open first catalogue, read number of extensions and columns cat0 = file_io.FITSCatalogue(input_file, SEx_catalogue=True) cat0.open() list_ext_name = cat0.get_ext_name() list_col_name = cat0.get_col_names() cat0.close() # Create empty dictionary # data dimension = n_extension x n_column x n_obj data = {} for hdu_ind, ext_name in enumerate(list_ext_name): if ext_name == 'PRIMARY': continue data[ext_name] = {} for col_name in list_col_name: data[ext_name][col_name] = [] # Read and append all data, including first catalogue for n in range(self._n_split_max): cat_path = input_path_n[n] if os.path.exists(cat_path): cat = file_io.FITSCatalogue(cat_path, SEx_catalogue=True) cat.open() for hdu_ind, ext_name in enumerate(list_ext_name): if ext_name == 'PRIMARY': continue for col_name in list_col_name: data[ext_name][col_name] += list( cat.get_data(hdu_ind)[col_name] ) cat.close() else: msg = f'Input catalogue \'{cat_path}\' not found' warnings.warn(msg) wmsg = f'Warning: {msg}' self._w_log.info(wmsg) print(wmsg) # Save combined catalogue output_name = ( f'{self._output_dir}/{self._file_pattern[idx]}' + f'{self._file_number_string}{self._file_ext[idx]}' ) output = file_io.FITSCatalogue( output_name, open_mode=file_io.BaseCatalogue.OpenMode.ReadWrite ) for hdu_ind, ext_name in enumerate(list_ext_name): if ext_name == 'PRIMARY': continue output.save_as_fits( data[ext_name], names=list_col_name, ext_name=ext_name ) return None, None