Source code for shapepipe.modules.sextractor_package.sextractor_script


This module builds the SExtractor command line.

:Author: Axel Guinot, Martin Kilbinger


import re

import numpy as np
from import fits
from sqlitedict import SqliteDict

from shapepipe.pipeline import file_io

[docs]def get_header_value(image_path, key): """Get Header Value. This function reads a value from the header image. Parameters ---------- image_path: str Path to the input image key: str Key from which the value is requested (has to be a float) Returns ------- float Value associated to the key provided """ h = fits.getheader(image_path) val = h[key] try: val = float(val) except Exception: raise ValueError( f'The key {key} does not return a float value. Got {val}' ) return val
[docs]def make_post_process(cat_path, f_wcs_path, pos_params, ccd_size): """Make Post Processing. This function will add one HDU for each epoch to the SExtractor catalogue. Note that this only works for tiles. The columns will be: - ``NUMBER``: same as SExtractor NUMBER - ``EXP_NAME``: name of the single exposure for this epoch - ``CCD_N``: extension where the object was detected Parameters ---------- cat_path: str Path to the outputed SExtractor catalog f_wcs_path: str Path to the log file containing WCS for all single exp CCDs pos_params: list World coordinates to use to match the objects. ccd_size: list Size of a CCD ``[nx, ny]`` Raises ------ IOError If SQL file not found """ cat = file_io.FITSCatalogue( cat_path, SEx_catalogue=True, open_mode=file_io.BaseCatalogue.OpenMode.ReadWrite, ) f_wcs = SqliteDict(f_wcs_path) key_list = list(f_wcs.keys()) if len(key_list) == 0: raise IOError(f'Could not read sql file \'{f_wcs_path}\'') n_hdu = len(f_wcs[key_list[0]]) history = [] for idx in cat.get_data(1)[0][0]: if re.split('HISTORY', idx)[0] == '': history.append(idx) exp_list = [] pattern = r'([0-9]*)p\.(.*)' for hist in history: m =, hist) exp_list.append( obj_id = np.copy(cat.get_data()['NUMBER']) n_epoch = np.zeros(len(obj_id), dtype='int32') for idx, exp in enumerate(exp_list): pos_tmp = np.ones(len(obj_id), dtype='int32') * -1 for idx_j in range(n_hdu): w = f_wcs[exp][idx_j]['WCS'] pix_tmp = w.all_world2pix( cat.get_data()[pos_params[0]], cat.get_data()[pos_params[1]], 0 ) ind = ( (pix_tmp[0] > int(ccd_size[0])) & (pix_tmp[0] < int(ccd_size[1])) & (pix_tmp[1] > int(ccd_size[2])) & (pix_tmp[1] < int(ccd_size[3])) ) pos_tmp[ind] = idx_j n_epoch[ind] += 1 exp_name = np.array([exp_list[idx] for n in range(len(obj_id))]) a = np.array( [ (obj_id[ii], exp_name[ii], pos_tmp[ii]) for ii in range(len(exp_name)) ], dtype=[ ('NUMBER', obj_id.dtype), ('EXP_NAME', exp_name.dtype), ('CCD_N', pos_tmp.dtype) ] ) cat.save_as_fits(data=a, ext_name=f'EPOCH_{idx}') f_wcs.close() cat.add_col('N_EPOCH', n_epoch) cat.close()
[docs]class SExtractorCaller(): """The SExtractor Caller. This class constructs the command line to call SExtractor based on the input files and parameters. Parameters ---------- path_input_files: list List with all the paths for the input files path_output_dir: str Path for the output directory number_string: str Pipeline internal file numbering path_dot_sex: str Path to the ``.sex`` config file path_dot_param: str Path to the ``.param`` config file path_dot_conv: str Path to the ``.conv`` kernel file use_weight: bool Specify if a weight is profided for the measurement use_flag: bool Specify if a flag is provided for the measurement use_psf: bool Specify if a psf is provided for the model use_detection_image: bool Specify if a detection image is provided use_detection_weight: bool Specify if a detection weight is provided use_zero_point: bool Specify whether or not to use a zero point from the input image use_backgroud: bool Specify whether or not to use a background value form the input image zero_point_key: str, optional Header key corresponding to the zero point background_key: str, optional Header key corresponding to the background value check_image: str, optional If provided, add SExtractor check image to the output output_prefix: str, optional If provided, add a prefix to the output file names """ def __init__( self, path_input_files, path_output_dir, number_string, path_dot_sex, path_dot_param, path_dot_conv, use_weight, use_flag, use_psf, use_detection_image, use_detection_weight, use_zero_point, use_background, zero_point_key=None, background_key=None, check_image=None, output_prefix=None, ): self.cmd_line = '' self._cmd_line_extra = '' self._meas_img_path = path_input_files[0] self._all_input_path = path_input_files self._path_output_dir = path_output_dir self._num_str = number_string self.path_output_file = self.get_output_name(output_prefix) self._path_dot_sex = path_dot_sex self._path_dot_param = path_dot_param self._path_dot_conv = path_dot_conv self.set_input_files(use_weight, use_flag, use_psf, use_detection_image, use_detection_weight) # Collect optional arguments for SExtractor self.get_zero_point(use_zero_point, zero_point_key) self.get_background(use_background, background_key) self.get_check_image(check_image)
[docs] def get_output_name(self, output_prefix=None): """Get Output Names. Construct the output file path. Parameters ---------- output_prefix: str, optional Prefix to add to the output name Returns ------- str Full path of the output file """ if isinstance(output_prefix, type(None)): self.prefix = '' else: if (output_prefix.lower() is not None) & (output_prefix != ''): self.prefix = output_prefix + '_' else: self.prefix = '' output_file_name = self.prefix + f'sexcat{self._num_str}.fits' output_file_path = f'{self._path_output_dir}/{output_file_name}' return output_file_path
[docs] def set_input_files( self, use_weight, use_flag, use_psf, use_detect_img, use_detect_weight, ): """Set Input Files. Set up all of the input image files. Parameters ---------- use_weight: bool Specify if a weight is profided for the measurement use_flag: bool Specify if a flag is provided for the measurement use_psf: bool Specify if a psf is provided for the model use_detect_img: bool Specify if a detection image is provided use_detect_weight: bool Specify if a detection weight is provided Raises ------ ValueError For invalid detection weight """ extra = 1 if use_weight: weight_image = self._all_input_path[extra] extra += 1 if use_flag: self._cmd_line_extra += ( ' -FLAG_IMAGE ' + f'{self._all_input_path[extra]}' ) extra += 1 if use_psf: self._cmd_line_extra += ( ' -PSF_NAME ' + f'{self._all_input_path[extra]}' ) extra += 1 # Check for separate files for detection and measurement # First, consistency checks if use_detect_weight and not use_detect_img: raise ValueError( 'DETECTION_WEIGHT cannot be True ' + 'if DETECTION_IMAGE is False' ) if use_detect_weight and not use_weight: raise ValueError( 'DETECTION_WEIGHT cannot be True ' + 'if WEIGHT_FILE is False' ) # Check for separate image file for detection and measurement if use_detect_img: self._detect_img_path = self._all_input_path[extra] extra += 1 else: self._detect_img_path = self._meas_img_path # Check for separate weight file corresponding to the detection image. # If False, use measurement weight image. # Note: This could be changed, and no weight image could be used, but # this might lead to more user errors. if use_weight: if use_detect_weight: detect_weight_path = self._all_input_path[extra] extra += 1 else: detect_weight_path = weight_image self._cmd_line_extra += ( f' -WEIGHT_IMAGE {detect_weight_path}' + f',{weight_image}' ) else: self._cmd_line_extra += ' -WEIGHT_TYPE None' if extra != len(self._all_input_path): raise ValueError( 'Incoherence between input file number and keys ' + f'related to extra files: 1 regular + {extra-1} extra ' + 'files not compatible with total file list ' + f'length of {len(self._all_input_path)}' )
[docs] def get_zero_point(self, use_zp, zp_key=None): """Get Zero Point. Use a zero point from an input image header. Parameters ---------- use_zp: bool If ``True``, add the zero point to the command line zp_key: str Header key corresponding to the zero point """ if use_zp and not isinstance(zp_key, type(None)): zp_value = get_header_value(self._meas_img_path, zp_key) self._cmd_line_extra += f' -MAG_ZEROPOINT {zp_value}'
[docs] def get_background(self, use_bkg, bkg_key=None): """Get Background. Use a background value from an input image header. Parameters ---------- use_bkg: bool If ``True``, add the background value to the command line bkg_key: str Header key corresponding to the background value """ if use_bkg and not isinstance(bkg_key, type(None)): bkg_value = get_header_value(self._meas_img_path, bkg_key) self._cmd_line_extra += ( f' -BACK_TYPE MANUAL -BACK_VALUE {bkg_value}' )
[docs] def get_check_image(self, check_image): """Get Check Image. Handle the check images if any are requested. Parameters ---------- check_image: list List of SExtractor keys corresponding to check images """ if (len(check_image) == 1) & (check_image[0] == ''): check_type = ['NONE'] check_name = ['none'] else: check_type = [] check_name = [] for key in check_image: check_type.append(key.upper()) check_name.append( self._path_output_dir + '/' + self.prefix + key.lower() + self._num_str + '.fits' ) self._cmd_line_extra += ( f' -CHECKIMAGE_TYPE {",".join(check_type)} ' + f'-CHECKIMAGE_NAME {",".join(check_name)}' )
[docs] def make_command_line(self, exec_path): """Make Command Line. This method constructs the command line to run SExtractor. Parameters ---------- exec_path: str Path to SExtractor executable Returns ------- str Full command line to call SExtractor """ # Base arguments for SExtractor command_line_base = ( f'{exec_path} {self._detect_img_path},{self._meas_img_path} ' + f'-c {self._path_dot_sex} ' + f'-PARAMETERS_NAME {self._path_dot_param} ' + f'-FILTER_NAME {self._path_dot_conv} ' + f'-CATALOG_NAME {self.path_output_file}' ) command_line = f'{command_line_base} {self._cmd_line_extra}' return command_line
[docs] @staticmethod def parse_errors(stderr, stdout): """Parse Errors. This methoid moves errors from the standard output of SExtractor to the standard error. Parameters ---------- stderr: str String containing the standard error contents stdout: str String containing the standard output content Returns ------- tuple Updated standard output and error """ check_error = re.findall('error', stdout.lower()) check_error2 = re.findall('all done', stdout.lower()) if check_error == []: stderr2 = '' else: stderr2 = stdout if check_error2 == []: stderr2 = stdout return stdout, stderr2