diff --git a/doc/develop/fixing_data.rst b/doc/develop/fixing_data.rst
index 6dbe5fe96b..e1647ad64f 100644
--- a/doc/develop/fixing_data.rst
+++ b/doc/develop/fixing_data.rst
@@ -27,6 +27,17 @@
 known errors that can be fixed automatically. See
 `Natively supported non-CMIP datasets`_ for a list of currently supported
 datasets.
 
+Discovering potential issues with the data
+==========================================
+
+You can run `esmvaltool` in dry-run mode by adding the `--dry-run` command
+line option: this runs the data finding module and the CMOR check and fix
+modules, and anything that cannot be fixed on the fly results in an exception
+that is logged to `main_log.txt`. You can then review these potential issues
+and implement the needed fixes as described below. This feature works fully
+only with `max_parallel_tasks: 1` in `config-user.yml`, but it is very fast,
+so even limited to a single thread it takes much less time than running the
+full recipe.
+
 Fix structure
 =============
diff --git a/esmvalcore/_main.py b/esmvalcore/_main.py
index 73272d6ba0..9ac93caf0b 100755
--- a/esmvalcore/_main.py
+++ b/esmvalcore/_main.py
@@ -46,7 +46,7 @@
 """ + __doc__
 
 
-def process_recipe(recipe_file, config_user):
+def process_recipe(recipe_file, config_user, dry_run):
     """Process recipe."""
     import datetime
     import os
@@ -97,7 +97,7 @@ def process_recipe(recipe_file, config_user):
     shutil.copy2(recipe_file, config_user['run_dir'])
 
     # parse recipe
-    recipe = read_recipe_file(recipe_file, config_user)
+    recipe = read_recipe_file(recipe_file, config_user, dry_run)
     logger.debug("Recipe summary:\n%s", recipe)
 
     # run
@@ -311,6 +311,7 @@ def run(recipe,
         skip_nonexistent=False,
         synda_download=False,
         diagnostics=None,
+        dry_run=False,
         check_level='default',
         **kwargs):
     """Execute an ESMValTool recipe.
@@ -339,6 +340,11 @@
         Only run the selected diagnostics from the recipe. To provide more
         than one diagnostic to filter use the syntax 'diag1 diag2/script1'
         or '("diag1", "diag2/script1")' and pay attention to the quotes.
+    dry_run: bool, optional
+        Check data availability and CMOR compliance and report any
+        problems found instead of raising. This works fully only with
+        "max_parallel_tasks: 1" in config-user.yml, but it is fast, so
+        even on a single thread it takes much less time than a full run.
     check_level: str, optional
         Configure the sensitivity of the CMOR check.
         Possible values are: `ignore` (all errors will be reported as warnings),
@@ -407,7 +413,9 @@ def _check_limit(limit, value):
     resource_log = os.path.join(cfg['run_dir'], 'resource_usage.txt')
     from ._task import resource_usage_logger
     with resource_usage_logger(pid=os.getpid(), filename=resource_log):
-        process_recipe(recipe_file=recipe, config_user=cfg)
+        process_recipe(recipe_file=recipe,
+                       config_user=cfg,
+                       dry_run=dry_run)
 
     if os.path.exists(cfg["preproc_dir"]) and cfg["remove_preproc_dir"]:
         logger.info("Removing preproc containing preprocessed data")
diff --git a/esmvalcore/_recipe.py b/esmvalcore/_recipe.py
index bf536c84cc..d8c18eb045 100644
--- a/esmvalcore/_recipe.py
+++ b/esmvalcore/_recipe.py
@@ -46,11 +46,14 @@
 TASKSEP = os.sep
 
 
-def read_recipe_file(filename, config_user, initialize_tasks=True):
+def read_recipe_file(filename,
+                     config_user,
+                     dry_run=False,
+                     initialize_tasks=True):
     """Read a recipe from file."""
     check.recipe_with_schema(filename)
     with open(filename, 'r') as file:
         raw_recipe = yaml.safe_load(file)
 
-    return Recipe(raw_recipe, config_user, initialize_tasks,
-                  recipe_file=filename)
+    return Recipe(raw_recipe, config_user, initialize_tasks,
+                  dry_check=dry_run, recipe_file=filename)
@@ -211,7 +214,12 @@ def _dataset_to_file(variable, config_user):
         if files:
             variable = required_var
             break
-    check.data_availability(files, variable, dirnames, filenames)
+    dryrun = config_user.get('dry-run')
+    check.data_availability(files,
+                            variable,
+                            dirnames,
+                            filenames,
+                            dryrun=dryrun)
     return files[0]
 
 
@@ -293,14 +301,20 @@ def _get_default_settings(variable, config_user, derive=False):
         'units': variable['units'],
     }
 
+    raise_exception = True
+    if config_user.get('dry-run'):
+        raise_exception = False
+
     # Configure CMOR metadata check
     settings['cmor_check_metadata'] = {
         'cmor_table': variable['project'],
         'mip': variable['mip'],
         'short_name': variable['short_name'],
         'frequency': variable['frequency'],
+        'raise_exception': raise_exception,
         'check_level': config_user.get('check_level', CheckLevels.DEFAULT)
     }
+
     # Configure final CMOR data check
     settings['cmor_check_data'] = dict(settings['cmor_check_metadata'])
 
@@ -311,7 +325,11 @@ def _get_default_settings(variable, config_user, derive=False):
     }
 
     # Configure saving cubes to file
-    settings['save'] = {'compress': config_user['compress_netcdf']}
+    if config_user.get('dry-run'):
+        settings['save'] = {'dryrun': True}
+    else:
+        settings['save'] = {'compress': config_user['compress_netcdf']}
+
     if variable['short_name'] != variable['original_short_name']:
         settings['save']['alias'] = variable['short_name']
 
@@ -532,7 +553,15 @@ def _get_ancestors(variable, config_user):
     logger.info("Using input files for variable %s of dataset %s:\n%s",
                 variable['short_name'], variable['dataset'],
                 '\n'.join(input_files))
-    check.data_availability(input_files, variable, dirnames, filenames)
+
+    if (not config_user.get('skip-nonexistent')
+            or variable['dataset'] == variable.get('reference_dataset')):
+        dryrun = config_user.get('dry-run')
+        check.data_availability(input_files,
+                                variable,
+                                dirnames,
+                                filenames,
+                                dryrun=dryrun)
 
     # Set up provenance tracking
     for i, filename in enumerate(input_files):
@@ -722,11 +751,14 @@ def _get_preprocessor_products(variables, profile, order, ancestor_products,
         try:
             ancestors = _get_ancestors(variable, config_user)
         except RecipeError as ex:
-            if _allow_skipping(ancestors, variable, config_user):
+            if (_allow_skipping(ancestors, variable, config_user)
+                    or (config_user.get('dry-run')
+                        and not ancestors)):
logger.info("Skipping: %s", ex.message) else: missing_vars.add(ex.message) continue + product = PreprocessorFile( attributes=variable, settings=settings, @@ -790,21 +822,25 @@ def _get_single_preprocessor_task(variables, name=name, ) - if not products: + if not products and not config_user.get('dry-run'): raise RecipeError( "Did not find any input data for task {}".format(name)) - task = PreprocessingTask( - products=products, - ancestors=ancestor_tasks, - name=name, - order=order, - debug=config_user['save_intermediary_cubes'], - write_ncl_interface=config_user['write_ncl_interface'], - ) + if not products and config_user.get('dry-run'): + task = [] + else: + task = PreprocessingTask( + products=products, + ancestors=ancestor_tasks, + name=name, + order=order, + debug=config_user['save_intermediary_cubes'], + write_ncl_interface=config_user['write_ncl_interface'], + ) - logger.info("PreprocessingTask %s created. It will create the files:\n%s", - task.name, '\n'.join(p.filename for p in task.products)) + logger.info("PreprocessingTask %s created. \ + It will create the files:\n%s", + task.name, '\n'.join(p.filename for p in task.products)) return task @@ -886,7 +922,8 @@ def append(group_prefix, var): return derive_input -def _get_preprocessor_task(variables, profiles, config_user, task_name): +def _get_preprocessor_task(variables, profiles, config_user, task_name, + dry_check): """Create preprocessor task(s) for a set of datasets.""" # First set up the preprocessor profile variable = variables[0] @@ -895,7 +932,10 @@ def _get_preprocessor_task(variables, profiles, config_user, task_name): raise RecipeError( "Unknown preprocessor {} in variable {} of diagnostic {}".format( preproc_name, variable['short_name'], variable['diagnostic'])) - profile = deepcopy(profiles[variable['preprocessor']]) + if not dry_check: + profile = deepcopy(profiles[variable['preprocessor']]) + else: + profile = deepcopy(profiles['default']) logger.info("Creating preprocessor '%s' task for variable '%s'", variable['preprocessor'], variable['short_name']) variables = _limit_datasets(variables, profile, @@ -945,6 +985,7 @@ def __init__(self, raw_recipe, config_user, initialize_tasks=True, + dry_check=False, recipe_file=None): """Parse a recipe file into an object.""" self._cfg = deepcopy(config_user) @@ -959,6 +1000,7 @@ def __init__(self, raw_recipe['diagnostics'], raw_recipe.get('datasets', [])) self.entity = self._initialize_provenance( raw_recipe.get('documentation', {})) + self.dry_check = dry_check try: self.tasks = self.initialize_tasks() if initialize_tasks else None except RecipeError as ex: @@ -1340,6 +1382,7 @@ def initialize_tasks(self): [variable_group], profiles=self._preprocessors, config_user=self._cfg, + dry_check=self.dry_check, task_name=task_name, ) except RecipeError as ex: @@ -1350,7 +1393,8 @@ def initialize_tasks(self): tasks.add(task) priority += 1 - # Create diagnostic tasks + # Create diagnostic tasks + if not self.dry_check: for script_name, script_cfg in diagnostic['scripts'].items(): task_name = diagnostic_name + TASKSEP + script_name logger.info("Creating diagnostic task %s", task_name) @@ -1367,10 +1411,12 @@ def initialize_tasks(self): recipe_error = RecipeError('Could not create all tasks') recipe_error.failed_tasks.extend(failed_tasks) raise recipe_error + check.tasks_valid(tasks) # Resolve diagnostic ancestors - self._resolve_diagnostic_ancestors(tasks) + if not self.dry_check: + self._resolve_diagnostic_ancestors(tasks) # Select only requested tasks tasks = tasks.flatten() diff 
index 21cff6a4a9..3b7e942a6e 100644
--- a/esmvalcore/_recipe_checks.py
+++ b/esmvalcore/_recipe_checks.py
@@ -100,31 +100,36 @@ def variable(var, required_keys):
             missing, var.get('short_name'), var.get('diagnostic')))
 
 
-def data_availability(input_files, var, dirnames, filenames):
+def data_availability(input_files, var, dirnames, filenames, dryrun=False):
     """Check if the required input data is available."""
     var = dict(var)
     if not input_files:
-        var.pop('filename', None)
-        logger.error("No input files found for variable %s", var)
-        if dirnames and filenames:
-            patterns = itertools.product(dirnames, filenames)
-            patterns = [os.path.join(d, f) for (d, f) in patterns]
-            if len(patterns) == 1:
-                msg = f': {patterns[0]}'
-            else:
-                msg = '\n{}'.format('\n'.join(patterns))
-            logger.error("Looked for files matching%s", msg)
-        elif dirnames and not filenames:
-            logger.error(
-                "Looked for files in %s, but did not find any file pattern "
-                "to match against", dirnames)
-        elif filenames and not dirnames:
-            logger.error(
-                "Looked for files matching %s, but did not find any existing "
-                "input directory", filenames)
-        logger.error("Set 'log_level' to 'debug' to get more information")
-        raise RecipeError(
-            f"Missing data for {var['alias']}: {var['short_name']}")
+        if not dryrun:
+            var.pop('filename', None)
+            logger.error("No input files found for variable %s", var)
+            if dirnames and filenames:
+                patterns = itertools.product(dirnames, filenames)
+                patterns = [os.path.join(d, f) for (d, f) in patterns]
+                if len(patterns) == 1:
+                    msg = f': {patterns[0]}'
+                else:
+                    msg = '\n{}'.format('\n'.join(patterns))
+                logger.error("Looked for files matching%s", msg)
+            elif dirnames and not filenames:
+                logger.error(
+                    "Looked for files in %s, but did not find any file "
+                    "pattern to match against", dirnames)
+            elif filenames and not dirnames:
+                logger.error(
+                    "Looked for files matching %s, but did not find any "
+                    "existing input directory", filenames)
+            logger.error("Set 'log_level' to 'debug' to get more information")
+            raise RecipeError(
+                f"Missing data for {var['alias']}: {var['short_name']}")
+        else:
+            logger.info(
+                "DRYRUN: MISSING DATA: no data found for %s", var)
+            return
 
     # check time avail only for non-fx variables
     if var['frequency'] == 'fx':
@@ -139,8 +144,14 @@ def data_availability(input_files, var, dirnames, filenames, dryrun=False):
 
     missing_years = required_years - available_years
     if missing_years:
-        raise RecipeError(
-            "No input data available for years {} in files {}".format(
+        if not dryrun:
+            raise RecipeError(
+                "No input data available for years {} in files {}".format(
+                    ", ".join(str(year) for year in missing_years),
+                    input_files))
+        else:
+            logger.info(
+                "DRYRUN: MISSING DATA for years {} in files {}".format(
                 ", ".join(str(year) for year in missing_years),
                 input_files))
 
diff --git a/esmvalcore/_task.py b/esmvalcore/_task.py
index 7e6ad4429d..1f4fef435a 100644
--- a/esmvalcore/_task.py
+++ b/esmvalcore/_task.py
@@ -245,11 +245,17 @@ def run(self, input_files=None):
         if input_files is None:
             input_files = []
             for task in self.ancestors:
-                input_files.extend(task.run())
+                output_files = task.run()
+                # A dry run produces no physical output files
+                if output_files:
+                    input_files.extend(output_files)
         logger.info("Starting task %s in process [%s]", self.name,
                     os.getpid())
         start = datetime.datetime.now()
-        self.output_files = self._run(input_files)
+        # Skip tasks that have ancestors but received no files (e.g. dry run)
+        if ((self.ancestors and input_files)
+                or (not self.ancestors and not input_files)):
+            self.output_files = self._run(input_files)
         runtime = datetime.datetime.now() - start
         logger.info("Successfully completed task %s (priority %s) in %s",
                     self.name, self.priority, runtime)
diff --git a/esmvalcore/cmor/check.py b/esmvalcore/cmor/check.py
index 48a5113d35..1aa0f64f4f 100644
--- a/esmvalcore/cmor/check.py
+++ b/esmvalcore/cmor/check.py
@@ -110,11 +110,13 @@ def __init__(self,
                  var_info,
                  frequency=None,
                  fail_on_error=False,
+                 raise_exception=True,
                  check_level=CheckLevels.DEFAULT,
                  automatic_fixes=False):
 
         self._cube = cube
         self._failerr = fail_on_error
+        self._raise_exception = raise_exception
         self._check_level = check_level
         self._logger = logging.getLogger(__name__)
         self._errors = list()
@@ -232,7 +234,9 @@ def report_errors(self):
             msg = 'There were errors in variable {}:\n{}\n in cube:\n{}'
             msg = msg.format(self._cube.var_name, '\n '.join(self._errors),
                              self._cube)
-            raise CMORCheckError(msg)
+            if self._raise_exception:
+                raise CMORCheckError(msg)
+            self._logger.error(msg)
 
     def report_warnings(self):
         """Report detected warnings to the given logger.
@@ -941,7 +945,8 @@ def _get_cmor_checker(table,
                       mip,
                       short_name,
                       frequency,
-                      fail_on_error=False,
+                      fail_on_error=True,
+                      raise_exception=True,
                       check_level=CheckLevels.DEFAULT,
                       automatic_fixes=False):
     """Get a CMOR checker/fixer."""
@@ -972,6 +977,7 @@ def cmor_check_metadata(cube,
                         mip,
                         short_name,
                         frequency,
+                        raise_exception=False,
                         check_level=CheckLevels.DEFAULT):
     """Check if metadata conforms to variable's CMOR definition.
 
@@ -989,6 +995,9 @@ def cmor_check_metadata(cube,
         Variable's short name.
     frequency: str
         Data frequency.
+    raise_exception: bool
+        If True, raise the exception resulting from the check;
+        otherwise only log it as an error.
     check_level: CheckLevels
         Level of strictness of the checks.
     """
@@ -996,7 +1005,8 @@ def cmor_check_metadata(cube,
     checker = _get_cmor_checker(table,
                                 mip,
                                 short_name,
                                 frequency,
-                                check_level=check_level)
+                                check_level=check_level,
+                                raise_exception=raise_exception)
     checker(cube).check_metadata()
     return cube
@@ -1006,6 +1016,7 @@ def cmor_check_data(cube,
                     mip,
                     short_name,
                     frequency,
+                    raise_exception=False,
                     check_level=CheckLevels.DEFAULT):
     """Check if data conforms to variable's CMOR definition.
 
@@ -1023,6 +1034,9 @@ def cmor_check_data(cube,
         Variable's short name
     frequency: str
         Data frequency
+    raise_exception: bool
+        If True, raise the exception resulting from the check;
+        otherwise only log it as an error.
     check_level: CheckLevels
         Level of strictness of the checks.
""" @@ -1030,7 +1044,8 @@ def cmor_check_data(cube, mip, short_name, frequency, - check_level=check_level) + check_level=check_level, + raise_exception=raise_exception) checker(cube).check_data() return cube diff --git a/esmvalcore/preprocessor/__init__.py b/esmvalcore/preprocessor/__init__.py index 9fe86d140e..ede2d61e5b 100644 --- a/esmvalcore/preprocessor/__init__.py +++ b/esmvalcore/preprocessor/__init__.py @@ -377,7 +377,7 @@ def cubes(self, value): def save(self): """Save cubes to disk.""" - if self._cubes is not None: + if self._cubes is not None and 'dryrun' not in self.settings['save']: self.files = preprocess(self._cubes, 'save', **self.settings['save']) self.files = preprocess(self.files, 'cleanup', @@ -484,9 +484,11 @@ def _run(self, _): for product in self.products: product.close() - metadata_files = write_metadata(self.products, - self.write_ncl_interface) - return metadata_files + input_products = [p for p in self.products if step in p.settings] + if 'dryrun' not in input_products[0].settings['save']: + metadata_files = write_metadata(self.products, + self.write_ncl_interface) + return metadata_files def __str__(self): """Get human readable description.""" diff --git a/esmvalcore/preprocessor/_io.py b/esmvalcore/preprocessor/_io.py index 938e4b6f2e..d4863da1ea 100644 --- a/esmvalcore/preprocessor/_io.py +++ b/esmvalcore/preprocessor/_io.py @@ -199,10 +199,14 @@ def concatenate(cubes): return result -def save(cubes, filename, optimize_access='', compress=False, alias='', +def save(cubes, + filename, + optimize_access='', + compress=False, + dryrun=False, + alias='', **kwargs): - """ - Save iris cubes to file. + """Save iris cubes to file. Parameters ---------- diff --git a/tests/integration/test_recipe.py b/tests/integration/test_recipe.py index 81e3ef7720..b1f5e7649d 100644 --- a/tests/integration/test_recipe.py +++ b/tests/integration/test_recipe.py @@ -155,6 +155,7 @@ def _get_default_settings_for_chl(fix_dir, save_filename): 'mip': 'Oyr', 'short_name': 'chl', 'frequency': 'yr', + 'raise_exception': True, }, 'cmor_check_data': { 'check_level': CheckLevels.DEFAULT, @@ -162,6 +163,7 @@ def _get_default_settings_for_chl(fix_dir, save_filename): 'mip': 'Oyr', 'short_name': 'chl', 'frequency': 'yr', + 'raise_exception': True, }, 'add_fx_variables': { 'fx_variables': {}, @@ -475,7 +477,9 @@ def test_default_preprocessor(tmp_path, patched_datafinder, config_user): fix_dir = os.path.join( preproc_dir, 'CMIP5_CanESM2_Oyr_historical_r1i1p1_chl_2000-2005_fixed') + defaults = _get_default_settings_for_chl(fix_dir, product.filename) + assert product.settings == defaults @@ -581,6 +585,7 @@ def test_default_fx_preprocessor(tmp_path, patched_datafinder, config_user): 'mip': 'fx', 'short_name': 'sftlf', 'frequency': 'fx', + 'raise_exception': True, }, 'cmor_check_data': { 'check_level': CheckLevels.DEFAULT, @@ -588,6 +593,7 @@ def test_default_fx_preprocessor(tmp_path, patched_datafinder, config_user): 'mip': 'fx', 'short_name': 'sftlf', 'frequency': 'fx', + 'raise_exception': True, }, 'add_fx_variables': { 'fx_variables': {}, @@ -602,6 +608,7 @@ def test_default_fx_preprocessor(tmp_path, patched_datafinder, config_user): 'filename': product.filename, } } + assert product.settings == defaults