Source code for shapepipe.modules.spread_model_package.spread_model

"""SPREAD MODEL.

Class to compute the spread model, criterion to select galaxies

:Author: Axel Guinot

"""

import galsim
import numpy as np
from sqlitedict import SqliteDict

from shapepipe.pipeline import file_io
from shapepipe.utilities import galaxy


[docs]def get_sm(obj_vign, psf_vign, model_vign, weight_vign): """Get Spread Model. This method compute the spread moel for an object. Parameters ---------- obj_vign : numpy.ndarray Vignet of the object psf_vign : numpy.ndarray Vignet of the gaussian model of the PSF model_vign : numpy.ndarray Vignet of the galaxy model weight_vign : numpy.ndarray Vignet of the weight at the object position Returns ------- tuple Spread model and corresponding error values """ # Mask invalid pixels m = (obj_vign > -1e29) & (weight_vign > 0) w = m.astype(float) # Set noise as inverse weight noise_v = (1 / weight_vign).ravel() # Remove infinite noise pixels noise_v[np.isinf(noise_v)] = 0 # Transform 2D vignets to 1D vectors t_v = model_vign.ravel() g_v = obj_vign.ravel() psf_v = psf_vign.ravel() w_v = w.ravel() # Compute scalar products used in spread model tg = np.sum(t_v * w_v * g_v) pg = np.sum(psf_v * w_v * g_v) tp = np.sum(t_v * w_v * psf_v) pp = np.sum(psf_v * w_v * psf_v) tnt = np.sum(t_v * noise_v * t_v * w_v) pnp = np.sum(psf_v * noise_v * psf_v * w_v) tnp = np.sum(t_v * noise_v * psf_v * w_v) err = tnt * pg ** 2 + pnp * tg ** 2 - 2 * tnp * pg * tg # Compute spread model if pg > 0: sm = (tg / pg) - (tp / pp) else: sm = 1 if (pg > 0) & (err > 0): sm_err = np.sqrt(err) / pg**2 else: sm_err = 1 return sm, sm_err
[docs]def get_model(sigma, flux, img_shape, pixel_scale=0.186): """Get Model. This method computes - an exponential galaxy model with scale radius = 1/16 FWHM - a Gaussian model for the PSF Parameters ---------- sigma : float Sigma of the PSF (in pixel units) flux : float Flux of the galaxy for the model img_shape : list Size of the output vignet ``[xsize, ysize]`` pixel_scale : float, optional Pixel scale to use for the model (in arcsec); default is ``0.186`` Returns ------- tuple Vignet of the galaxy model and of the PSF model """ # Get scale radius scale_radius = ( 1 / 16 * galaxy.sigma_to_fwhm(sigma, pixel_scale=pixel_scale) ) # Get galaxy model gal_obj = galsim.Exponential(scale_radius=scale_radius, flux=flux) # Get PSF psf_obj = galsim.Gaussian(sigma=sigma * pixel_scale) # Convolve both gal_obj = galsim.Convolve(gal_obj, psf_obj) # Draw galaxy and PSF on vignets gal_vign = gal_obj.drawImage( nx=img_shape[0], ny=img_shape[1], scale=pixel_scale ).array psf_vign = psf_obj.drawImage( nx=img_shape[0], ny=img_shape[1], scale=pixel_scale ).array return gal_vign, psf_vign
[docs]class SpreadModel(object): """The Spread Model Class. Parameters ---------- sex_cat_path : str Path to SExtractor catalogue psf_cat_path : str Path to PSF catalogue weight_cat_path : str Path to weight catalogue output_path : str Output file path of pasted catalog pixel_scale : float Pixel scale in arcsec output_mode : str Options are ``new`` or ``add`` Notes ----- For the ``output_mode``: - ``new`` will create a new catalogue with ``[number, mag, sm, sm_err]`` - ``add`` will output a copy of the input SExtractor with the columns ``sm`` and ``sm_err`` """ def __init__( self, sex_cat_path, psf_cat_path, weight_cat_path, output_path, pixel_scale, output_mode ): self._sex_cat_path = sex_cat_path self._psf_cat_path = psf_cat_path self._weight_cat_path = weight_cat_path self._output_path = output_path self._pixel_scale = pixel_scale self._output_mode = output_mode
[docs] def process(self): """Process. Process the spread model computation """ # Get data sex_cat = file_io.FITSCatalogue(self._sex_cat_path, SEx_catalogue=True) sex_cat.open() obj_id = np.copy(sex_cat.get_data()['NUMBER']) obj_vign = np.copy(sex_cat.get_data()['VIGNET']) obj_mag = None if self._output_mode == 'new': obj_mag = np.copy(sex_cat.get_data()['MAG_AUTO']) sex_cat.close() psf_cat = SqliteDict(self._psf_cat_path) weight_cat = file_io.FITSCatalogue( self._weight_cat_path, SEx_catalogue=True, ) weight_cat.open() weigh_vign = weight_cat.get_data()['VIGNET'] weight_cat.close() # Get spread model skip_obj = False spread_model_final = [] spread_model_err_final = [] for idx, id_tmp in enumerate(obj_id): sigma_list = [] if psf_cat[str(id_tmp)] == 'empty': spread_model_final.append(-1) spread_model_err_final.append(1) continue psf_expccd_name = list(psf_cat[str(id_tmp)].keys()) for expccd_name_tmp in psf_expccd_name: psf_cat_id_ccd = psf_cat[str(id_tmp)][expccd_name_tmp] sigma_list.append( psf_cat_id_ccd['SHAPES']['SIGMA_PSF_HSM'] ) obj_vign_tmp = obj_vign[idx] obj_flux_tmp = 1. obj_sigma_tmp = np.mean(sigma_list) obj_weight_tmp = weigh_vign[idx] obj_model_tmp, obj_psf_tmp = get_model( obj_sigma_tmp, obj_flux_tmp, obj_vign_tmp.shape, self._pixel_scale ) obj_sm, obj_sm_err = get_sm( obj_vign_tmp, obj_psf_tmp, obj_model_tmp, obj_weight_tmp ) spread_model_final.append(obj_sm) spread_model_err_final.append(obj_sm_err) spread_model_final = np.array(spread_model_final, dtype='float64') spread_model_err_final = np.array( spread_model_err_final, dtype='float64', ) psf_cat.close() self.save_results( spread_model_final, spread_model_err_final, obj_mag, obj_id )
[docs] def save_results(self, sm, sm_err, mag, number): """Save Results. Save output catalogue with spread model and errors. Parameters ---------- sm : numpy.ndarray Value of the spread model for all objects sm_err : numpy.ndarray Value of the spread model error for all objects mag : numpy.ndarray Magnitude of all objects (only for a new catalogue) number : numpy.ndarray ID of all objects (only for a new catalogue) Raises ------ ValueError For incorrect output mode """ if self._output_mode == 'new': new_cat = file_io.FITSCatalogue( self._output_path, SEx_catalogue=True, open_mode=file_io.BaseCatalogue.OpenMode.ReadWrite ) dict_data = { 'NUMBER': number, 'MAG': mag, 'SPREAD_MODEL': sm, 'SPREADERR_MODEL': sm_err } new_cat.save_as_fits( data=dict_data, sex_cat_path=self._sex_cat_path ) elif self._output_mode == 'add': ori_cat = file_io.FITSCatalogue( self._sex_cat_path, SEx_catalogue=True, ) ori_cat.open() new_cat = file_io.FITSCatalogue( self._output_path, SEx_catalogue=True, open_mode=file_io.BaseCatalogue.OpenMode.ReadWrite ) ori_cat.add_col( 'SPREAD_MODEL', sm, new_cat=True, new_cat_inst=new_cat ) ori_cat.close() new_cat.open() new_cat.add_col('SPREADERR_MODEL', sm_err) new_cat.close() else: raise ValueError('Mode must be in [new, add].')