"""RANDOM CATALOGUE.
This module contains a class to create a random catalogue, and to compute
the tile area accounting for overlapping and masked regions.
:Author: Martin Kilbinger <martin.kilbinger@cea.fr>
"""
import os
import re
import numpy as np
import astropy.io.fits as fits
from astropy import wcs
from astropy.table import Table
from reproject import reproject_to_healpix
from shapepipe.pipeline import file_io
from shapepipe.utilities import cfis
[docs]class RandomCat():
"""Random Catalogue.
This class creates a random catalogue given a mask FITS file.
Parameters
----------
input_image_path : str
Path to input image file
input_mask_path : str
Path to input mask file
output_dir : str
Output directory
file_number_pattern : str
ShapePipe image ID string
output_file_pattern : str
Output file pattern (base name) for random catalogue
n_rand : float
Number of random objects on output
density : bool
``n_rand`` is interpreted per square degrees if ``True``
w_log : logging.Logger
Logging instance
healpix_options : dict
Parameters for HEALPix output mask file
"""
def __init__(
self,
input_image_path,
input_mask_path,
output_dir,
file_number_string,
output_file_pattern,
n_rand,
density,
w_log,
healpix_options,
):
self._input_image_path = input_image_path
self._input_mask_path = input_mask_path
self._output_dir = output_dir
self._file_number_string = file_number_string
self._output_file_pattern = output_file_pattern
self._n_rand = n_rand
self._density = density
self._w_log = w_log
self._healpix_options = healpix_options
[docs] def save_as_healpix(self, mask, header):
"""Save As Healpix.
Save mask as healpix FITS file.
Parameters
----------
mask : numpy.ndarray
2D pixel mask image
header : class Header
Image header with WCS information
"""
if not self._healpix_options:
return
# Tranform config entry from str to int
nside = int(self._healpix_options['NSIDE'])
mask_1d, footprint = reproject_to_healpix(
(mask, header),
'galactic',
nside=nside,
)
t = Table()
t['flux'] = mask_1d
t.meta['ORDERING'] = 'RING'
t.meta['COORDSYS'] = 'G'
t.meta['NSIDE'] = nside
t.meta['INDXSCHM'] = 'IMPLICIT'
output_path = (
f'{self._output_dir}/{self._healpix_options["FILE_BASE"]}-'
+ f'{self._file_number_string}.fits'
)
t.write(output_path)
[docs] def process(self):
"""Process.
Main function to identify exposures.
"""
# Read image FITS file header
try:
img = fits.open(self._input_image_path)
header = img[0].header
except (OSError, IOError) as error:
# FITS file might contain only header.
# Try as ascii file
try:
fin = open(self._input_image_path)
header = fits.Header.fromtextfile(fin)
fin.close()
except Exception:
raise
# Get WCS
WCS = wcs.WCS(header)
# Read mask FITS file
hdu_mask = fits.open(self._input_mask_path)
mask = hdu_mask[0].data
# Save mask in healpix format (if option is set)
self.save_as_healpix(mask, header)
# Number of pixels
n_pix_x = mask.data.shape[0]
n_pix_y = mask.data.shape[1]
n_pix = n_pix_x * n_pix_y
# Number of non-masked pixels
n_unmasked = len(np.where(mask == 0)[0])
# Compute various areas
# Pixel area in deg^2
area_pix = wcs.utils.proj_plane_pixel_area(WCS)
# Tile area
area_deg2 = area_pix * n_pix
# Area of unmasked region
area_deg2_eff = area_pix * n_unmasked
# Compute number of requested objects
if n_unmasked > 0:
if not self._density:
# Use value from config file
n_obj = self._n_rand
else:
# Compute number of objects from density
n_obj = int(
self._n_rand / area_deg2 * area_deg2_eff / area_deg2
)
# Check that a reasonably large number of pixels is not masked
if n_unmasked < n_obj:
raise ValueError(
f'Number of un-masked pixels {n_unmasked} is smaller '
+ f'than number of random objects requested {n_obj}'
)
else:
n_obj = 0
self._w_log.info(f'Creating {n_obj} random objects')
# Draw points until n are in mask
n_found = 0
xy_rand = []
while n_found < n_obj:
idx_x = np.random.randint(n_pix_x)
idx_y = np.random.randint(n_pix_y)
# Add points with additional random sub-pixel value
if mask[idx_x, idx_y] == 0:
d = np.random.random(2)
# MKDEBUG: the following seems to work, x and y interchanged
xy_rand.append([idx_y + d[1], idx_x + d[0]])
n_found = n_found + 1
xy_rand = np.array(xy_rand)
# Transform to WCS
res = WCS.all_pix2world(xy_rand, 1)
if n_unmasked > 0:
ra_rand = res[:, 0]
dec_rand = res[:, 1]
x_rand = xy_rand[:, 0]
y_rand = xy_rand[:, 1]
else:
ra_rand = []
dec_rand = []
x_rand = []
y_rand = []
# Tile ID
output_path = (
f'{self._output_dir}/{self._output_file_pattern}-'
+ f'{self._file_number_string}.fits'
)
file_name = os.path.split(output_path)[1]
file_base = os.path.splitext(file_name)[0]
tile_ID_str = re.split('-', file_base)[1:]
tile_id = float('.'.join(tile_ID_str))
tile_id_array = np.ones(n_obj) * tile_id
# Write to output
cat_out = [
ra_rand,
dec_rand,
x_rand,
y_rand,
tile_id_array
]
column_names = ['RA', 'DEC', 'x', 'y', 'TILE_ID']
# TODO: Add units to header
output = file_io.FITSCatalogue(
output_path,
open_mode=file_io.BaseCatalogue.OpenMode.ReadWrite
)
output.save_as_fits(cat_out, names=column_names)
# Write area information to log file
self._w_log.info(f'Total area = {area_deg2:.4f} deg^2')
self._w_log.info(f'Unmasked area = {area_deg2_eff:.4f} deg^2')
self._w_log.info(
f'Ratio masked to total pixels = {n_unmasked / n_pix:.3f}'
)