import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import galsim as gs
import wf_psf.utils.utils as utils
from wf_psf.psf_models.tf_psf_field import build_PSF_model
from wf_psf.psf_models import tf_psf_field as psf_field
from wf_psf.sims import SimPSFToolkit as SimPSFToolkit
import logging
logger = logging.getLogger(__name__)
def compute_poly_metric(
tf_semiparam_field,
GT_tf_semiparam_field,
simPSF_np,
tf_pos,
tf_SEDs,
n_bins_lda=20,
n_bins_gt=20,
batch_size=16,
dataset_dict=None,
):
"""Calculate metrics for polychromatic reconstructions.
The ``tf_semiparam_field`` should be the model to evaluate, and the
``GT_tf_semiparam_field`` should be loaded with the ground truth PSF field.
Relative values are returned in % (i.e. multiplied by 100).
Parameters
----------
tf_semiparam_field: PSF field object
Trained model to evaluate.
GT_tf_semiparam_field: PSF field object
Ground truth model to produce GT observations at any position
and wavelength.
simPSF_np: PSF simulator object
Simulation object to be used by ``generate_packed_elems`` function.
tf_pos: Tensor or numpy.ndarray [batch x 2] floats
Positions to evaluate the model.
tf_SEDs: numpy.ndarray [batch x SED_samples x 2]
SED samples for the corresponding positions.
n_bins_lda: int
Number of wavelength bins to use for the polychromatic PSF.
n_bins_gt: int
Number of wavelength bins to use for the ground truth polychromatic PSF.
batch_size: int
Batch size for the PSF calculations.
dataset_dict: dict
Dictionary containing the dataset information. If provided, and if the `'stars'` key
is present, the noiseless stars from the dataset are used to compute the metrics.
Otherwise, the stars are generated from the GT model.
Default is `None`.
Returns
-------
rmse: float
RMSE value.
rel_rmse: float
Relative RMSE value. Values in %.
std_rmse: float
Standard deviation of RMSEs.
std_rel_rmse: float
Standard deviation of relative RMSEs. Values in %.
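Examples
--------
A minimal usage sketch, assuming ``trained_model`` and ``gt_model`` are
already-built PSF field objects and ``sim_psf``, ``positions`` and ``seds``
come from the evaluation dataset (illustrative names, not defined in this
module)::

    rmse, rel_rmse, std_rmse, std_rel_rmse = compute_poly_metric(
        tf_semiparam_field=trained_model,
        GT_tf_semiparam_field=gt_model,
        simPSF_np=sim_psf,
        tf_pos=positions,
        tf_SEDs=seds,
        n_bins_lda=20,
        n_bins_gt=20,
        batch_size=16,
    )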
"""
# Generate SED data list for the model
packed_SED_data = [
utils.generate_packed_elems(_sed, simPSF_np, n_bins=n_bins_lda)
for _sed in tf_SEDs
]
tf_packed_SED_data = tf.convert_to_tensor(packed_SED_data, dtype=tf.float32)
tf_packed_SED_data = tf.transpose(tf_packed_SED_data, perm=[0, 2, 1])
pred_inputs = [tf_pos, tf_packed_SED_data]
# Model prediction
preds = tf_semiparam_field.predict(x=pred_inputs, batch_size=batch_size)
# GT data preparation
if dataset_dict is None or "stars" not in dataset_dict:
logger.info("Regenerating GT stars from model.")
# Change interpolation parameters for the GT simPSF
interp_pts_per_bin = simPSF_np.SED_interp_pts_per_bin
simPSF_np.SED_interp_pts_per_bin = 0
SED_sigma = simPSF_np.SED_sigma
simPSF_np.SED_sigma = 0
# Generate SED data list for GT model
packed_SED_data = [
utils.generate_packed_elems(_sed, simPSF_np, n_bins=n_bins_gt)
for _sed in tf_SEDs
]
tf_packed_SED_data = tf.convert_to_tensor(packed_SED_data, dtype=tf.float32)
tf_packed_SED_data = tf.transpose(tf_packed_SED_data, perm=[0, 2, 1])
pred_inputs = [tf_pos, tf_packed_SED_data]
# GT model prediction
GT_preds = GT_tf_semiparam_field.predict(x=pred_inputs, batch_size=batch_size)
else:
logger.info("Using GT stars from dataset.")
GT_preds = dataset_dict["stars"]
# Calculate residuals
residuals = np.sqrt(np.mean((GT_preds - preds) ** 2, axis=(1, 2)))
GT_star_mean = np.sqrt(np.mean((GT_preds) ** 2, axis=(1, 2)))
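# ``residuals`` holds the per-star pixel RMSE and ``GT_star_mean`` the per-star
# RMS of the ground truth pixels, so their ratio below is the per-star relative error.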
# RMSE calculations
rmse = np.mean(residuals)
rel_rmse = 100.0 * np.mean(residuals / GT_star_mean)
# STD calculations
std_rmse = np.std(residuals)
std_rel_rmse = 100.0 * np.std(residuals / GT_star_mean)
# Print RMSE values
logger.info("Absolute RMSE:\t %.4e \t +/- %.4e" % (rmse, std_rmse))
logger.info("Relative RMSE:\t %.4e %% \t +/- %.4e %%" % (rel_rmse, std_rel_rmse))
return rmse, rel_rmse, std_rmse, std_rel_rmse
def compute_mono_metric(
tf_semiparam_field,
GT_tf_semiparam_field,
simPSF_np,
tf_pos,
lambda_list,
batch_size=32,
):
"""Calculate metrics for monochromatic reconstructions.
The ``tf_semiparam_field`` should be the model to evaluate, and the
``GT_tf_semiparam_field`` should be loaded with the ground truth PSF field.
Relative values are returned in % (i.e. multiplied by 100).
Parameters
----------
tf_semiparam_field: PSF field object
Trained model to evaluate.
GT_tf_semiparam_field: PSF field object
Ground truth model to produce GT observations at any position
and wavelength.
simPSF_np: PSF simulator object
Simulation object capable of calculating ``phase_N`` values from
wavelength values.
tf_pos: list of floats [batch x 2]
Positions to evaluate the model.
lambda_list: list of floats [wavelength_values]
List of wavelength values in [um] to evaluate the model.
batch_size: int
Batch size to process the monochromatic PSF calculations.
Returns
-------
rmse_lda: list of float
List of RMSE as a function of wavelength.
rel_rmse_lda: list of float
List of relative RMSE as a function of wavelength. Values in %.
std_rmse_lda: list of float
List of standard deviation of RMSEs as a function of wavelength.
std_rel_rmse_lda: list of float
List of standard deviation of relative RMSEs as a function of
wavelength. Values in %.
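Examples
--------
A minimal usage sketch, assuming ``trained_model``, ``gt_model`` and ``sim_psf``
are already instantiated and ``positions`` holds the evaluation positions
(illustrative names, not defined in this module)::

    lambda_list = [0.55, 0.65, 0.75, 0.85]  # wavelengths in [um]
    rmse_lda, rel_rmse_lda, std_rmse_lda, std_rel_rmse_lda = compute_mono_metric(
        tf_semiparam_field=trained_model,
        GT_tf_semiparam_field=gt_model,
        simPSF_np=sim_psf,
        tf_pos=positions,
        lambda_list=lambda_list,
    )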
"""
# Initialise lists
rmse_lda = []
rel_rmse_lda = []
std_rmse_lda = []
std_rel_rmse_lda = []
total_samples = tf_pos.shape[0]
# Main loop for each wavelength
for it in range(len(lambda_list)):
# Set the lambda (wavelength) and the required wavefront N
lambda_obs = lambda_list[it]
phase_N = simPSF_np.feasible_N(lambda_obs)
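# ``phase_N`` is the wavefront array size the simulator requires to sample
# this wavelength correctly (see the simulator's ``feasible_N``).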
residuals = np.zeros((total_samples))
GT_star_mean = np.zeros((total_samples))
# Number of batches needed to cover all the samples
n_epochs = int(np.ceil(total_samples / batch_size))
ep_low_lim = 0
for ep in range(n_epochs):
# Define the upper limit
if ep_low_lim + batch_size >= total_samples:
ep_up_lim = total_samples
else:
ep_up_lim = ep_low_lim + batch_size
# Extract the batch
batch_pos = tf_pos[ep_low_lim:ep_up_lim, :]
# Estimate the monochromatic PSFs
GT_mono_psf = GT_tf_semiparam_field.predict_mono_psfs(
input_positions=batch_pos, lambda_obs=lambda_obs, phase_N=phase_N
)
model_mono_psf = tf_semiparam_field.predict_mono_psfs(
input_positions=batch_pos, lambda_obs=lambda_obs, phase_N=phase_N
)
num_pixels = GT_mono_psf.shape[1] * GT_mono_psf.shape[2]
residuals[ep_low_lim:ep_up_lim] = (
np.sum((GT_mono_psf - model_mono_psf) ** 2, axis=(1, 2)) / num_pixels
)
GT_star_mean[ep_low_lim:ep_up_lim] = (
np.sum((GT_mono_psf) ** 2, axis=(1, 2)) / num_pixels
)
# Increase lower limit
ep_low_lim += batch_size
# Calculate residuals
residuals = np.sqrt(residuals)
GT_star_mean = np.sqrt(GT_star_mean)
# RMSE calculations
rmse_lda.append(np.mean(residuals))
rel_rmse_lda.append(100.0 * np.mean(residuals / GT_star_mean))
# STD calculations
std_rmse_lda.append(np.std(residuals))
std_rel_rmse_lda.append(100.0 * np.std(residuals / GT_star_mean))
return rmse_lda, rel_rmse_lda, std_rmse_lda, std_rel_rmse_lda
def compute_opd_metrics(tf_semiparam_field, GT_tf_semiparam_field, pos, batch_size=16):
"""Compute the OPD metrics.
The computation is batched to avoid out-of-memory errors on the GPUs,
especially because the OPD maps have a higher dimensionality than the
observed PSFs.
The OPD RMSE is computed after having removed the mean from the
different reconstructions. It is computed only on the
non-obscured elements from the OPD.
Parameters
----------
tf_semiparam_field: PSF field object
Trained model to evaluate.
GT_tf_semiparam_field: PSF field object
Ground truth model to produce GT observations at any position
and wavelength.
pos: numpy.ndarray [batch x 2]
Positions at which to predict the OPD maps.
batch_size: int
Batch size to process the OPD calculations.
Returns
-------
rmse: float
Absolute RMSE value.
rel_rmse: float
Relative RMSE value.
rmse_std: float
Absolute RMSE standard deviation.
rel_rmse_std: float
Relative RMSE standard deviation.
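Examples
--------
A minimal usage sketch, assuming ``trained_model`` and ``gt_model`` are
already-built PSF field objects and ``positions`` is a [batch x 2] array
(illustrative names, not defined in this module)::

    rmse, rel_rmse, rmse_std, rel_rmse_std = compute_opd_metrics(
        tf_semiparam_field=trained_model,
        GT_tf_semiparam_field=gt_model,
        pos=positions,
        batch_size=16,
    )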
"""
# Get OPD obscurations
np_obscurations = np.real(GT_tf_semiparam_field.obscurations.numpy())
# Define total number of samples
n_samples = pos.shape[0]
# Initialise batch variables
opd_batch = None
GT_opd_batch = None
counter = 0
# Initialise result lists
rmse_vals = np.zeros(n_samples)
rel_rmse_vals = np.zeros(n_samples)
while counter < n_samples:
# Calculate the batch end element
if counter + batch_size <= n_samples:
end_sample = counter + batch_size
else:
end_sample = n_samples
# Define the batch positions
batch_pos = pos[counter:end_sample, :]
# We calculate a batch of OPDs
opd_batch = tf_semiparam_field.predict_opd(batch_pos).numpy()
GT_opd_batch = GT_tf_semiparam_field.predict_opd(batch_pos).numpy()
# Remove the mean of the OPD
opd_batch -= np.mean(opd_batch, axis=(1, 2)).reshape(-1, 1, 1)
GT_opd_batch -= np.mean(GT_opd_batch, axis=(1, 2)).reshape(-1, 1, 1)
# Obscure the OPDs
opd_batch *= np_obscurations
GT_opd_batch *= np_obscurations
# Generate obscuration mask
obsc_mask = np_obscurations > 0
nb_mask_elems = np.sum(obsc_mask)
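# Only the ``nb_mask_elems`` pixels inside the pupil (mask > 0) contribute
# to the per-star OPD RMSE computed below.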
# Compute the OPD RMSE with the masked obscurations
res_opd = np.sqrt(
np.array(
[
np.sum((im1[obsc_mask] - im2[obsc_mask]) ** 2) / nb_mask_elems
for im1, im2 in zip(opd_batch, GT_opd_batch)
]
)
)
GT_opd_mean = np.sqrt(
np.array(
[np.sum(im2[obsc_mask] ** 2) / nb_mask_elems for im2 in GT_opd_batch]
)
)
# RMSE calculations
rmse_vals[counter:end_sample] = res_opd
rel_rmse_vals[counter:end_sample] = 100.0 * (res_opd / GT_opd_mean)
# Advance the batch counter
counter += batch_size
# Calculate final values
rmse = np.mean(rmse_vals)
rel_rmse = np.mean(rel_rmse_vals)
rmse_std = np.std(rmse_vals)
rel_rmse_std = np.std(rel_rmse_vals)
# Print RMSE values
logger.info("Absolute RMSE:\t %.4e \t +/- %.4e" % (rmse, rmse_std))
logger.info("Relative RMSE:\t %.4e %% \t +/- %.4e %%" % (rel_rmse, rel_rmse_std))
return rmse, rel_rmse, rmse_std, rel_rmse_std
def compute_shape_metrics(
tf_semiparam_field,
GT_tf_semiparam_field,
simPSF_np,
SEDs,
tf_pos,
n_bins_lda,
n_bins_gt,
output_Q=1,
output_dim=64,
batch_size=16,
opt_stars_rel_pix_rmse=False,
dataset_dict=None,
):
"""Compute the pixel, shape and size RMSE of a PSF model.
This is done at a specific sampling and output image dimension.
It is done for polychromatic PSFs so SEDs are needed.
Parameters
----------
tf_semiparam_field: PSF field object
Trained model to evaluate.
GT_tf_semiparam_field: PSF field object
Ground truth model to produce GT observations at any position
and wavelength.
simPSF_np: PSF simulator object
Simulation object to be used by the ``generate_packed_elems`` function.
SEDs: numpy.ndarray [batch x SED_samples x 2]
SED samples for the corresponding positions.
tf_pos: Tensor [batch x 2]
Positions at which to predict the PSFs.
n_bins_lda: int
Number of wavelength bins to use for the polychromatic PSF.
n_bins_gt: int
Number of wavelength bins to use for the ground truth polychromatic PSF.
output_Q: int
Downsampling rate to match the specified telescope's sampling. The value
of `output_Q` should be equal to `oversampling_rate` in order to have
the right pixel sampling corresponding to the telescope characteristics
`pix_sampling`, `tel_diameter`, `tel_focal_length`. The final
oversampling obtained is `oversampling_rate/output_Q`.
Default is `1`, so the output PSF will be super-resolved by a factor of
`oversampling_rate`. In short, it is best to use `1` and measure shapes on the
super-resolved PSFs.
output_dim: int
Output dimension of the square PSF stamps.
batch_size: int
Batch size to process the PSF estimations.
opt_stars_rel_pix_rmse: bool
If `True`, the relative pixel RMSE of each star is added to the result dictionary.
The summary statistics are always computed.
Default is `False`.
dataset_dict: dict
Dictionary containing the dataset information. If provided, and if the `'super_res_stars'`
key is present, the noiseless super resolved stars from the dataset are used to compute
the metrics. Otherwise, the stars are generated from the GT model.
Default is `None`.
Returns
-------
result_dict: dict
Dictionary with all the results.
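Examples
--------
A minimal usage sketch, assuming the PSF field objects, simulator, SEDs and
positions are already available (illustrative names, not defined in this
module)::

    shape_results = compute_shape_metrics(
        tf_semiparam_field=trained_model,
        GT_tf_semiparam_field=gt_model,
        simPSF_np=sim_psf,
        SEDs=seds,
        tf_pos=positions,
        n_bins_lda=20,
        n_bins_gt=20,
        output_Q=1,
        output_dim=64,
    )
    print(shape_results["rmse_e1"], shape_results["rmse_R2_meanR2"])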
"""
# Save original output_Q and output_dim
original_out_Q = tf_semiparam_field.output_Q
original_out_dim = tf_semiparam_field.output_dim
GT_original_out_Q = GT_tf_semiparam_field.output_Q
GT_original_out_dim = GT_tf_semiparam_field.output_dim
# Set the required output_Q and output_dim parameters in the models
tf_semiparam_field.set_output_Q(output_Q=output_Q, output_dim=output_dim)
GT_tf_semiparam_field.set_output_Q(output_Q=output_Q, output_dim=output_dim)
# Need to compile the models again
tf_semiparam_field = build_PSF_model(tf_semiparam_field)
GT_tf_semiparam_field = build_PSF_model(GT_tf_semiparam_field)
# Generate SED data list
packed_SED_data = [
utils.generate_packed_elems(_sed, simPSF_np, n_bins=n_bins_lda) for _sed in SEDs
]
# Prepare inputs
tf_packed_SED_data = tf.convert_to_tensor(packed_SED_data, dtype=tf.float32)
tf_packed_SED_data = tf.transpose(tf_packed_SED_data, perm=[0, 2, 1])
pred_inputs = [tf_pos, tf_packed_SED_data]
# PSF model
predictions = tf_semiparam_field.predict(x=pred_inputs, batch_size=batch_size)
# GT data preparation
if dataset_dict is None or (
    "super_res_stars" not in dataset_dict and "SR_stars" not in dataset_dict
):
logger.info("Generating GT super resolved stars from the GT model.")
# Change interpolation parameters for the GT simPSF
interp_pts_per_bin = simPSF_np.SED_interp_pts_per_bin
simPSF_np.SED_interp_pts_per_bin = 0
SED_sigma = simPSF_np.SED_sigma
simPSF_np.SED_sigma = 0
# Generate SED data list for GT model
packed_SED_data = [
utils.generate_packed_elems(_sed, simPSF_np, n_bins=n_bins_gt)
for _sed in SEDs
]
# Prepare inputs
tf_packed_SED_data = tf.convert_to_tensor(packed_SED_data, dtype=tf.float32)
tf_packed_SED_data = tf.transpose(tf_packed_SED_data, perm=[0, 2, 1])
pred_inputs = [tf_pos, tf_packed_SED_data]
# Ground Truth model
GT_predictions = GT_tf_semiparam_field.predict(
x=pred_inputs, batch_size=batch_size
)
else:
logger.info("Using super resolved stars from dataset.")
if "super_res_stars" in dataset_dict:
GT_predictions = dataset_dict["super_res_stars"]
elif "SR_stars" in dataset_dict:
GT_predictions = dataset_dict["SR_stars"]
# Calculate residuals
residuals = np.sqrt(np.mean((GT_predictions - predictions) ** 2, axis=(1, 2)))
GT_star_mean = np.sqrt(np.mean((GT_predictions) ** 2, axis=(1, 2)))
# Pixel RMSE for each star
if opt_stars_rel_pix_rmse:
stars_rel_pix_rmse = 100.0 * residuals / GT_star_mean
# RMSE calculations
pix_rmse = np.mean(residuals)
rel_pix_rmse = 100.0 * np.mean(residuals / GT_star_mean)
# STD calculations
pix_rmse_std = np.std(residuals)
rel_pix_rmse_std = 100.0 * np.std(residuals / GT_star_mean)
# Print pixel RMSE values
logger.info(
"\nPixel star absolute RMSE:\t %.4e \t +/- %.4e " % (pix_rmse, pix_rmse_std)
)
logger.info(
"Pixel star relative RMSE:\t %.4e %% \t +/- %.4e %%"
% (rel_pix_rmse, rel_pix_rmse_std)
)
# Measure shapes of the reconstructions
pred_moments = [
gs.hsm.FindAdaptiveMom(gs.Image(_pred), strict=False) for _pred in predictions
]
# Measure shapes of the ground truth PSFs
GT_pred_moments = [
gs.hsm.FindAdaptiveMom(gs.Image(_pred), strict=False)
for _pred in GT_predictions
]
pred_e1_HSM, pred_e2_HSM, pred_R2_HSM = [], [], []
GT_pred_e1_HSM, GT_pred_e2_HSM, GT_pred_R2_HSM = [], [], []
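# Keep only stars for which both HSM fits converged (moments_status == 0);
# the size metric uses R^2 = 2 * moments_sigma^2.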
for it in range(len(GT_pred_moments)):
if (
pred_moments[it].moments_status == 0
and GT_pred_moments[it].moments_status == 0
):
pred_e1_HSM.append(pred_moments[it].observed_shape.g1)
pred_e2_HSM.append(pred_moments[it].observed_shape.g2)
pred_R2_HSM.append(2 * (pred_moments[it].moments_sigma ** 2))
GT_pred_e1_HSM.append(GT_pred_moments[it].observed_shape.g1)
GT_pred_e2_HSM.append(GT_pred_moments[it].observed_shape.g2)
GT_pred_R2_HSM.append(2 * (GT_pred_moments[it].moments_sigma ** 2))
pred_e1_HSM = np.array(pred_e1_HSM)
pred_e2_HSM = np.array(pred_e2_HSM)
pred_R2_HSM = np.array(pred_R2_HSM)
GT_pred_e1_HSM = np.array(GT_pred_e1_HSM)
GT_pred_e2_HSM = np.array(GT_pred_e2_HSM)
GT_pred_R2_HSM = np.array(GT_pred_R2_HSM)
# Calculate metrics
# e1
e1_res = GT_pred_e1_HSM - pred_e1_HSM
e1_res_rel = (GT_pred_e1_HSM - pred_e1_HSM) / GT_pred_e1_HSM
rmse_e1 = np.sqrt(np.mean(e1_res**2))
rel_rmse_e1 = 100.0 * np.sqrt(np.mean(e1_res_rel**2))
std_rmse_e1 = np.std(e1_res)
std_rel_rmse_e1 = 100.0 * np.std(e1_res_rel)
# e2
e2_res = GT_pred_e2_HSM - pred_e2_HSM
e2_res_rel = (GT_pred_e2_HSM - pred_e2_HSM) / GT_pred_e2_HSM
rmse_e2 = np.sqrt(np.mean(e2_res**2))
rel_rmse_e2 = 100.0 * np.sqrt(np.mean(e2_res_rel**2))
std_rmse_e2 = np.std(e2_res)
std_rel_rmse_e2 = 100.0 * np.std(e2_res_rel)
# R2
R2_res = GT_pred_R2_HSM - pred_R2_HSM
rmse_R2_meanR2 = np.sqrt(np.mean(R2_res**2)) / np.mean(GT_pred_R2_HSM)
std_rmse_R2_meanR2 = np.std(R2_res / GT_pred_R2_HSM)
# Print shape/size errors
logger.info("\nsigma(e1) RMSE =\t\t %.4e \t +/- %.4e " % (rmse_e1, std_rmse_e1))
logger.info("sigma(e2) RMSE =\t\t %.4e \t +/- %.4e " % (rmse_e2, std_rmse_e2))
logger.info(
"sigma(R2)/<R2> =\t\t %.4e \t +/- %.4e " % (rmse_R2_meanR2, std_rmse_R2_meanR2)
)
# Print relative shape/size errors
logger.info(
"\nRelative sigma(e1) RMSE =\t %.4e %% \t +/- %.4e %%"
% (rel_rmse_e1, std_rel_rmse_e1)
)
logger.info(
"Relative sigma(e2) RMSE =\t %.4e %% \t +/- %.4e %%"
% (rel_rmse_e2, std_rel_rmse_e2)
)
# Print number of stars
logger.info("\nTotal number of stars: \t\t%d" % (len(GT_pred_moments)))
logger.info(
"Problematic number of stars: \t%d"
% (len(GT_pred_moments) - GT_pred_e1_HSM.shape[0])
)
# Re-set the original output_Q and output_dim parameters in the models
tf_semiparam_field.set_output_Q(
output_Q=original_out_Q, output_dim=original_out_dim
)
GT_tf_semiparam_field.set_output_Q(
output_Q=GT_original_out_Q, output_dim=GT_original_out_dim
)
# Need to compile the models again
tf_semiparam_field = build_PSF_model(tf_semiparam_field)
GT_tf_semiparam_field = build_PSF_model(GT_tf_semiparam_field)
# Moment results
result_dict = {
"pred_e1_HSM": pred_e1_HSM,
"pred_e2_HSM": pred_e2_HSM,
"pred_R2_HSM": pred_R2_HSM,
"GT_pred_e1_HSM": GT_pred_e1_HSM,
"GT_ped_e2_HSM": GT_pred_e2_HSM,
"GT_pred_R2_HSM": GT_pred_R2_HSM,
"rmse_e1": rmse_e1,
"std_rmse_e1": std_rmse_e1,
"rel_rmse_e1": rel_rmse_e1,
"std_rel_rmse_e1": std_rel_rmse_e1,
"rmse_e2": rmse_e2,
"std_rmse_e2": std_rmse_e2,
"rel_rmse_e2": rel_rmse_e2,
"std_rel_rmse_e2": std_rel_rmse_e2,
"rmse_R2_meanR2": rmse_R2_meanR2,
"std_rmse_R2_meanR2": std_rmse_R2_meanR2,
"pix_rmse": pix_rmse,
"pix_rmse_std": pix_rmse_std,
"rel_pix_rmse": rel_pix_rmse,
"rel_pix_rmse_std": rel_pix_rmse_std,
"output_Q": output_Q,
"output_dim": output_dim,
"n_bins_lda": n_bins_lda,
}
if opt_stars_rel_pix_rmse:
result_dict["stars_rel_pix_rmse"] = stars_rel_pix_rmse
return result_dict
def gen_GT_wf_model(test_wf_file_path, pred_output_Q=1, pred_output_dim=64):
r"""Generate the ground truth model and output test PSF ar required resolution.
If `pred_output_Q=1` the resolution will be 3 times the one of Euclid.
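A minimal usage sketch, assuming ``test_dataset.npy`` is a wavefront test
dataset file with the expected keys (illustrative path)::

    GT_predictions, test_positions, GT_model = gen_GT_wf_model(
        test_wf_file_path="test_dataset.npy",
        pred_output_Q=1,
        pred_output_dim=64,
    )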
"""
# Load dataset
wf_test_dataset = np.load(test_wf_file_path, allow_pickle=True)[()]
# Extract parameters from the wf test dataset
wf_test_params = wf_test_dataset["parameters"]
wf_test_C_poly = wf_test_dataset["C_poly"]
wf_test_pos = wf_test_dataset["positions"]
tf_test_pos = tf.convert_to_tensor(wf_test_pos, dtype=tf.float32)
wf_test_SEDs = wf_test_dataset["SEDs"]
# Generate GT model
batch_size = 16
# Generate Zernike maps
zernikes = utils.zernike_generator(
n_zernikes=wf_test_params["max_order"], wfe_dim=wf_test_params["pupil_diameter"]
)
## Generate initializations
# Prepare np input
simPSF_np = SimPSFToolkit(
zernikes,
max_order=wf_test_params["max_order"],
pupil_diameter=wf_test_params["pupil_diameter"],
output_dim=wf_test_params["output_dim"],
oversampling_rate=wf_test_params["oversampling_rate"],
output_Q=wf_test_params["output_Q"],
)
simPSF_np.gen_random_Z_coeffs(max_order=wf_test_params["max_order"])
z_coeffs = simPSF_np.normalize_zernikes(
simPSF_np.get_z_coeffs(), simPSF_np.max_wfe_rms
)
simPSF_np.set_z_coeffs(z_coeffs)
simPSF_np.generate_mono_PSF(lambda_obs=0.7, regen_sample=False)
# Obscurations
obscurations = simPSF_np.generate_pupil_obscurations(
N_pix=wf_test_params["pupil_diameter"],
N_filter=wf_test_params["LP_filter_length"],
)
tf_obscurations = tf.convert_to_tensor(obscurations, dtype=tf.complex64)
## Prepare ground truth model
# Stack the Zernike maps into a cube
np_zernike_cube = np.zeros(
(len(zernikes), zernikes[0].shape[0], zernikes[0].shape[1])
)
for it in range(len(zernikes)):
np_zernike_cube[it, :, :] = zernikes[it]
np_zernike_cube[np.isnan(np_zernike_cube)] = 0
tf_zernike_cube = tf.convert_to_tensor(np_zernike_cube, dtype=tf.float32)
# Initialize the model
GT_tf_semiparam_field = psf_field.TF_SemiParam_field(
zernike_maps=tf_zernike_cube,
obscurations=tf_obscurations,
batch_size=batch_size,
output_Q=wf_test_params["output_Q"],
d_max_nonparam=2,
output_dim=wf_test_params["output_dim"],
n_zernikes=wf_test_params["max_order"],
d_max=wf_test_params["d_max"],
x_lims=wf_test_params["x_lims"],
y_lims=wf_test_params["y_lims"],
)
# For the Ground truth model
GT_tf_semiparam_field.tf_poly_Z_field.assign_coeff_matrix(wf_test_C_poly)
_ = GT_tf_semiparam_field.tf_np_poly_opd.alpha_mat.assign(
tf.zeros_like(GT_tf_semiparam_field.tf_np_poly_opd.alpha_mat)
)
# Set required output_Q
GT_tf_semiparam_field.set_output_Q(
output_Q=pred_output_Q, output_dim=pred_output_dim
)
GT_tf_semiparam_field = psf_field.build_PSF_model(GT_tf_semiparam_field)
packed_SED_data = [
utils.generate_packed_elems(_sed, simPSF_np, n_bins=wf_test_params["n_bins"])
for _sed in wf_test_SEDs
]
# Prepare inputs
tf_packed_SED_data = tf.convert_to_tensor(packed_SED_data, dtype=tf.float32)
tf_packed_SED_data = tf.transpose(tf_packed_SED_data, perm=[0, 2, 1])
pred_inputs = [tf_test_pos, tf_packed_SED_data]
# Ground Truth model
GT_predictions = GT_tf_semiparam_field.predict(x=pred_inputs, batch_size=batch_size)
return GT_predictions, wf_test_pos, GT_tf_semiparam_field
## Below this line there are DEPRECATED functions
def compute_metrics(
tf_semiparam_field,
simPSF_np,
test_SEDs,
train_SEDs,
tf_test_pos,
tf_train_pos,
tf_test_stars,
tf_train_stars,
n_bins_lda,
batch_size=16,
):
# Generate SED data list
test_packed_SED_data = [
utils.generate_packed_elems(_sed, simPSF_np, n_bins=n_bins_lda)
for _sed in test_SEDs
]
tf_test_packed_SED_data = tf.convert_to_tensor(
test_packed_SED_data, dtype=tf.float32
)
tf_test_packed_SED_data = tf.transpose(tf_test_packed_SED_data, perm=[0, 2, 1])
test_pred_inputs = [tf_test_pos, tf_test_packed_SED_data]
test_predictions = tf_semiparam_field.predict(
x=test_pred_inputs, batch_size=batch_size
)
# Initialize the SED data list
packed_SED_data = [
utils.generate_packed_elems(_sed, simPSF_np, n_bins=n_bins_lda)
for _sed in train_SEDs
]
# First estimate the stars for the observations
tf_packed_SED_data = tf.convert_to_tensor(packed_SED_data, dtype=tf.float32)
tf_packed_SED_data = tf.transpose(tf_packed_SED_data, perm=[0, 2, 1])
inputs = [tf_train_pos, tf_packed_SED_data]
train_predictions = tf_semiparam_field.predict(x=inputs, batch_size=batch_size)
# Calculate RMSE values
test_res = np.sqrt(np.mean((tf_test_stars - test_predictions) ** 2))
train_res = np.sqrt(np.mean((tf_train_stars - train_predictions) ** 2))
# Calculate relative RMSE values
relative_test_res = test_res / np.sqrt(np.mean((tf_test_stars) ** 2))
relative_train_res = train_res / np.sqrt(np.mean((tf_train_stars) ** 2))
# Print RMSE values
logger.info("Test stars absolute RMSE:\t %.4e" % test_res)
logger.info("Training stars absolute RMSE:\t %.4e" % train_res)
# Print RMSE values
logger.info("Test stars relative RMSE:\t %.4e %%" % (relative_test_res * 100.0))
logger.info(
"Training stars relative RMSE:\t %.4e %%" % (relative_train_res * 100.0)
)
return test_res, train_res
def compute_opd_metrics_mccd(
tf_semiparam_field, GT_tf_semiparam_field, test_pos, train_pos
):
"""Compute the OPD metrics."""
np_obscurations = np.real(tf_semiparam_field.obscurations.numpy())
## For test positions
# Param part
zernike_coeffs = tf_semiparam_field.tf_poly_Z_field(test_pos)
P_opd_pred = tf_semiparam_field.tf_zernike_OPD(zernike_coeffs)
# Non-Param part
NP_opd_pred = tf_semiparam_field.tf_NP_mccd_OPD.predict(test_pos)
# OPD prediction
opd_pred = tf.math.add(P_opd_pred, NP_opd_pred)
# GT model
GT_zernike_coeffs = GT_tf_semiparam_field.tf_poly_Z_field(test_pos)
GT_opd_maps = GT_tf_semiparam_field.tf_zernike_OPD(GT_zernike_coeffs)
# Compute residual and obscure the OPD
res_opd = (GT_opd_maps.numpy() - opd_pred.numpy()) * np_obscurations
# Calculate absolute RMSE values
test_opd_rmse = np.sqrt(np.mean(res_opd**2))
# Calculate relative RMSE values
relative_test_opd_rmse = test_opd_rmse / np.sqrt(
np.mean((GT_opd_maps.numpy() * np_obscurations) ** 2)
)
# Print RMSE values
logger.info("Test stars absolute OPD RMSE:\t %.4e" % test_opd_rmse)
logger.info(
"Test stars relative OPD RMSE:\t %.4e %%\n" % (relative_test_opd_rmse * 100.0)
)
## For train part
# Param part
zernike_coeffs = tf_semiparam_field.tf_poly_Z_field(train_pos)
P_opd_pred = tf_semiparam_field.tf_zernike_OPD(zernike_coeffs)
# Non-Param part
NP_opd_pred = tf_semiparam_field.tf_NP_mccd_OPD.predict(train_pos)
# OPD prediction
opd_pred = tf.math.add(P_opd_pred, NP_opd_pred)
# GT model
GT_zernike_coeffs = GT_tf_semiparam_field.tf_poly_Z_field(train_pos)
GT_opd_maps = GT_tf_semiparam_field.tf_zernike_OPD(GT_zernike_coeffs)
# Compute residual and obscure the OPD
res_opd = (GT_opd_maps.numpy() - opd_pred.numpy()) * np_obscurations
# Calculate RMSE values
train_opd_rmse = np.sqrt(np.mean(res_opd**2))
# Calculate relative RMSE values
relative_train_opd_rmse = train_opd_rmse / np.sqrt(
np.mean((GT_opd_maps.numpy() * np_obscurations) ** 2)
)
# Print RMSE values
logger.info("Train stars absolute OPD RMSE:\t %.4e" % train_opd_rmse)
logger.info(
"Train stars relative OPD RMSE:\t %.4e %%\n" % (relative_train_opd_rmse * 100.0)
)
return test_opd_rmse, train_opd_rmse
def compute_opd_metrics_polymodel(
tf_semiparam_field, GT_tf_semiparam_field, test_pos, train_pos
):
"""Compute the OPD metrics."""
np_obscurations = np.real(tf_semiparam_field.obscurations.numpy())
## For test positions
# Param part
zernike_coeffs = tf_semiparam_field.tf_poly_Z_field(test_pos)
P_opd_pred = tf_semiparam_field.tf_zernike_OPD(zernike_coeffs)
# Non-Param part
NP_opd_pred = tf_semiparam_field.tf_np_poly_opd(test_pos)
# OPD prediction
opd_pred = tf.math.add(P_opd_pred, NP_opd_pred)
# GT model
GT_zernike_coeffs = GT_tf_semiparam_field.tf_poly_Z_field(test_pos)
GT_opd_maps = GT_tf_semiparam_field.tf_zernike_OPD(GT_zernike_coeffs)
# Compute residual and obscure the OPD
res_opd = (GT_opd_maps.numpy() - opd_pred.numpy()) * np_obscurations
# Calculate RMSE values
test_opd_rmse = np.sqrt(np.mean(res_opd**2))
# Calculate relative RMSE values
relative_test_opd_rmse = test_opd_rmse / np.sqrt(
np.mean((GT_opd_maps.numpy() * np_obscurations) ** 2)
)
# Print RMSE values
logger.info("Test stars OPD RMSE:\t %.4e" % test_opd_rmse)
logger.info(
"Test stars relative OPD RMSE:\t %.4e %%\n" % (relative_test_opd_rmse * 100.0)
)
## For train part
# Param part
zernike_coeffs = tf_semiparam_field.tf_poly_Z_field(train_pos)
P_opd_pred = tf_semiparam_field.tf_zernike_OPD(zernike_coeffs)
# Non-Param part
NP_opd_pred = tf_semiparam_field.tf_np_poly_opd(train_pos)
# OPD prediction
opd_pred = tf.math.add(P_opd_pred, NP_opd_pred)
# GT model
GT_zernike_coeffs = GT_tf_semiparam_field.tf_poly_Z_field(train_pos)
GT_opd_maps = GT_tf_semiparam_field.tf_zernike_OPD(GT_zernike_coeffs)
# Compute residual and obscure the OPD
res_opd = (GT_opd_maps.numpy() - opd_pred.numpy()) * np_obscurations
# Calculate RMSE values
train_opd_rmse = np.sqrt(np.mean(res_opd**2))
# Calculate relative RMSE values
relative_train_opd_rmse = train_opd_rmse / np.sqrt(
np.mean((GT_opd_maps.numpy() * np_obscurations) ** 2)
)
# Print RMSE values
logger.info("Train stars OPD RMSE:\t %.4e" % train_opd_rmse)
logger.info(
"Train stars relative OPD RMSE:\t %.4e %%\n" % (relative_train_opd_rmse * 100.0)
)
return test_opd_rmse, train_opd_rmse
def compute_opd_metrics_param_model(
tf_semiparam_field, GT_tf_semiparam_field, test_pos, train_pos
):
"""Compute the OPD metrics."""
np_obscurations = np.real(tf_semiparam_field.obscurations.numpy())
## For test positions
# Param part
zernike_coeffs = tf_semiparam_field.tf_poly_Z_field(test_pos)
opd_pred = tf_semiparam_field.tf_zernike_OPD(zernike_coeffs)
# GT model
GT_zernike_coeffs = GT_tf_semiparam_field.tf_poly_Z_field(test_pos)
GT_opd_maps = GT_tf_semiparam_field.tf_zernike_OPD(GT_zernike_coeffs)
# Compute residual and obscure the OPD
res_opd = (GT_opd_maps.numpy() - opd_pred.numpy()) * np_obscurations
# Calculate absolute RMSE values
test_opd_rmse = np.sqrt(np.mean(res_opd**2))
# Calculate relative RMSE values
relative_test_opd_rmse = test_opd_rmse / np.sqrt(
np.mean((GT_opd_maps.numpy() * np_obscurations) ** 2)
)
# Print RMSE values
logger.info("Test stars absolute OPD RMSE:\t %.4e" % test_opd_rmse)
logger.info(
"Test stars relative OPD RMSE:\t %.4e %%\n" % (relative_test_opd_rmse * 100.0)
)
## For train part
# Param part
zernike_coeffs = tf_semiparam_field.tf_poly_Z_field(train_pos)
opd_pred = tf_semiparam_field.tf_zernike_OPD(zernike_coeffs)
# GT model
GT_zernike_coeffs = GT_tf_semiparam_field.tf_poly_Z_field(train_pos)
GT_opd_maps = GT_tf_semiparam_field.tf_zernike_OPD(GT_zernike_coeffs)
# Compute residual and obscure the OPD
res_opd = (GT_opd_maps.numpy() - opd_pred.numpy()) * np_obscurations
# Calculate RMSE values
train_opd_rmse = np.sqrt(np.mean(res_opd**2))
# Calculate relative RMSE values
relative_train_opd_rmse = train_opd_rmse / np.sqrt(
np.mean((GT_opd_maps.numpy() * np_obscurations) ** 2)
)
# Print RMSE values
logger.info("Train stars absolute OPD RMSE:\t %.4e" % train_opd_rmse)
logger.info(
"Train stars relative OPD RMSE:\t %.4e %%\n" % (relative_train_opd_rmse * 100.0)
)
return test_opd_rmse, train_opd_rmse
def compute_one_opd_rmse(GT_tf_semiparam_field, tf_semiparam_field, pos, is_poly=False):
"""Compute the OPD map for one position."""
np_obscurations = np.real(tf_semiparam_field.obscurations.numpy())
tf_pos = tf.convert_to_tensor(pos, dtype=tf.float32)
## For test positions
# Param part
zernike_coeffs = tf_semiparam_field.tf_poly_Z_field(tf_pos)
P_opd_pred = tf_semiparam_field.tf_zernike_OPD(zernike_coeffs)
# Non-Param part
if not is_poly:
NP_opd_pred = tf_semiparam_field.tf_NP_mccd_OPD.predict(tf_pos)
else:
NP_opd_pred = tf_semiparam_field.tf_np_poly_opd(tf_pos)
# OPD prediction
opd_pred = tf.math.add(P_opd_pred, NP_opd_pred)
# GT model
GT_zernike_coeffs = GT_tf_semiparam_field.tf_poly_Z_field(tf_pos)
GT_opd_maps = GT_tf_semiparam_field.tf_zernike_OPD(GT_zernike_coeffs)
# Compute residual and obscure the OPD
res_opd = (GT_opd_maps.numpy() - opd_pred.numpy()) * np_obscurations
# Calculate RMSE values
opd_rmse = np.sqrt(np.mean(res_opd**2))
return opd_rmse
def plot_function(mesh_pos, residual, tf_train_pos, tf_test_pos, title="Error"):
vmax = np.max(residual)
vmin = np.min(residual)
plt.figure(figsize=(12, 8))
plt.scatter(
mesh_pos[:, 0],
mesh_pos[:, 1],
s=100,
c=residual.reshape(-1, 1),
cmap="viridis",
marker="s",
vmax=vmax,
vmin=vmin,
)
plt.colorbar()
plt.scatter(
tf_train_pos[:, 0],
tf_train_pos[:, 1],
c="k",
marker="*",
s=10,
label="Train stars",
)
plt.scatter(
tf_test_pos[:, 0],
tf_test_pos[:, 1],
c="r",
marker="*",
s=10,
label="Test stars",
)
plt.title(title)
plt.xlabel("x-axis")
plt.ylabel("y-axis")
plt.show()
def plot_residual_maps(
GT_tf_semiparam_field,
tf_semiparam_field,
simPSF_np,
train_SEDs,
tf_train_pos,
tf_test_pos,
n_bins_lda=20,
n_points_per_dim=30,
is_poly=False,
):
# Recover the grid limits
x_lims = tf_semiparam_field.x_lims
y_lims = tf_semiparam_field.y_lims
# Generate mesh of testing positions
x = np.linspace(x_lims[0], x_lims[1], n_points_per_dim)
y = np.linspace(y_lims[0], y_lims[1], n_points_per_dim)
x_pos, y_pos = np.meshgrid(x, y)
mesh_pos = np.concatenate(
(x_pos.flatten().reshape(-1, 1), y_pos.flatten().reshape(-1, 1)), axis=1
)
tf_mesh_pos = tf.convert_to_tensor(mesh_pos, dtype=tf.float32)
# Testing the positions
rec_x_pos = mesh_pos[:, 0].reshape(x_pos.shape)
rec_y_pos = mesh_pos[:, 1].reshape(y_pos.shape)
# Get random SED from the training catalog
SED_random_integers = np.random.choice(
np.arange(train_SEDs.shape[0]), size=mesh_pos.shape[0], replace=True
)
# Build the SED catalog for the testing mesh
mesh_SEDs = np.array([train_SEDs[_id, :, :] for _id in SED_random_integers])
# Generate SED data list
mesh_packed_SED_data = [
utils.generate_packed_elems(_sed, simPSF_np, n_bins=n_bins_lda)
for _sed in mesh_SEDs
]
# Generate inputs
tf_mesh_packed_SED_data = tf.convert_to_tensor(
mesh_packed_SED_data, dtype=tf.float32
)
tf_mesh_packed_SED_data = tf.transpose(tf_mesh_packed_SED_data, perm=[0, 2, 1])
mesh_pred_inputs = [tf_mesh_pos, tf_mesh_packed_SED_data]
# Predict mesh stars
model_mesh_preds = tf_semiparam_field.predict(x=mesh_pred_inputs, batch_size=16)
GT_mesh_preds = GT_tf_semiparam_field.predict(x=mesh_pred_inputs, batch_size=16)
# Calculate pixel RMSE for each star
pix_rmse = np.array(
[
np.sqrt(np.mean((_GT_pred - _model_pred) ** 2))
for _GT_pred, _model_pred in zip(GT_mesh_preds, model_mesh_preds)
]
)
relative_pix_rmse = np.array(
[
np.sqrt(np.mean((_GT_pred - _model_pred) ** 2))
/ np.sqrt(np.mean((_GT_pred) ** 2))
for _GT_pred, _model_pred in zip(GT_mesh_preds, model_mesh_preds)
]
)
# Plot absolute pixel error
plot_function(
mesh_pos, pix_rmse, tf_train_pos, tf_test_pos, title="Absolute pixel error"
)
# Plot relative pixel error
plot_function(
mesh_pos,
relative_pix_rmse,
tf_train_pos,
tf_test_pos,
title="Relative pixel error",
)
# Compute OPD errors
opd_rmse = np.array(
[
compute_one_opd_rmse(
GT_tf_semiparam_field, tf_semiparam_field, _pos.reshape(1, -1), is_poly
)
for _pos in mesh_pos
]
)
# Plot absolute pixel error
plot_function(
mesh_pos, opd_rmse, tf_train_pos, tf_test_pos, title="Absolute OPD error"
)
def plot_imgs(mat, cmap="gist_stern", figsize=(20, 20)):
"""Function to plot 2D images of a tensor.
The tensor has shape (batch, xdim, ydim) and the subplot layout is
chosen "intelligently".
"""
def dp(n, left): # returns tuple (cost, [factors])
memo = {}
if (n, left) in memo:
return memo[(n, left)]
if left == 1:
return (n, [n])
i = 2
best = n
bestTuple = [n]
while i * i <= n:
if n % i == 0:
rem = dp(n // i, left - 1)
if rem[0] + i < best:
best = rem[0] + i
bestTuple = [i] + rem[1]
i += 1
memo[(n, left)] = (best, bestTuple)
return memo[(n, left)]
n_images = mat.shape[0]
row_col = dp(n_images, 2)[1]
row_n = int(row_col[0])
col_n = int(row_col[1])
plt.figure(figsize=figsize)
idx = 0
for _i in range(row_n):
for _j in range(col_n):
plt.subplot(row_n, col_n, idx + 1)
plt.imshow(mat[idx, :, :], cmap=cmap)
plt.colorbar()
plt.title("matrix id %d" % idx)
idx += 1
plt.show()