"""MERGE STAR CATALOGUES.
This module contains a class to identify single exposures that were used
to create tiles.
:Authors: Martin Kilbinger <martin.kilbinger@cea.fr>, Tobias Liaudat,
Morgan Schmitz, Axel Guinot
"""
import re
import numpy as np
from astropy.io import fits
from shapepipe.pipeline import file_io
[docs]class MergeStarCatMCCD(object):
"""Merge Star Catalogue MCCD.
Merge star catalogues of MCCD PSF model output.
Parameters
----------
input_file_list : list
Input files
output_dir : str
Output directory
w_log : logging.Logger
Logging instance
stamp_size : int, optional
Stamp size, in pixels; default is ``51``
rad : int, optional
Radius for mask, in pixels; default is ``10``
hdu_table : int, optional
HDU number; default is ``1``
"""
def __init__(
self,
input_file_list,
output_dir,
w_log,
stamp_size=51,
rad=10,
hdu_table=1
):
self._input_file_list = input_file_list
self._output_dir = output_dir
self._w_log = w_log
self._stamp_size = stamp_size
self._rad = rad
self._hdu_table = hdu_table
[docs] @staticmethod
def rmse_calc(values, sizes):
r"""Calculate RMSE.
Calculate square root of mean over input values.
If ``values`` is an array with element :math:`j` being
:math:`\sum_j^{N_j}x_{i, j}^2`, where :math:`x_{ij}`
is the residual (ground truth - estimation), and
sizes is the array :math:`N_j`, then
this function computes the RMSE.
Parameters
----------
values : list
Sums of pixel values for a list of input images
sizes : list
Number of pixels for a list of input images
Returns
-------
rmse : float
Root mean square error
See Also
--------
MergeStarCatMCCD.mean_calc
"""
rmse = np.sqrt(MergeStarCatMCCD.mean_calc(values, sizes))
return rmse
[docs] @staticmethod
def rmse_calc_2(values, sizes):
r"""Calculate RMSE 2.
Calculate square root of mean over squared input valuess.
If ``values`` is an array with element :math:`j` being
:math:`\sum_j^{N_j}x_{i, j}`, where :math:`x_{ij}`
is the residual (ground truth - estimation), and
sizes is the array :math:`N_j`, then
this function computes the RMSE.
Parameters
----------
values : list
Sums of pixel values for a list of input images
sizes : list
Number of pixels for a list of input images
Returns
-------
float
Root mean square error
"""
rmse = np.sqrt(
np.nansum(np.array(values) ** 2) / np.nansum(np.array(sizes))
)
return rmse
[docs] @staticmethod
def mean_calc(values, sizes):
"""Calculate Mean.
Calculate pixel mean over all input images.
Parameters
----------
values : list
Sums of pixel values for a list of input images
sizes : list
Number of pixels for a list of input images
Returns
-------
float
Mean
"""
mean = (
np.nansum(np.array(values)) / np.nansum(np.array(sizes))
)
return mean
[docs] @staticmethod
def std_calc(values):
"""Calculate Standard Deviation.
Calculate pixel standard deviation over all input images.
Parameters
----------
values : list
Sums of pixel values for a list of input images
sizes : list
Number of pixels for a list of input images
Returns
-------
float
Standard deviation
"""
std = np.nanstd(np.array(values))
return std
[docs] @staticmethod
def stats_calculator(val_ref, val_model):
"""Calculate Stats.
Calculate RMSE, mean, and standard deviation of residuals.
Parameters
----------
val_ref : list
Reference values
val_model : list
Model values
Returns
-------
tuple
Root mean square error, mean and standard deviation
"""
residual = val_ref - val_model
rmse = np.sqrt(np.mean(residual ** 2))
mean = np.mean(residual)
std_dev = np.std(residual)
return rmse, mean, std_dev
[docs] def process(self):
"""Process.
Process merging.
"""
x, y = [], []
ra, dec = [], []
g1_psf, g2_psf, size_psf = [], [], []
g1, g2, size = [], [], []
flag_psf, flag_star = [], []
ccd_nb = []
pixel_mse = []
pixel_sum = []
masked_pixel_mse = []
masked_pixel_sum = []
size_mse = []
masked_size = []
pix_norm_mse, size_norm_mse = [], []
pix_filt_mse, size_filt_mse = [], []
star_noise_var, star_noise_size = [], []
model_var, model_var_size = [], []
bad_catalogs = 0
# Construction of the mask
shap = np.array([self._stamp_size, self._stamp_size])
stamp_size_half = int(self._stamp_size / 2)
cent = np.array([stamp_size_half, stamp_size_half])
my_mask = np.zeros((self._stamp_size, self._stamp_size), dtype=bool)
idx = np.arange(0, shap[0])
jdx = np.arange(0, shap[1])
inside_circle = np.sqrt(
(idx[np.newaxis, :] - cent[0]) ** 2
+ (jdx[:, np.newaxis] - cent[1]) ** 2
) <= self._rad
my_mask[inside_circle] = True
for name in self._input_file_list:
starcat_j = fits.open(name[0], memmap=False)
try:
stars = np.copy(starcat_j[self._hdu_table].data['VIGNET_LIST'])
except ValueError:
print(f'Error for file {name[0]}, check FITS file integrity')
raise
stars[stars < -1e6] = 0
psfs = np.copy(starcat_j[self._hdu_table].data['PSF_VIGNET_LIST'])
# Pixel mse calculation
pix_val = np.sum((stars - psfs) ** 2)
pix_sum = np.sum((stars - psfs))
masked_diffs = np.array(
[(_star - _psf)[my_mask] for _star, _psf in zip(stars, psfs)]
)
masked_pix_val = np.sum(masked_diffs ** 2)
masked_pix_sum = np.sum(masked_diffs)
# Star noise variance (using masked stars)
star_noise_var_val = np.array(
[np.var(_star[np.invert(my_mask)]) for _star in stars]
)
res_var_val = np.array(
[np.var(_star - _psf) for _star, _psf in zip(stars, psfs)]
)
# Variance of the model
# (Residual variance - Star variance (using masked stars))
model_var_val = res_var_val - star_noise_var_val
model_var_val = model_var_val[model_var_val > 0]
# if pix_val < 1e20:
# Normalised pixel mse calculation
stars_norm_vals = np.sqrt(np.sum(stars ** 2, axis=(1, 2)))
psfs_norm_vals = np.sqrt(np.sum(psfs ** 2, axis=(1, 2)))
# Select non zero stars & psfs
non_zero_elems = np.logical_and(
(psfs_norm_vals != 0),
(stars_norm_vals != 0)
)
# Calculate the filtered mse calculation
pix_filt_val = np.sum(
(stars[non_zero_elems] - psfs[non_zero_elems]) ** 2
)
# Calculate the normalized (& filtered) mse calculation
stars_norm_vals = stars_norm_vals[non_zero_elems].reshape(-1, 1, 1)
psfs_norm_vals = psfs_norm_vals[non_zero_elems].reshape(-1, 1, 1)
pix_norm_val = np.sum(
(
stars[non_zero_elems] / stars_norm_vals
- psfs[non_zero_elems] / psfs_norm_vals
) ** 2
)
# Calculate sizes
filt_size = stars[non_zero_elems].size
regular_size = stars.size
regular_masked_size = stars.shape[0] * (np.sum(my_mask))
# Append the results to the lists
pixel_mse.append(pix_val)
pixel_sum.append(pix_sum)
masked_pixel_mse.append(masked_pix_val)
masked_pixel_sum.append(masked_pix_sum)
size_mse.append(regular_size)
masked_size.append(regular_masked_size)
pix_norm_mse.append(pix_norm_val)
size_norm_mse.append(filt_size)
pix_filt_mse.append(pix_filt_val)
size_filt_mse.append(filt_size)
star_noise_var.append(star_noise_var_val)
star_noise_size.append(star_noise_var_val.size)
model_var.append(model_var_val)
model_var_size.append(model_var_val.size)
# positions
x += list(
starcat_j[self._hdu_table].data['GLOB_POSITION_IMG_LIST'][:, 0]
)
y += list(
starcat_j[self._hdu_table].data['GLOB_POSITION_IMG_LIST'][:, 1]
)
# RA and DEC positions
try:
ra += list(starcat_j[self._hdu_table].data['RA_LIST'][:])
dec += list(starcat_j[self._hdu_table].data['DEC_LIST'][:])
except Exception:
ra += list(np.zeros(
starcat_j[self._hdu_table].data[
'GLOB_POSITION_IMG_LIST'
][:, 0].shape,
dtype=int,
))
dec += list(np.zeros(
starcat_j[self._hdu_table].data[
'GLOB_POSITION_IMG_LIST'
][:, 0].shape,
dtype=int,
))
# shapes (convert sigmas to R^2)
g1_psf += list(
starcat_j[self._hdu_table].data['PSF_MOM_LIST'][:, 0]
)
g2_psf += list(
starcat_j[self._hdu_table].data['PSF_MOM_LIST'][:, 1]
)
size_psf += list(
starcat_j[self._hdu_table].data['PSF_MOM_LIST'][:, 2] ** 2
)
g1 += list(starcat_j[self._hdu_table].data['STAR_MOM_LIST'][:, 0])
g2 += list(starcat_j[self._hdu_table].data['STAR_MOM_LIST'][:, 1])
size += list(
starcat_j[self._hdu_table].data['STAR_MOM_LIST'][:, 2] ** 2
)
# flags
flag_psf += list(
starcat_j[self._hdu_table].data['PSF_MOM_LIST'][:, 3]
)
flag_star += list(
starcat_j[self._hdu_table].data['STAR_MOM_LIST'][:, 3]
)
# ccd id list
ccd_nb += list(starcat_j[self._hdu_table].data['CCD_ID_LIST'])
starcat_j.close()
# Shortcut name
MSC = MergeStarCatMCCD
# Regular pixel RMSE
tot_pixel_rmse = MSC.rmse_calc(pixel_mse, size_mse)
self._w_log.info(
f'MCCD_merge_starcat: Regular Total pixel RMSE ='
+ f' {tot_pixel_rmse:.5e}\n'
)
# Regular Total pixel mean
tot_pixel_mean = MSC.mean_calc(pixel_sum, size_mse)
self._w_log.info(
f'MCCD_merge_starcat: Regular Total pixel mean ='
+ f' {tot_pixel_mean:.5e}\n'
)
# Regular Total MASKED pixel RMSE
tot_masked_pixel_rmse = MSC.rmse_calc(masked_pixel_mse, masked_size)
self._w_log.info(
f'MCCD_merge_starcat: Regular Total MASKED pixel RMSE ='
+ f' {tot_masked_pixel_rmse:.5e}\n'
)
# Regular Total MASKED pixel mean
tot_masked_pixel_mean = MSC.mean_calc(masked_pixel_sum, masked_size)
self._w_log.info(
f'MCCD_merge_starcat: Regular Total MASKED pixel mean ='
+ f' {tot_masked_pixel_mean:.5e}\n'
)
# Normalized pixel RMSE
tot_pix_norm_rmse = MSC.rmse_calc(pix_norm_mse, size_norm_mse)
self._w_log.info(
'MCCD_merge_starcat: Normalized Total pixel RMSE ='
+ f' {tot_pix_norm_rmse:.5e}\n'
)
# Normalized filtered pixel RMSE
tot_pix_filt_rmse = MSC.rmse_calc(pix_filt_mse, size_filt_mse)
self._w_log.info(
'MCCD_merge_starcat: Filtered Total pixel RMSE ='
+ f' {tot_pix_filt_rmse:.5e}\n'
)
concat_model_var = np.concatenate(np.array(model_var))
concat_star_noise_var = np.concatenate(np.array(star_noise_var))
# Model variance
mean_model_var = MSC.mean_calc(concat_model_var, model_var_size)
std_model_var = MSC.std_calc(concat_model_var)
rmse_model_var = MSC.rmse_calc_2(concat_model_var, model_var_size)
self._w_log.info(
f'MCCD-RCA variance:\nMean Variance= {mean_model_var:.5e}\n'
+ f'Std Variance= {std_model_var:.5e}\n'
+ f'RMSE Variance= {rmse_model_var:.5e}\n'
)
# Star Noise Variance
mean_star_var = MSC.mean_calc(concat_star_noise_var, star_noise_size)
std_star_var = MSC.std_calc(concat_star_noise_var)
rmse_star_var = MSC.rmse_calc_2(concat_star_noise_var, star_noise_size)
self._w_log.info(
f'Masked stars variance:\nMean Variance= {mean_star_var:.5e}\n'
+ f'Std Variance= {std_star_var:.5e}\n'
+ f'RMSE Variance= {rmse_star_var:.5e}\n'
)
# Mask and transform to numpy arrays
flagmask = (
np.abs(np.array(flag_star) - 1) * np.abs(np.array(flag_psf) - 1)
)
psf_e1 = np.array(g1_psf)[flagmask.astype(bool)]
psf_e2 = np.array(g2_psf)[flagmask.astype(bool)]
psf_r2 = np.array(size_psf)[flagmask.astype(bool)]
star_e1 = np.array(g1)[flagmask.astype(bool)]
star_e2 = np.array(g2)[flagmask.astype(bool)]
star_r2 = np.array(size)[flagmask.astype(bool)]
rmse, mean, std_dev = MSC.stats_calculator(star_e1, psf_e1)
self._w_log.info(
f'Moment residual e1:\nMean= {mean:.5e}\nStd Dev= {std_dev:.5e}\n'
+ f'RMSE= {rmse:.5e}\n'
)
rmse, mean, std_dev = MSC.stats_calculator(star_e2, psf_e2)
self._w_log.info(
f'Moment residual e2:\nMean= {mean:.5e}\nStd Dev= {std_dev:.5e}\n'
+ f'RMSE= {rmse:.5e}\n'
)
rmse, mean, std_dev = MSC.stats_calculator(star_r2, psf_r2)
self._w_log.info(
f'Moment residual R2:\nMean= {mean:.5e}\nStd Dev= {std_dev:.5e}\n'
+ f'RMSE= {rmse:.5e}\n'
)
self._w_log.info(f'MCCD: Number of stars: {star_e1.shape[0]:d}')
# Prepare output FITS catalogue
output = file_io.FITSCatalogue(
f'{self._output_dir}/full_starcat-0000000.fits',
open_mode=file_io.BaseCatalogue.OpenMode.ReadWrite,
SEx_catalogue=True
)
# Collect columns
# convert back to sigma for consistency
data = {
'X': x,
'Y': y,
'RA': ra,
'DEC': dec,
'E1_PSF_HSM': g1_psf,
'E2_PSF_HSM': g2_psf,
'SIGMA_PSF_HSM': np.sqrt(size_psf),
'E1_STAR_HSM': g1,
'E2_STAR_HSM': g2,
'SIGMA_STAR_HSM': np.sqrt(size),
'FLAG_PSF_HSM': flag_psf,
'FLAG_STAR_HSM': flag_star,
'CCD_NB': ccd_nb
}
# Write file
output.save_as_fits(data, sex_cat_path=self._input_file_list[0][0])
[docs]class MergeStarCatPSFEX(object):
"""Merge Star Catalogue PSFEx.
Merge star catalogues of PSFEx PSF model output.
Parameters
----------
input_file_list : list
Input files
output_dir : str
Output directory
w_log : logging.Logger
Logging instance
hdu_table : int, optional
HDU number; default is ``2``
"""
def __init__(self, input_file_list, output_dir, w_log, hdu_table=2):
self._input_file_list = input_file_list
self._output_dir = output_dir
self._w_log = w_log
self._hdu_table = hdu_table
[docs] def process(self):
"""Process.
Process merging.
"""
x, y, ra, dec = [], [], [], []
g1_psf, g2_psf, size_psf = [], [], []
g1, g2, size = [], [], []
flag_psf, flag_star = [], []
mag, snr, psfex_acc = [], [], []
ccd_nb = []
self._w_log.info(
f'Merging {len(self._input_file_list)} star catalogues'
)
for name in self._input_file_list:
starcat_j = fits.open(name[0], memmap=False)
data_j = starcat_j[self._hdu_table].data
# positions
x += list(data_j['X'])
y += list(data_j['Y'])
ra += list(data_j['RA'])
dec += list(data_j['DEC'])
# shapes (convert sigmas to R^2)
g1_psf += list(data_j['E1_PSF_HSM'])
g2_psf += list(data_j['E2_PSF_HSM'])
size_psf += list(data_j['SIGMA_PSF_HSM']**2)
g1 += list(data_j['E1_STAR_HSM'])
g2 += list(data_j['E2_STAR_HSM'])
size += list(data_j['SIGMA_STAR_HSM']**2)
# flags
flag_psf += list(data_j['FLAG_PSF_HSM'])
flag_star += list(data_j['FLAG_STAR_HSM'])
# misc
mag += list(data_j['MAG'])
snr += list(data_j['SNR'])
psfex_acc += list(data_j['ACCEPTED'])
# CCD number
ccd_nb += [
re.split(r"\-([0-9]*)\-([0-9]+)\.", name[0])[-2]
] * len(data_j['RA'])
# Prepare output FITS catalogue
output = file_io.FITSCatalogue(
f'{self._output_dir}/full_starcat-0000000.fits',
open_mode=file_io.BaseCatalogue.OpenMode.ReadWrite,
SEx_catalogue=True
)
# Collect columns
# convert back to sigma for consistency
data = {
'X': x,
'Y': y,
'RA': ra,
'DEC': dec,
'E1_PSF_HSM': g1_psf,
'E2_PSF_HSM': g2_psf,
'SIGMA_PSF_HSM': np.sqrt(size_psf),
'E1_STAR_HSM': g1,
'E2_STAR_HSM': g2,
'SIGMA_STAR_HSM': np.sqrt(size),
'FLAG_PSF_HSM': flag_psf,
'FLAG_STAR_HSM': flag_star,
'MAG': mag,
'SNR': snr,
'ACCEPTED': psfex_acc,
'CCD_NB': ccd_nb
}
# Write file
output.save_as_fits(
data,
overwrite=True,
sex_cat_path=self._input_file_list[0][0],
)