"""MASK.
This module contains a class to create star mask for an image.
:Authors: Axel Guinot, Martin Kilbinger
"""
import os
import re
import numpy as np
from astropy import units, wcs
from astropy.coordinates import SkyCoord
from astropy.io import fits
from shapepipe.pipeline import file_io
from shapepipe.pipeline.config import CustomParser
from shapepipe.pipeline.execute import execute
from shapepipe.utilities.file_system import mkdir
class Mask:
    """Mask.

    Class to create mask based on a star catalogue.

    Parameters
    ----------
    image_path : str
        Path to image (FITS format)
    weight_path : str
        Path to the weight image (FITS format)
    image_prefix : str
        Prefix to input image name, specify as ``'none'`` for no prefix
    image_num : str
        File number identifier
    config_filepath : str
        Path to the ``.mask`` config file
    output_dir : str
        Path to the output directory
    w_log : logging.Logger
        Log file
    path_external_flag : str, optional
        Path to external flag file, default is ``None`` (not used)
    outname_base : str, optional
        Output file name base, default is ``flag``
    star_cat_path : str, optional
        Path to external star catalogue, default is ``None`` (not used;
        instead the star catalogue is produced on the fly at run time)
    hdu : int, optional
        HDU number used when reading mask/flag files, default is ``0``

    """

    def __init__(
        self,
        image_path,
        weight_path,
        image_prefix,
        image_num,
        config_filepath,
        output_dir,
        w_log,
        path_external_flag=None,
        outname_base='flag',
        star_cat_path=None,
        hdu=0,
    ):
        # Path to the image to mask
        self._image_fullpath = image_path

        # Path to the weight associated to the image
        self._weight_fullpath = weight_path

        # Input image prefix ('none' or empty string means no prefix)
        if (image_prefix.lower() != 'none') and (image_prefix != ''):
            self._img_prefix = f'{image_prefix}_'
        else:
            self._img_prefix = ''

        # File number identifier
        self._img_number = image_num

        # Path to mask config file
        self._config_filepath = config_filepath

        # Path to the output directory
        self._output_dir = output_dir

        # Log file
        self._w_log = w_log

        # Path to an external flag file
        self._path_external_flag = path_external_flag

        # Output file base name
        self._outname_base = outname_base

        # External star catalogue path. Always assigned (possibly ``None``)
        # because ``_get_config`` reads it unconditionally.
        self._star_cat_path = star_cat_path

        self._hdu = hdu

        # Error flag, raised by the external-command steps
        self._err = False

        # Read mask config file
        self._get_config()

        # Set parameters needed for the star detection
        self._set_image_coordinates()

    def _get_config(self):
        """Get Config.

        Read the config file and set parameters.

        Raises
        ------
        ValueError
            If config file name is ``None``
        IOError
            If config file not found
        ValueError
            If neither a CDSclient path nor a star catalogue is available

        """
        if self._config_filepath is None:
            raise ValueError('No path to config file given')

        if not os.path.exists(self._config_filepath):
            raise IOError(
                f'Config file "{self._config_filepath}" not found'
            )

        conf = CustomParser()
        conf.read(self._config_filepath)

        self._config = {
            section: {}
            for section in (
                'PATH', 'BORDER', 'HALO', 'SPIKE', 'MESSIER', 'NGC', 'MD'
            )
        }
        paths = self._config['PATH']

        # WeightWatcher executable and configuration
        if conf.has_option('PROGRAM_PATH', 'WW_PATH'):
            paths['WW'] = conf.getexpanded('PROGRAM_PATH', 'WW_PATH')
        else:
            paths['WW'] = 'ww'
        paths['WW_configfile'] = (
            conf.getexpanded('PROGRAM_PATH', 'WW_CONFIG_FILE')
        )

        # Star catalogue source: the CDS client takes precedence over an
        # external catalogue file
        if conf.has_option('PROGRAM_PATH', 'CDSCLIENT_PATH'):
            paths['CDSclient'] = (
                conf.getexpanded('PROGRAM_PATH', 'CDSCLIENT_PATH')
            )
        elif self._star_cat_path is not None:
            paths['star_cat'] = self._star_cat_path
        else:
            raise ValueError(
                'Either [PROGRAM_PATH]:CDSCLIENT_PATH in the mask config file '
                + ' or a star catalogue as module input needs to be present'
            )

        paths['temp_dir'] = self._get_temp_dir_path(
            conf.getexpanded('OTHER', 'TEMP_DIRECTORY')
        )

        # Border mask
        border = self._config['BORDER']
        border['make'] = conf.getboolean('BORDER_PARAMETERS', 'BORDER_MAKE')
        if border['make']:
            border['width'] = conf.getint('BORDER_PARAMETERS', 'BORDER_WIDTH')
            # Read as integer so that it can be written to the flag array
            border['flag'] = (
                conf.getint('BORDER_PARAMETERS', 'BORDER_FLAG_VALUE')
            )

        # Star masks (halos and diffraction spikes)
        for mask_shape in ('HALO', 'SPIKE'):
            section = f'{mask_shape}_PARAMETERS'
            shape_conf = self._config[mask_shape]

            shape_conf['make'] = (
                conf.getboolean(section, f'{mask_shape}_MAKE')
            )
            shape_conf['individual'] = (
                conf.getboolean('OTHER', 'KEEP_INDIVIDUAL_MASK')
            )
            if not shape_conf['make']:
                continue

            shape_conf['maskmodel_path'] = (
                conf.getexpanded(section, f'{mask_shape}_MASKMODEL_PATH')
            )
            shape_conf['mag_lim'] = (
                conf.getfloat(section, f'{mask_shape}_MAG_LIM')
            )
            shape_conf['scale_factor'] = (
                conf.getfloat(section, f'{mask_shape}_SCALE_FACTOR')
            )
            shape_conf['mag_pivot'] = (
                conf.getfloat(section, f'{mask_shape}_MAG_PIVOT')
            )
            shape_conf['flag'] = (
                conf.getint(section, f'{mask_shape}_FLAG_VALUE')
            )

            if conf.getboolean('OTHER', 'KEEP_REG_FILE'):
                reg_file = conf.getexpanded(
                    section,
                    f'{mask_shape}_REG_FILE',
                )
                # Escape the dot: the pattern must match the literal
                # '.reg' extension, not any character followed by 'reg'
                reg_stem = re.split(r'\.reg', reg_file)[0]
                shape_conf['reg_file'] = (
                    f'{paths["temp_dir"]}/'
                    + f'{reg_stem}'
                    + f'{self._img_number}.reg'
                )
            else:
                shape_conf['reg_file'] = None

        # Deep-sky object masks
        for mask_type in ('MESSIER', 'NGC'):
            section = f'{mask_type}_PARAMETERS'
            type_conf = self._config[mask_type]

            type_conf['make'] = conf.getboolean(section, f'{mask_type}_MAKE')
            if type_conf['make']:
                type_conf['cat_path'] = (
                    conf.getexpanded(section, f'{mask_type}_CAT_PATH')
                )
                type_conf['size_plus'] = (
                    conf.getfloat(section, f'{mask_type}_SIZE_PLUS')
                )
                type_conf['flag'] = (
                    conf.getint(section, f'{mask_type}_FLAG_VALUE')
                )

        # Missing data
        md_conf = self._config['MD']
        md_conf['make'] = conf.getboolean('MD_PARAMETERS', 'MD_MAKE')
        if md_conf['make']:
            md_conf['thresh_flag'] = (
                conf.getfloat('MD_PARAMETERS', 'MD_THRESH_FLAG')
            )
            md_conf['thresh_remove'] = (
                conf.getfloat('MD_PARAMETERS', 'MD_THRESH_REMOVE')
            )
            md_conf['remove'] = (
                conf.getboolean('MD_PARAMETERS', 'MD_REMOVE')
            )

    def _set_image_coordinates(self):
        """Set Image Coordinates.

        Compute the image coordinates for matching with the star catalogue
        and star mask.

        """
        img = file_io.FITSCatalogue(self._image_fullpath, hdu_no=0)
        img.open()
        self._header = img.get_header()
        img_shape = img.get_data().shape
        img.close()
        del img

        self._wcs = wcs.WCS(self._header)

        # Compute field center
        # Note: get_data().shape corresponds to (n_y, n_x)
        pix_center = [img_shape[1] / 2.0, img_shape[0] / 2.0]
        wcs_center = self._wcs.all_pix2world([pix_center], 1)[0]

        self._fieldcenter = {
            'pix': np.array(pix_center),
            'wcs': SkyCoord(
                ra=wcs_center[0],
                dec=wcs_center[1],
                unit='deg',
            ),
        }

        # Get the four corners of the image
        corners = self._wcs.calc_footprint()
        self._corners_sc = SkyCoord(
            ra=corners[:, 0] * units.degree,
            dec=corners[:, 1] * units.degree,
        )

        # Image radius: distance from the field center to pixel (0, 0)
        self._img_radius = self._get_image_radius()

    def make_mask(self):
        """Make Mask.

        Main function to create the mask.

        Returns
        -------
        tuple
            Collected standard output and standard error (both ``str``) of
            the external commands that were run

        """
        if self._config['MD']['make']:
            self.missing_data()

        stars = None
        if self._config['HALO']['make'] or self._config['SPIKE']['make']:
            stars = self.find_stars(
                np.array([
                    self._fieldcenter['wcs'].ra.value,
                    self._fieldcenter['wcs'].dec.value
                ]),
                radius=self._img_radius,
            )

        # Write the DS9 region files for the star masks
        if not self._err:
            for _type in ('HALO', 'SPIKE'):
                if self._config[_type]['make']:
                    self._create_mask(
                        stars=stars,
                        types=_type,
                        mag_limit=self._config[_type]['mag_lim'],
                        scale_factor=self._config[_type]['scale_factor'],
                        mag_pivot=self._config[_type]['mag_pivot'],
                    )

        # Transform region files to FITS flag maps with WeightWatcher
        temp_dir = self._config['PATH']['temp_dir']
        mask_name = [None, None]
        if not self._err:
            if self._config['HALO']['make'] and self._config['SPIKE']['make']:
                self._exec_WW(types='ALL')
                mask_name = [
                    f'{temp_dir}halo_spike_flag{self._img_number}.fits',
                    None,
                ]
            else:
                mask_name = []
                for _type in ('HALO', 'SPIKE'):
                    if self._config[_type]['make']:
                        self._exec_WW(types=_type)
                        mask_name.append(
                            f'{temp_dir}'
                            + f'{_type.lower()}_flag{self._img_number}.fits'
                        )
                    else:
                        mask_name.append(None)

        masks_internal = {}

        if not self._err:
            if self._config['BORDER']['make']:
                # Honour the flag value requested in the config file
                masks_internal['BORDER'] = self.mask_border(
                    width=self._config['BORDER']['width'],
                    flag_value=self._config['BORDER']['flag'],
                )

            for _type in ('MESSIER', 'NGC'):
                if self._config[_type]['make']:
                    masks_internal[_type] = self.mask_dso(
                        self._config[_type]['cat_path'],
                        size_plus=self._config[_type]['size_plus'],
                        flag_value=self._config[_type]['flag'],
                        obj_type=_type,
                    )

        # The image passes unless ``missing_data`` marked it for removal;
        # the key is absent when the missing-data check did not run or
        # removal is switched off
        im_pass = not self._config['MD'].get('im_remove', False)

        if not self._err and im_pass:
            final_mask = self._build_final_mask(
                path_mask1=mask_name[0],
                path_mask2=mask_name[1],
                masks_internal=masks_internal,
                path_external_flag=self._path_external_flag,
            )

            # Remove the intermediate flag maps unless asked to keep them
            if not self._config['HALO']['individual']:
                if mask_name[0] is not None:
                    self._rm_fits1_stdout, self._rm_fits1_stderr = (
                        execute(f'rm {mask_name[0]}')
                    )
                if mask_name[1] is not None:
                    self._rm_fits2_stdout, self._rm_fits2_stderr = (
                        execute(f'rm {mask_name[1]}')
                    )

            output_file_name = (
                f'{self._output_dir}/{self._img_prefix}'
                + f'{self._outname_base}{self._img_number}.fits'
            )
            self._mask_to_file(
                input_mask=final_mask,
                output_fullpath=output_file_name,
            )

        # Handle stdout / stderr. The attributes only exist for the steps
        # that were actually run, hence the ``getattr`` defaults.
        cds_stdout = getattr(self, '_CDS_stdout', '')
        cds_stderr = getattr(self, '_CDS_stderr', '')
        general_stdout = f'\nCDSClient\n{cds_stdout}'
        general_stderr = ''
        if cds_stderr != '':
            general_stderr += f'\nCDSClient\n{cds_stderr}'

        steps = (
            ('WeightWatcher', '_WW'),
            ('rm reg file', '_rm_reg'),
            ('rm fits1 file', '_rm_fits1'),
            ('rm fits2 file', '_rm_fits2'),
        )
        for label, attr in steps:
            has_out = hasattr(self, f'{attr}_stdout')
            has_err = hasattr(self, f'{attr}_stderr')
            if not (has_out or has_err):
                continue
            step_stdout = getattr(self, f'{attr}_stdout', '')
            step_stderr = getattr(self, f'{attr}_stderr', '')
            general_stdout += f'\n\n{label}\n{step_stdout}'
            if step_stderr != '':
                general_stderr += f'\n\n{label}\n{step_stderr}'

        return general_stdout, general_stderr

    def find_stars(self, position, radius):
        """Find Stars.

        Return GSC (Guide Star Catalog) objects for a field with center
        (RA, Dec) and radius :math:`r`.

        Parameters
        ----------
        position : numpy.ndarray
            Position of the center of the field
        radius : float
            Radius in which the query is done (in arcmin)

        Returns
        -------
        dict
            Star dictionary for GSC objects in the field, or ``None`` if
            the catalogue query wrote to standard error

        Raises
        ------
        ValueError
            For invalid configuration options

        """
        if 'CDSclient' in self._config['PATH']:
            ra = position[0]
            dec = position[1]
            # Negative values already carry their sign
            sign = '+' if dec > 0.0 else ''
            cmd_line = (
                f'{self._config["PATH"]["CDSclient"]} {ra} {sign}{dec} '
                + f'-r {radius} -n 1000000'
            )
            self._CDS_stdout, self._CDS_stderr = execute(cmd_line)

        elif 'star_cat' in self._config['PATH']:
            with open(self._config['PATH']['star_cat'], 'r') as cat_file:
                self._CDS_stdout = cat_file.read()
            self._CDS_stderr = ''

        else:
            raise ValueError(
                'Either [PROGRAM_PATH]:CDSCLIENT_PATH in the mask config file '
                + ' or a star catalogue as module input needs to be present'
            )

        if self._CDS_stderr != '':
            self._err = True
            return None

        return self._make_star_cat(self._CDS_stdout)

    def mask_border(self, width=100, flag_value=4):
        """Create Mask Border.

        Mask ``width`` pixels around the image.

        Parameters
        ----------
        width : int
            Width of the mask border; a non-positive width yields an empty
            mask
        flag_value : int
            Value of the flag for the border (power of 2)

        Returns
        -------
        numpy.ndarray
            Array containing the mask

        Raises
        ------
        ValueError
            If ``width`` is ``None``

        """
        if width is None:
            raise ValueError('Width for border mask not provided')

        # Note that python image array is [y, x]
        flag = np.zeros(
            (
                int(self._fieldcenter['pix'][1] * 2),
                int(self._fieldcenter['pix'][0] * 2)
            ),
            dtype='uint16',
        )

        # Guard against width == 0: the slice ``[-0:]`` selects the whole
        # axis and would flag every pixel of the image
        if width > 0:
            flag[0:width, :] = flag_value
            flag[-width:, :] = flag_value
            flag[:, 0:width] = flag_value
            flag[:, -width:] = flag_value

        return flag

    def mask_dso(
        self,
        cat_path,
        size_plus=0.1,
        flag_value=8,
        obj_type='Messier',
    ):
        """Mask DSO.

        Create a circular patch for deep-sky objects (DSOs), e.g.
        Messier or NGC objects.

        Parameters
        ----------
        cat_path : str
            Path to the deep-sky catalogue
        size_plus : float
            Increase the size of the mask by this factor
            (e.g. ``0.1`` means 10%)
        flag_value : int
            Value of the flag, some power of 2
        obj_type : {'Messier', 'NGC'}, optional
            Object type, used for logging only

        Returns
        -------
        numpy.ndarray or ``None``
            The flag map, or ``None`` if no deep-sky object is found to
            overlap with the image

        Raises
        ------
        ValueError
            If ``size_plus`` is negative
        ValueError
            If ``cat_path`` is ``None``

        """
        if size_plus < 0:
            raise ValueError(
                'deep-sky mask size increase variable cannot be negative'
            )

        if cat_path is None:
            raise ValueError('Path to deep-sky object catalogue not provided')

        m_cat, header = fits.getdata(cat_path, header=True)

        unit_ra = file_io.get_unit_from_fits_header(header, 'ra')
        unit_dec = file_io.get_unit_from_fits_header(header, 'dec')
        m_sc = SkyCoord(
            ra=m_cat['ra'] * unit_ra,
            dec=m_cat['dec'] * unit_dec,
        )

        unit_size_X = file_io.get_unit_from_fits_header(header, 'size_X')
        unit_size_Y = file_io.get_unit_from_fits_header(header, 'size_Y')

        # Loop through all deep-sky objects and keep those for which any
        # image corner is closer than the object's size.
        # NOTE(review): an object lying entirely inside the image, further
        # than its own size from every corner, is not selected by this
        # test -- confirm whether that is intended.
        radius_deg = {}
        for idx, m_obj in enumerate(m_cat):
            r_deg = max(
                m_obj['size_X'] * unit_size_X,
                m_obj['size_Y'] * unit_size_Y,
            ).to(units.degree)

            if np.any(self._corners_sc.separation(m_sc[idx]) < r_deg):
                radius_deg[idx] = r_deg

        self._w_log.info(
            f'Found {len(radius_deg)} {obj_type} objects overlapping with'
            ' image'
        )

        if not radius_deg:
            # No closeby deep-sky object found
            return None

        # Report whether each DSO center lies in the footprint, for
        # logging purpose only
        for idx in radius_deg:
            in_img = self._wcs.footprint_contains(m_sc[idx])
            self._w_log.info(
                '(obj_type, ra, dec, in_img) = '
                + f'({obj_type}, '
                + f'{m_cat["ra"][idx]}, '
                + f'{m_cat["dec"][idx]}, '
                + f'{in_img})'
            )

        # Note: python image array is [y, x]
        flag = np.zeros(
            (
                int(self._fieldcenter['pix'][1] * 2),
                int(self._fieldcenter['pix'][0] * 2)
            ),
            dtype='uint16',
        )

        # Pixel coordinate grids are the same for every object
        n_y, n_x = flag.shape
        y_c, x_c = np.ogrid[0:n_y, 0:n_x]
        pix_scale = np.abs(self._wcs.pixel_scale_matrix[0][0])

        for idx, r_deg in radius_deg.items():
            m_center = np.hstack(self._wcs.all_world2pix(
                m_cat['ra'][idx],
                m_cat['dec'][idx],
                0,
            ))
            r_pix = r_deg.to(units.deg).value * (1 + size_plus) / pix_scale

            # The following accounts for deep-sky centers outside of image,
            # without creating masks for coordinates out of range
            mask_tmp = (
                (x_c - m_center[0]) ** 2 + (y_c - m_center[1]) ** 2
                <= r_pix ** 2
            )
            flag[mask_tmp] = flag_value

        return flag

    def missing_data(self):
        """Find Missing Data.

        Look for zero-valued pixels in image. Flag if their relative number
        is larger than a threshold.

        """
        img = file_io.FITSCatalogue(self._image_fullpath, hdu_no=0)
        img.open()
        try:
            data = img.get_data()

            # Total number of pixels
            tot = float(data.shape[0] * data.shape[1])

            # Number of missing data (zero-valued pixels)
            missing = float(np.count_nonzero(data == 0.))
        finally:
            img.close()

        self._ratio = missing / tot
        md_conf = self._config['MD']

        # Mark image as to be flagged if ratio larger than 'flag' threshold
        md_conf['im_flagged'] = bool(self._ratio >= md_conf['thresh_flag'])

        # Mark image as to be removed if requested and ratio larger than
        # 'remove' threshold. In that case reset all other mask 'make'
        # flags to False (no other mask needs to be created).
        if md_conf['remove']:
            if self._ratio >= md_conf['thresh_remove']:
                md_conf['im_remove'] = True
                for key in ('HALO', 'SPIKE', 'MESSIER', 'NGC', 'BORDER'):
                    self._config[key]['make'] = False
            else:
                md_conf['im_remove'] = False

    def sphere_dist(self, position1, position2):
        """Compute Spherical Distance.

        Compute spherical distance between 2 points.

        Parameters
        ----------
        position1 : numpy.ndarray
            [x,y] first point (in pixels)
        position2 : numpy.ndarray
            [x,y] second point (in pixels)

        Returns
        -------
        float
            The distance in arcsec

        Raises
        ------
        ValueError
            If input positions are not Numpy arrays

        """
        if not (
            isinstance(position1, np.ndarray)
            and isinstance(position2, np.ndarray)
        ):
            raise ValueError('Object coordinates need to be a numpy.ndarray')

        deg_to_rad = np.pi / 180.0

        # World coordinates (longitude, latitude) in radians
        p1 = deg_to_rad * np.hstack(
            self._wcs.all_pix2world(position1[0], position1[1], 1)
        )
        p2 = deg_to_rad * np.hstack(
            self._wcs.all_pix2world(position2[0], position2[1], 1)
        )

        dLong, dLat = (p1 - p2)[:2]

        # Haversine formula
        dist = 2 * np.arcsin(np.sqrt(
            np.sin(dLat / 2.0) ** 2.0 + np.cos(p1[1]) * np.cos(p2[1])
            * np.sin(dLong / 2.0) ** 2.0
        ))

        # Radians -> degrees -> arcsec
        return dist * (180.0 / np.pi) * 3600.0

    def _get_image_radius(self, center=None):
        """Get Image Radius.

        Compute the distance between the image center and pixel (0, 0),
        in arcmin.

        Parameters
        ----------
        center : numpy.ndarray, optional
            Coordinates of the center of the image (in pixels), default is
            the field center computed from the image

        Returns
        -------
        float
            The distance between ``center`` and pixel (0, 0) in arcmin

        Raises
        ------
        TypeError
            If centre is not a Numpy array

        """
        if center is None:
            center = self._fieldcenter['pix']
        elif not isinstance(center, np.ndarray):
            raise TypeError(
                'Image center coordinates has to be a numpy.ndarray'
            )

        # ``sphere_dist`` returns arcsec
        return self.sphere_dist(center, np.zeros(2)) / 60.0

    def _make_star_cat(self, CDSclient_output):
        """Make Star Catalogue.

        Make a dictionary from findgsc2.2 output. The column names are
        taken from the fourth line of the output; the first four and the
        last five lines are not treated as data.

        Parameters
        ----------
        CDSclient_output : str
            Output of findgsc2.2

        Returns
        -------
        dict
            Star dictionary containing all information; one list per
            column, with ``None`` for missing (``---``) entries

        """
        def clean_tokens(line):
            """Split a line on blanks and strip ``,``, ``#``, ``;``."""
            tokens = []
            for token in line.split(' '):
                if token in ('', ';'):
                    continue
                # Keep the last non-empty piece; a token made of
                # separators only is kept unchanged
                pieces = [
                    piece for piece in re.split(',|#|;', token) if piece != ''
                ]
                tokens.append(pieces[-1] if pieces else token)
            return tokens

        # Split once; the input can hold a very large number of rows
        lines = CDSclient_output.splitlines()

        # get header
        header = clean_tokens(lines[3])
        stars = {key: [] for key in header}

        # get data
        for line in lines[4:max(len(lines) - 5, 4)]:
            for idx, token in enumerate(clean_tokens(line)):
                column = stars[header[idx]]
                try:
                    column.append(float(token))
                except ValueError:
                    # handle missing data
                    column.append(None if token == '---' else token)

        return stars

    def _create_mask(
        self,
        stars,
        types='HALO',
        mag_limit=18.0,
        mag_pivot=13.8,
        scale_factor=0.3,
    ):
        """Create Mask.

        Apply mask from model to stars and save into DS9 region file.

        Parameters
        ----------
        stars : dict
            Stars dictionary (output of ``find_stars``)
        types : {'HALO', 'SPIKE'}, optional
            Type of mask, options are ``HALO`` or ``SPIKE``
        mag_limit : float, optional
            Faint magnitude limit for mask, default is ``18.0``
        mag_pivot : float, optional
            Pivot magnitude for the model, default is ``13.8``
        scale_factor : float, optional
            Scaling for the model, default is ``0.3``

        Raises
        ------
        ValueError
            If no star catalogue is provided
        ValueError
            If an invalid option is provided for type

        """
        if stars is None:
            raise ValueError('Star catalogue dictionary not provided')

        if types not in ('HALO', 'SPIKE'):
            raise ValueError('Mask types need to be in ["HALO", "SPIKE"]')

        if self._config[types]['reg_file'] is None:
            reg = (
                f'{self._config["PATH"]["temp_dir"]}{types.lower()}'
                + f'{self._img_number}.reg'
            )
        else:
            reg = self._config[types]['reg_file']

        mask_model = np.loadtxt(
            self._config[types]['maskmodel_path']
        ).transpose()

        # Select stars and compute their pixel position and model scaling
        stars_used = []
        star_zip = zip(
            stars['RA(J2000)'],
            stars['Dec(J2000)'],
            stars['Fmag'],
            stars['Jmag'],
            stars['Vmag'],
            stars['Nmag'],
            stars['Clas'],
        )
        for ra, dec, Fmag, Jmag, Vmag, Nmag, clas in star_zip:
            # Mean over the available magnitudes
            mags = [m for m in (Fmag, Jmag, Vmag, Nmag) if m is not None]
            if not mags or ra is None or dec is None or clas is None:
                continue
            mag = sum(mags) / len(mags)

            if (mag < mag_limit) and (clas == 0):
                scaling = 1.0 - scale_factor * (mag - mag_pivot)
                pos = self._wcs.all_world2pix(ra, dec, 0)
                stars_used.append((pos[0], pos[1], scaling))

        # One polygon per star: the model vertices, radially scaled and
        # shifted to the star position
        with open(reg, 'w') as mask_reg:
            for x_star, y_star, scaling in stars_used:
                vertices = []
                for x, y in zip(mask_model[0], mask_model[1]):
                    angle = np.arctan2(y, x)
                    ll = scaling * np.sqrt(x ** 2 + y ** 2)
                    xnew = ll * np.cos(angle)
                    ynew = ll * np.sin(angle)
                    vertices.append(
                        f'{str(x_star + xnew + 0.5)} '
                        + f'{str(y_star + ynew + 0.5)} '
                    )
                mask_reg.write('polygon(' + ''.join(vertices) + ')\n')

    def _exec_WW(self, types='HALO'):
        """Execute WeightWatcher.

        Execute WeightWatcher to transform ``.reg`` to ``.fits`` flag map.
        Region files at their default location are deleted afterwards;
        region files at a configured location are kept.

        Parameters
        ----------
        types : {'HALO', 'SPIKE', 'ALL'}, optional
            Type of WeightWatcher execution, options are ``HALO``,
            ``SPIKE`` or ``ALL``

        Raises
        ------
        ValueError
            If an invalid option is provided for type
        BaseCatalogue.CatalogFileNotFound
            If catalogue file not found

        """
        if types in ('HALO', 'SPIKE'):
            shapes = [types]
            out_stem = f'{types.lower()}_flag'
        elif types == 'ALL':
            shapes = ['HALO', 'SPIKE']
            out_stem = 'halo_spike_flag'
        else:
            raise ValueError("Types must be in ['HALO','SPIKE','ALL']")

        temp_dir = self._config['PATH']['temp_dir']
        default_out = f'{temp_dir}{out_stem}{self._img_number}.fits'

        # The first shape decides whether default region files are used
        # (for 'ALL' this is the HALO setting)
        use_default_reg = self._config[shapes[0]]['reg_file'] is None
        if use_default_reg:
            regs = [
                f'{temp_dir}{shape.lower()}{self._img_number}.reg'
                for shape in shapes
            ]
        else:
            regs = [self._config[shape]['reg_file'] for shape in shapes]

        for reg in regs:
            if not file_io.BaseCatalogue(reg)._file_exists(reg):
                raise file_io.BaseCatalogue.CatalogFileNotFound(reg)

        poly_names = ','.join(regs)
        poly_flags = ','.join(
            str(self._config[shape]['flag']) for shape in shapes
        )
        cmd = (
            f'{self._config["PATH"]["WW"]} '
            + f'-c {self._config["PATH"]["WW_configfile"]} '
            + f'-WEIGHT_NAMES {self._weight_fullpath} '
            + f'-POLY_NAMES {poly_names} '
            + f'-POLY_OUTFLAGS {poly_flags} '
            + f'-FLAG_NAMES "" -OUTFLAG_NAME {default_out} '
            + '-OUTWEIGHT_NAME ""'
        )
        self._WW_stdout, self._WW_stderr = execute(cmd)

        # Only default region files are temporary and removed
        rm_stderr = ''
        if use_default_reg:
            rm_targets = ' '.join(regs)
            self._rm_reg_stdout, self._rm_reg_stderr = (
                execute(f'rm {rm_targets}')
            )
            rm_stderr = self._rm_reg_stderr

        if (self._WW_stderr != '') or (rm_stderr != ''):
            self._err = True

    def _build_final_mask(
        self,
        path_mask1,
        path_mask2=None,
        masks_internal=None,
        path_external_flag=None,
    ):
        """Create Final Mask.

        Create the final mask by summing the individual masks. The input
        arrays are not modified.

        Parameters
        ----------
        path_mask1 : str
            Path to a mask (FITS format)
        path_mask2 : str, optional
            Path to a mask (FITS format)
        masks_internal : dict, optional
            Internally created masks; entries that are ``None`` are
            skipped
        path_external_flag : str, optional
            Path to an external flag file

        Returns
        -------
        numpy.ndarray
            Array containing the final mask, of type ``int16``

        Raises
        ------
        ValueError
            If all masks are of type ``None``
        TypeError
            If an internally created mask is not a Numpy array

        """
        if masks_internal is None:
            masks_internal = {}

        if (
            path_mask1 is None and path_mask2 is None
            and not masks_internal
        ):
            raise ValueError(
                'No paths to mask files containing halos and/or spikes,'
                + ' borders, or deep-sky objects provided'
            )

        def read_layer(path):
            """Return a copy of the image data stored at ``path``."""
            cat = file_io.FITSCatalogue(path, hdu_no=self._hdu)
            cat.open()
            try:
                # Copy so that the data outlives the closed file
                return np.array(cat.get_data()[:, :])
            finally:
                cat.close()

        layers = [
            read_layer(path)
            for path in (path_mask1, path_mask2)
            if path is not None
        ]

        for typ, mask in masks_internal.items():
            if mask is None:
                continue
            if not isinstance(mask, np.ndarray):
                raise TypeError(
                    f'internally created mask of type {typ} '
                    + 'has to be numpy.ndarray'
                )
            layers.append(mask)

        if path_external_flag is not None:
            layers.append(read_layer(path_external_flag))

        if not layers:
            raise ValueError('All masks are of type None, no mask to build')

        # Out-of-place sum: leaves the inputs untouched and lets numpy
        # promote mixed integer types instead of failing on the cast
        final_mask = layers[0]
        for layer in layers[1:]:
            final_mask = final_mask + layer

        return final_mask.astype(np.int16, copy=False)

    def _mask_to_file(self, input_mask, output_fullpath):
        """Mask to File.

        Save the mask to a fits file. If the missing-data check is active,
        its results and the WCS information are added to the header.

        Parameters
        ----------
        input_mask : numpy.ndarray
            Mask to save
        output_fullpath : str
            Path of the output file

        Raises
        ------
        ValueError
            If input_mask is type ``None``
        ValueError
            If output_fullpath is type ``None``

        """
        if input_mask is None:
            raise ValueError('input mask file path not provided')

        if output_fullpath is None:
            raise ValueError('output mask file path not provided')

        out = file_io.FITSCatalogue(
            output_fullpath,
            open_mode=file_io.BaseCatalogue.OpenMode.ReadWrite,
            hdu_no=0,
        )
        out.save_as_fits(data=input_mask, image=True)

        if not self._config['MD']['make']:
            return

        out.open()
        try:
            out.add_header_card(
                'MRATIO',
                self._ratio,
                'ratio missing_pixels/all_pixels',
            )
            out.add_header_card(
                'MFLAG',
                self._config['MD']['im_flagged'],
                f'threshold value {self._config["MD"]["thresh_flag"]:.3}',
            )

            # Write WCS information to header
            if self._wcs:
                header_wcs = self._wcs.to_header()
                for card in header_wcs:
                    out.add_header_card(
                        card,
                        header_wcs[card],
                        header_wcs.comments[card],
                    )
        finally:
            out.close()

    def _get_temp_dir_path(self, temp_dir_path):
        """Get Temporary Directory Path.

        Create the path and the directory for temporary files.

        Parameters
        ----------
        temp_dir_path : str
            Path to the temporary directory, a value of ``OUTPUT`` will
            place the temporary files in ``<output_dir>/temp``

        Returns
        -------
        str
            Path to the temporary directory, with a trailing ``/``

        Raises
        ------
        ValueError
            If ``temp_dir_path`` is of type ``None``

        """
        if temp_dir_path is None:
            raise ValueError('Temporary directory path not provided')

        path = temp_dir_path.replace(' ', '')
        if path == 'OUTPUT':
            path = f'{self._output_dir}/temp'
        path = f'{path}/'

        if not os.path.isdir(path):
            mkdir(path)

        return path