Source code for shapepipe.modules.get_images_package.get_images

"""GET IMAGES.

This module copies all images required for processing.

:Author: Martin Kilbinger <martin.kilbinger@cea.fr>

"""

import glob
import os
import re
import sys

from shapepipe.modules.module_decorator import module_runner
from shapepipe.utilities.canfar import vosHandler


# pragma: no cover
def read_image_numbers(path):
    """Read Image Numbers.

    Read image numbers from file, one number per line. Surrounding
    whitespace is removed from every line, and lines that are empty after
    stripping are skipped so that they do not end up as empty image IDs.

    Parameters
    ----------
    path : str
        Input file path

    Returns
    -------
    list
        Image numbers, in the order in which they appear in the file

    """
    with open(path) as file:
        stripped_lines = (line.strip() for line in file)
        # An empty entry would later be turned into a file path without
        # any image number in it
        return [number for number in stripped_lines if number]
def in2out_pattern(number):
    """Get In2out Pattern.

    Convert an input number pattern or image ID into its output form:
    every dot is turned into a dash and every ASCII letter is removed.

    Parameters
    ----------
    number : str
        Input number

    Returns
    -------
    str
        Output number

    """
    # Dots become dashes so that they cannot be mistaken for the
    # delimiter in front of a file extension
    dashed = re.sub(r'\.', '-', number)

    # Strip all letters from what is left
    return re.sub('[a-zA-Z]', '', dashed)
class GetImages(object):
    """Get Images.

    Class handling retrieval of input images.

    Parameters
    ----------
    retrieve_method : str
        Copy/download method; ``'vos'`` and ``'symlink'`` are handled
    retrieve_options : str
        Retrieve options, split on whitespace and passed as arguments to
        the ``vcp`` command; only used by the ``'vos'`` method
    input_file_list : list
        Input files; the first element of each entry is read as a file
        containing image numbers
    input_numbering : str
        Numbering scheme, python regexp
    input_file_pattern : list
        File pattern including input number template of input files
    input_file_ext : list
        Input file extensions
    output_file_pattern : list
        Output file patterns
    w_log : logging.Logger
        Log file
    check_existing_dir : str, optional
        If not ``None``, only retrieve image if not existing at this
        path (recursively)
    n_expected : int, optional
        Number of expected files per type and ID to download/check for
        existence
    n_try : int, optional
        Number of attempts for VOs download, default is ``3``

    """

    def __init__(
        self,
        retrieve_method,
        retrieve_options,
        input_file_list,
        input_numbering,
        input_file_pattern,
        input_file_ext,
        output_file_pattern,
        w_log,
        check_existing_dir=None,
        n_expected=None,
        n_try=3,
    ):

        self._retrieve_method = retrieve_method
        self._retrieve_options = retrieve_options
        self._input_file_list = input_file_list
        self._input_numbering = input_numbering
        self._input_file_pattern = input_file_pattern
        self._input_file_ext = input_file_ext
        self._output_file_pattern = output_file_pattern
        self._w_log = w_log
        self._check_existing_dir = check_existing_dir
        self._n_expected = n_expected
        self._n_try = n_try

    def process(self, input_dir, output_dir):
        """Process.

        Main function to process GetImages.

        Parameters
        ----------
        input_dir : list
            Input directories or urls, one for each input file type
        output_dir : str
            Output directory

        Raises
        ------
        ValueError
            If the lists of input directories, input file patterns,
            input file extensions and output file patterns do not all
            have the same length

        """
        # Input image numbers from all input tile files
        flat_list = [
            number
            for input_file in self._input_file_list
            for number in read_image_numbers(input_file[0])
        ]
        self._w_log.info(f'Number of total image IDs = {len(flat_list)}')

        # Get unique number list; dict.fromkeys drops duplicates while
        # keeping the first-seen order, so the processing order is the
        # same from one run to the next
        image_number_list = list(dict.fromkeys(flat_list))
        self._w_log.info(
            f'Number of unique image IDs = {len(image_number_list)}'
        )

        # Number of input file types
        nitem = len(input_dir)

        # Make sure output_dir is list and compatible to input lists
        output_dir = [output_dir] * nitem

        # Check consistency of list lengths
        if any(
            len(lst) != nitem
            for lst in [
                input_dir,
                self._input_file_pattern,
                self._input_file_ext,
                self._output_file_pattern
            ]
        ):
            raise ValueError(
                f'Lists INPUT_PATH ({len(input_dir)}), '
                + f'INPUT_FILE_PATTERN ({len(self._input_file_pattern)}), '
                + f'INPUT_FILE_EXT ({len(self._input_file_ext)}), '
                + f'OUTPUT_FILE_PATTERN ({len(self._output_file_pattern)}) '
                + 'need to have equal length'
            )

        # Assemble input and output file lists
        all_inputs = self.get_file_list(
            image_number_list,
            input_dir,
            use_output_file_pattern=False
        )
        all_outputs = self.get_file_list(
            image_number_list,
            output_dir,
            use_output_file_pattern=True
        )

        # Retrieve files
        self.retrieve(all_inputs, all_outputs)

    def get_file_list(
        self,
        image_number_list,
        dest_dir,
        use_output_file_pattern=False,
    ):
        """Get File List.

        Return lists of file paths to retrieve.

        Parameters
        ----------
        image_number_list : list
            Image numbers
        dest_dir : list
            Input directory or url
        use_output_file_pattern : bool, optional
            If ``True``, use output file base patterns excluding numbering
            scheme; if ``False``, use input file patterns; default is
            ``False``

        Returns
        -------
        list
            Complete file paths, one list for each input file type

        """
        list_all_files = []
        for idx, dir_path in enumerate(dest_dir):
            in_pattern = self._input_file_pattern[idx]
            in_ext = self._input_file_ext[idx]

            if not use_output_file_pattern:
                # Insert each number into the input pattern at the place
                # matched by the numbering scheme
                list_files_per_type = [
                    f'{dir_path}/'
                    + re.sub(self._input_numbering, number, in_pattern)
                    + in_ext
                    for number in image_number_list
                ]

            elif self._output_file_pattern[idx] == '*':
                # retrieve all input files to output dir, do not append
                # extension; file base and extension are not needed, so an
                # empty extension is accepted here
                list_files_per_type = [dir_path] * len(image_number_list)

            else:
                # Keep initial dot in extension and remove all others;
                # slicing instead of indexing tolerates an empty extension
                ext_final = in_ext[:1] + in_ext[1:].replace('.', '')
                out_pattern = self._output_file_pattern[idx]

                # Transform input to output number patterns
                list_files_per_type = [
                    f'{dir_path}/{out_pattern}{in2out_pattern(number)}'
                    + ext_final
                    for number in image_number_list
                ]

            list_all_files.append(list_files_per_type)

        return list_all_files

    def retrieve(self, all_inputs, all_outputs):
        """Retrieve.

        Retrieve all files. If a directory to check for existing files
        was given, files that are already found there in the expected
        number are skipped.

        Parameters
        ----------
        all_inputs: list
            Input file paths, one list for each input file type
        all_outputs: list
            Output file paths, one list for each input file type

        """
        for in_per_type, out_per_type in zip(all_inputs, all_outputs):
            for in_path, out_path in zip(in_per_type, out_per_type):
                if self._check_existing_dir and self._found_existing(
                    out_path
                ):
                    continue
                self.retrieve_one(in_path, out_path)

    def _found_existing(self, out_path):
        """Found Existing.

        Check whether the expected number of files with the base name of
        ``out_path`` exists below the directory of existing files, and
        log the outcome.

        Parameters
        ----------
        out_path : str
            Output path

        Returns
        -------
        bool
            ``True`` if exactly the expected number of files was found,
            ``False`` otherwise

        """
        out_base = os.path.basename(out_path)
        path = glob.glob(
            f'{self._check_existing_dir}/**/{out_base}',
            recursive=True,
        )

        if not path:
            self._w_log.info(
                'No existing images found at'
                + f' {self._check_existing_dir},'
                + ' downloading images'
            )
            return False

        if len(path) == self._n_expected:
            self._w_log.info(f'{path[0]} found, skipping download')
            return True

        self._w_log.info(
            f'{len(path)} instead of {self._n_expected} '
            + 'existing files found at'
            + f' {self._check_existing_dir}'
            + ', downloading images'
        )
        return False

    def retrieve_one(self, in_path, out_path):
        """Retrieve One.

        Retrieve one file, using the retrieve method given at
        initialisation. Nothing is done for a method other than ``'vos'``
        or ``'symlink'``.

        Parameters
        ----------
        in_path : str
            Input path
        out_path : str
            Output path

        Raises
        ------
        IndexError
            If the method is ``'symlink'`` and no file matches the input
            path

        """
        if self._retrieve_method == 'vos':
            self._retrieve_vos(in_path, out_path)
        elif self._retrieve_method == 'symlink':
            self._retrieve_symlink(in_path, out_path)

    def _retrieve_vos(self, in_path, out_path):
        """Retrieve Vos.

        Run the ``vcp`` command, with up to ``n_try`` attempts.

        Parameters
        ----------
        in_path : str
            Input path
        out_path : str
            Output path

        """
        cmd = ['vcp']
        if self._retrieve_options:
            # split() without argument does not create empty arguments
            # from repeated, leading or trailing blanks
            cmd.extend(self._retrieve_options.split())
        cmd.extend([in_path, out_path])

        # The command line is handed over via sys.argv; keep the caller's
        # value and put it back afterwards instead of leaving sys.argv
        # unusable for the rest of the process
        saved_argv = sys.argv
        sys.argv = cmd
        try:
            vcp = vosHandler('vcp')
            self._w_log.info(' '.join(cmd))

            for attempt in range(1, self._n_try + 1):
                try:
                    vcp()
                except Exception:
                    # NOTE(review): the exception type raised by a failed
                    # vcp call is not visible here, hence the broad catch;
                    # a failure is logged and followed by the next attempt
                    self._w_log.info(
                        'Error with command vcp, attempt '
                        + f'{attempt}/{self._n_try}'
                    )
                else:
                    self._w_log.info(
                        'Success of command vcp after '
                        + f'{attempt}/{self._n_try} attempts'
                    )
                    break
        finally:
            sys.argv = saved_argv

    def _retrieve_symlink(self, in_path, out_path):
        """Retrieve Symlink.

        Create symbolic links to all files matching the input path.

        Parameters
        ----------
        in_path : str
            Input path, can contain wildcards
        out_path : str
            Output path, a file name or an existing directory

        Raises
        ------
        IndexError
            If no file matches the input path

        """
        # Get all input file names if INPUT_FILE_PATTERN contains '*'
        all_src = glob.glob(in_path)
        if not all_src:
            raise IndexError(
                f"No input file found corresponding to '{in_path}'"
            )

        for src in all_src:
            if os.path.isdir(out_path):
                # OUTPUT_FILE_PATTERN is '*', so out_path is not regular
                # file but directory. Append input file name
                dst_name = f'{out_path}/{os.path.basename(src)}'
            else:
                # out_path is regular file
                dst_name = out_path
            os.symlink(src, dst_name)