Source code for shapepipe.pipeline.str_handler

"""STRING HANDLER.

This module defines a class for handling string operations.

:Author: Axel Guinot

"""

import itertools
import operator
import re

import numpy as np
from modopt.math.stats import sigma_mad


class StrInterpreter(object):
    """String Interpreter Class.

    Class to handle operation/comparison in a string.

    Parameters
    ----------
    string : str
        String to interpret
    catalogue : dict, recarray or str
        If ``catalogue`` is a ``str``, it is passed to ``FITSCatalogue`` with
        ``SEx_catalogue=True`` and the data read from it are used; otherwise
        the catalogue is assumed to be already open and is used directly
    make_compare : bool, optional
        If ``True`` assume a comparison in the string
    mask_dict : dict, optional
        Dictionary containing masks usable for the operation

    Attributes
    ----------
    result : numpy.ndarray, float or bool
        Result of interpreting ``string``, computed at construction time

    Raises
    ------
    ValueError
        If ``string`` is not a ``str`` or if ``catalogue`` is ``None``

    """

    # Arithmetic operator that is not enclosed in parentheses. The alternation
    # is grouped so that the "not inside parentheses" lookahead applies to all
    # four operators and not only to ``+``.
    _OP_PATTERN = r'(?:\*|\/|\-|\+)\s*(?![^()]*\))'

    def __init__(self, string, catalogue, make_compare=False, mask_dict=None):

        if not isinstance(string, str):
            raise ValueError('string has to be str type')
        self._string = string

        if catalogue is None:
            raise ValueError('catalogue not provided')
        if isinstance(catalogue, str):
            # NOTE(review): FITSCatalogue is not imported in the visible part
            # of this module - confirm it is in scope before relying on the
            # file-path input.
            cat_file = FITSCatalogue(catalogue, SEx_catalogue=True)
            cat_file.open()
            try:
                self._cat = cat_file.get_data()
            finally:
                # Close the file even if reading the data fails
                cat_file.close()
        else:
            self._cat = catalogue

        self._make_compare = make_compare

        # Always define the mask dictionary so that a masked expression
        # without masks fails with the dedicated error in _get_value
        self._mask = mask_dict if mask_dict is not None else {}

        self._init_stat_function()
        self._comp_dict = {
            '<': operator.lt,
            '>': operator.gt,
            '<=': operator.le,
            '>=': operator.ge,
            '==': operator.eq,
            '!=': operator.ne,
        }

        self.result = self.interpret(self._string, self._make_compare)

    def interpret(
        self,
        string,
        make_compare=False,
        make_func=True,
        make_operate=True,
    ):
        """Interpret.

        This function handles the different possible operations.

        Parameters
        ----------
        string : str
            String to interpret
        make_compare : bool, optional
            If ``True`` look for a comparison
        make_func : bool, optional
            If ``True`` look for a function
        make_operate : bool, optional
            If ``True`` look for an operation

        Returns
        -------
        numpy.ndarray or float
            Result of the current operation

        Notes
        -----
        This is a recursive function. The flags are tested in the order
        comparison, operation, function; if none is set the string is read
        as a plain value.

        """
        if make_compare:
            return self._compare(string)
        if make_operate:
            string_split = re.split(self._OP_PATTERN, string)
            return self._operate(string, string_split)
        if make_func:
            return self._apply_func(string)
        return self._get_value(string)

    def _compare(self, string):
        """Handle Comparison in a String.

        This function transforms a condition in a string to a real condition.

        Parameters
        ----------
        string : str
            String containing the comparison

        Returns
        -------
        numpy.ndarray or bool
            Result of the comparison

        Raises
        ------
        Exception
            If the string does not contain exactly one comparison operator

        """
        # Two-character operators come first so that '<=' is not read as '<'.
        # The capturing group keeps the matched operator in the output.
        terms = re.split(r'(<=|>=|==|!=|<|>)', string)
        if len(terms) != 3:
            raise Exception(
                'Only one comparison in [<, >, <=, >=, ==, !=] per line'
            )
        left, symbol, right = terms

        # The two sides are plain expressions: switch comparison mode off so
        # that nested calls to interpret do not look for a comparison again
        self._make_compare = False
        first = self.interpret(left, self._make_compare)
        second = self.interpret(right, self._make_compare)

        return self._comp_dict[symbol](first, second)

    def _apply_func(self, string):
        """Parse Input String for Function Name and Apply Function.

        Parameters
        ----------
        string : str
            Input string, either a plain term or ``name(arg1[,arg2,...])``

        Returns
        -------
        float
            Result of the function

        Raises
        ------
        KeyError
            If the function name is not one of the available functions
        Exception
            If the string contains more than one pair of parentheses

        """
        str_split = re.split(r'\(|\)', string)

        # No parenthesis: not a function call, read the value directly
        if len(str_split) == 1:
            return self.interpret(
                str_split[0],
                self._make_compare,
                make_func=False,
                make_operate=False,
            )

        if len(str_split) != 3:
            raise Exception(
                'Only one function can be applied. Problem with the '
                + f'term: {string}'
            )

        func_name, arg_string = str_split[0], str_split[1]
        if func_name not in self._stat_func:
            raise KeyError(
                f'Invalid function \'{func_name}\' in expression '
                + f'\'{string}\''
            )

        # Each comma separated argument may itself contain operations
        func_args = [
            self.interpret(
                arg,
                self._make_compare,
                make_func=False,
                make_operate=True,
            )
            for arg in arg_string.split(',')
        ]

        return self._stat_func[func_name](*func_args)

    def _init_stat_function(self):
        """Initialise Available Stat Functions.

        Create a dictionary mapping the function names usable in a string to
        the corresponding callables.

        """
        self._stat_func = {
            'mean': np.mean,
            'median': np.median,
            'mode': self._mode,
            'sqrt': np.sqrt,
            'pow': pow,
            'log': np.log,
            'log10': np.log10,
            'exp': np.exp,
            'std': np.std,
            'var': np.var,
            'sigma_mad': self._sigma_mad,
            'len': len,
            'min': min,
            'max': max,
            'homogen': self._test_homogeneity,
        }

    def _mean(self, input):
        """Get Mean.

        Compute the mean of a distribution.

        Parameters
        ----------
        input : numpy.ndarray
            Numpy array containing the data.

        Returns
        -------
        float
            mean, if input array has size > 0;
            -1, otherwise

        """
        if len(input) == 0:
            return -1
        return np.mean(input)

    def _mode(self, input, eps=0.001, iter_max=1000):
        """Get Mode.

        Compute the mode, the most frequent value of a continuous
        distribution, by repeatedly histogramming the data and keeping only
        the content of the most populated bin.

        Parameters
        ----------
        input : numpy.ndarray
            Numpy array containing the data.
        eps : float, optional
            Accuracy to achieve (default is 0.001)
        iter_max : int, optional
            Maximum number of iterations

        Returns
        -------
        float
            mode, if input array has 20 or more elements;
            median, if input array has >0 and <20 elements;
            -1, if input array has 0 elements

        Raises
        ------
        ValueError
            If the maximum number of iterations is reached

        """
        cat_size = len(input)
        if cat_size == 0:
            return -1
        if cat_size < 20:
            return np.median(input)

        if cat_size > 100:
            bins = int(float(cat_size) / 10.0)
        else:
            bins = int(float(cat_size) / 5.0)

        data = np.asarray(input)
        diff = eps + 1.0
        iteration = 0

        while diff > eps:
            hist = np.histogram(data, bins)
            if hist[0].max() == 1:
                break

            b_min = hist[1][hist[0].argmax()]
            b_max = hist[1][hist[0].argmax() + 1]
            diff = b_max - b_min

            inside = data[(data > b_min) & (data < b_max)]
            if inside.size == 0:
                # Every element of the most populated bin sits exactly on a
                # bin edge (e.g. many identical values). Iterating on an empty
                # sample would histogram nothing, so use those values directly.
                on_edge = data[(data >= b_min) & (data <= b_max)]
                if on_edge.size > 0:
                    return np.median(on_edge)
                break
            data = inside

            if iteration == iter_max:
                break
            iteration += 1

        if iteration == iter_max:
            raise ValueError('Mode computation failed')

        return (b_min + b_max) / 2.0

    def _sigma_mad(self, input):
        """Get Median Absolute Deviation.

        Apply ``sigma_mad`` to the input data.

        Parameters
        ----------
        input : numpy.ndarray
            Input data

        Returns
        -------
        float
            Result of ``sigma_mad``, if input size > 0;
            -1 if input size is 0

        """
        if len(input) == 0:
            return -1
        return sigma_mad(input)

    def _test_homogeneity(self, *args):
        """Test Homogeneity.

        Test homogeneity on 1D or 2D space.

        Parameters
        ----------
        param1 : numpy.ndarray
            Array on which the homogeneity test is performed
        param2 : numpy.ndarray, optional
            Array on which the homogeneity test is performed
        nb_cells : int
            Number of cells in the space. (note : has to be a square number)

        Returns
        -------
        float
            Percentage of inhomogeneity compared to worse possible case
            (based on the standard deviation)

        Raises
        ------
        ValueError
            If the number of arguments is wrong, if the two parameters have
            different lengths, if the number of cells is not a square number
            or if a parameter is constant

        """
        if len(args) not in (2, 3):
            raise ValueError(
                'Inputs should be param_1, param_2 [optional], n_cells'
            )
        param = [np.asarray(arg) for arg in args[:-1]]
        n_cells = args[-1]
        n_param = len(param)

        if n_param == 2 and len(param[0]) != len(param[1]):
            raise ValueError(
                'Both param_1 and param_2 must have the same '
                + f'length : {len(param[0])}, {len(param[1])}'
            )

        if np.sqrt(n_cells) % 1 != 0:
            raise ValueError('N_cells must be a square number')

        n_tot = len(param[0])
        homo_ratio = float(n_tot) / float(n_cells)

        # Number of cells along each axis: n_cells in 1D, sqrt(n_cells) in 2D
        n_side = int(round(pow(n_cells, 1.0 / float(n_param))))

        param_min = []
        param_max = []
        for values in param:
            low, high = np.min(values), np.max(values)
            if low == high:
                raise ValueError(
                    'Homogeneity test requires a non-constant parameter'
                )
            # linspace gives exactly n_side cells whose last edge is the
            # maximum, which a floating point step in arange cannot guarantee
            edges = np.linspace(low, high, n_side + 1)
            param_min.append(edges[:-1])
            param_max.append(edges[1:])

        if n_param == 1:
            n_obj = np.asarray([
                float(np.count_nonzero(
                    (param[0] >= param_min[0][idx])
                    & (param[0] <= param_max[0][idx])
                ))
                for idx in range(n_side)
            ])
        else:
            n_obj = np.asarray([
                float(np.count_nonzero(
                    (param[0] >= param_min[0][idx_i])
                    & (param[0] <= param_max[0][idx_i])
                    & (param[1] >= param_min[1][idx_j])
                    & (param[1] <= param_max[1][idx_j])
                ))
                for idx_i, idx_j in itertools.product(range(n_side), repeat=2)
            ])

        actual_std = np.std(n_obj / homo_ratio)

        # Worst case: all the objects fall in a single cell
        worse_std = np.zeros((int(n_cells), 1))
        worse_std[0] = n_tot / homo_ratio
        worse_std = np.std(worse_std)

        return actual_std / worse_std * 100.0

    def _operate(self, string, string_split):
        """Handle Operation in a String.

        Make operation between catalogue's parameters and/or numbers.

        Parameters
        ----------
        string : str
            Parameter or linear combination of parameters.
        string_split : list
            Terms of the expression obtained by splitting it on the
            operators

        Returns
        -------
        numpy.ndarray or float
            Result of the operation

        Raises
        ------
        ValueError
            If ``string`` or ``string_split`` is ``None``

        Notes
        -----
        It's used as a recursive function. The operators are tried in the
        order ``+``, ``-``, ``*``, ``/`` so that the ones tried last are
        evaluated first.

        """
        if string is None:
            raise ValueError('Parameter not specified')
        if string_split is None:
            raise ValueError('Parameters splited not specified')

        # No operator outside parentheses: single term
        if len(re.split(self._OP_PATTERN, string)) == 1:
            return self.interpret(string, make_operate=False)

        # (split pattern, operator function, initial value of the reduction)
        attempts = (
            (r'\+\s*(?![^()]*\))', operator.add, 0),
            (r'\-\s*(?![^()]*\))', operator.sub, 'init'),
            (r'\*\s*(?![^()]*\))', operator.mul, 1),
            (r'\/\s*(?![^()]*\))', operator.truediv, 'init'),
        )

        tmp = 'pass'
        for pattern, op_func, start in attempts:
            tmp = self._string_op_func(
                re.split(pattern, string),
                string_split,
                op_func,
                start,
            )
            # 'pass' means this operator is absent, try the next one. The
            # type is checked first because tmp may be an array.
            if not (isinstance(tmp, str) and tmp == 'pass'):
                break

        return tmp

    def _string_op_func(self, string_op, string_split, op, tmp):
        r"""Perform a Specified Operation.

        This function handles the possible operations between parameters.

        Parameters
        ----------
        string_op : list
            List of parameters to operate.
        string_split : list
            The different parameters split using
            '(?:\*|\/|\-|\+)\s*(?![^()]*\))' as delimiter.
        op : func
            The kind of operation provided as an operator function
            (Example : operator.sub).
        tmp : str or float
            Temporary result of the global operation or value to initiate
            operation.

        Returns
        -------
        float or str
            Result of the operation or 'pass' if there are remaining operations

        """
        if len(string_op) > 2:
            for term in string_op:
                value = self._operate(term, string_split)
                # Check the type before comparing: tmp may already hold an
                # array, for which '==' is element-wise
                if isinstance(tmp, str) and tmp == 'init':
                    tmp = value
                else:
                    tmp = op(tmp, value)
            return tmp

        if len(string_op) == 2:
            operands = []
            for term in string_op:
                if term in string_split:
                    # Elementary term, no further operation to look for
                    operands.append(self.interpret(term, make_operate=False))
                else:
                    operands.append(self._operate(term, string_split))
            return op(operands[0], operands[1])

        return 'pass'

    def _get_value(self, string):
        """Get Value.

        Return the value of the corresponding parameter. Or return a float
        with a number as parameter.

        Parameters
        ----------
        string : str
            Number, catalogue parameter or masked catalogue parameter
            written as ``name{mask}``

        Returns
        -------
        numpy.ndarray or float
            Value of the parameter. Or float

        Raises
        ------
        ValueError
            If ``string`` is ``None``, is neither a number nor a catalogue
            parameter, refers to an unknown mask or is badly formed

        Notes
        -----
        You can't perform operations here!

        """
        if string is None:
            raise ValueError("Parameter not specified")

        try:
            return float(string)
        except (TypeError, ValueError):
            # Not a number, look for a catalogue parameter below
            pass

        str_split = re.split(r'\{|\}', string)

        if len(str_split) == 1:
            # The catalogue type is not fixed (dict, recarray, ...), hence
            # the broad except
            try:
                return self._cat[string]
            except Exception as err:
                raise ValueError(
                    'String has to be a float or a catalogue parameter. '
                    + f'{string} not found'
                ) from err

        if len(str_split) == 3:
            name, mask_key = str_split[0], str_split[1]
            if mask_key not in self._mask:
                raise ValueError(
                    f'Mask has to be provided. {mask_key} not '
                    + 'found in mask'
                )
            try:
                return self._cat[name][self._mask[mask_key]]
            except Exception as err:
                raise ValueError(
                    'String has to be a catalogue parameter. '
                    + f'{name} not found'
                ) from err

        raise ValueError(f'Invalid parameter expression: {string}')