46 commits
8d30979 started implementing dry data checks (valeriupredoi, Oct 11, 2019)
7c51fd7 started implementing dry data checks (valeriupredoi, Oct 11, 2019)
8a16687 inching closer (valeriupredoi, Oct 11, 2019)
cdf0491 switched to an overall dry-run (valeriupredoi, Oct 11, 2019)
f64237b first working version of dry-run (valeriupredoi, Oct 11, 2019)
febfb6a adding dry-run to config dict (valeriupredoi, Oct 11, 2019)
62a3dce adding dry-run handling (valeriupredoi, Oct 11, 2019)
f6c7da5 tweaked func argument (valeriupredoi, Oct 11, 2019)
1ff047b major simplification (valeriupredoi, Oct 11, 2019)
da8b8ee Add option to not raise in cmor checks (Oct 14, 2019)
005c9ef linter test fix (valeriupredoi, Oct 15, 2019)
6d67cbf removed saving (valeriupredoi, Oct 15, 2019)
3cfd1b8 no saving if dry run (valeriupredoi, Oct 15, 2019)
2c87a4c no saving if dry run (valeriupredoi, Oct 15, 2019)
465d070 no saving if dry run (valeriupredoi, Oct 15, 2019)
df2d765 changed a bit (valeriupredoi, Oct 15, 2019)
660e33c changed a bit to accommodate a slightly different handling (valeriupredoi, Oct 15, 2019)
120eb43 working version not to save (valeriupredoi, Oct 17, 2019)
f9cead9 working version not to save (valeriupredoi, Oct 17, 2019)
207ec8b working version not to save (valeriupredoi, Oct 17, 2019)
68c860c last change so the ancestry is not asked for if dry run (valeriupredoi, Oct 17, 2019)
b495d1a added section to documentation for dry running (valeriupredoi, Oct 17, 2019)
1b78b2d Merge branch 'development' into development_dry_checks (valeriupredoi, Oct 24, 2019)
867b454 fixed very odd code change that might have come through from merging… (valeriupredoi, Oct 24, 2019)
48d79ea modified checker in light of dryrun (valeriupredoi, Oct 24, 2019)
1ff6986 modified data checks in light of dryrun (valeriupredoi, Oct 24, 2019)
38ee9af total skip if no data available in dryrun (valeriupredoi, Oct 24, 2019)
5e7ac8a fix for None file output for dryrun (valeriupredoi, Oct 24, 2019)
4d0232b skip running tasks that don't have physical files (valeriupredoi, Oct 24, 2019)
0e6e1cc reverted to old message (valeriupredoi, Oct 25, 2019)
51c2abe edited error messages (valeriupredoi, Oct 25, 2019)
72fd668 added single-thread condition for dryrun (valeriupredoi, Oct 25, 2019)
3333f5a added note on single threading (valeriupredoi, Oct 25, 2019)
9691b4b Merge branch 'development' into development_dry_checks (valeriupredoi, Nov 13, 2019)
026c0a0 actual use of Javi's exception handler (valeriupredoi, Nov 13, 2019)
d0cd401 actual use of Javi's exception handler (valeriupredoi, Nov 13, 2019)
3203b55 actual use of Javi's exception handler (valeriupredoi, Nov 13, 2019)
1929d07 Merge branch 'development' into development_dry_checks (valeriupredoi, Nov 15, 2019)
6345cda Merge branch 'development' into development_dry_checks (valeriupredoi, Nov 19, 2019)
753934e Merge branch 'master' into development_dry_checks (valeriupredoi, Feb 3, 2020)
12c0c2e fixed the fix for conflict (valeriupredoi, Feb 3, 2020)
9cd0e72 fixed test (valeriupredoi, Feb 3, 2020)
f591b77 Merge branch 'master' into development_dry_checks (stefsmeets, May 20, 2021)
10e6125 Fix flake8 errors and formatting warnings (stefsmeets, May 21, 2021)
f4fcbdc Fix merge errors (stefsmeets, May 21, 2021)
36165bf Fix CLI description (stefsmeets, May 21, 2021)
9 changes: 9 additions & 0 deletions doc/develop/fixing_data.rst
@@ -27,6 +27,15 @@
known errors that can be fixed automatically.
See `Natively supported non-CMIP datasets`_ for a list of currently supported
datasets.

Discovering potential issues with the data
==========================================

You can run `esmvaltool` in dry-run mode by passing the `--dry-run` command line option:
this runs the data finding module and the CMOR checks and fixes modules, and anything
that is not fixed on the fly results in an exception that is logged to `main_log.txt`.
You can then review those potential issues and implement the needed fixes as described
below. This feature works fully only with `max_parallel_tasks: 1` in `config-user.yml`;
however, it is very fast, so even limited to a single thread it should take much less
time than running the full recipe.
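
For example, a minimal invocation might look like this (a sketch: the recipe
name is a placeholder, and the flag spelling follows this section):

    # in config-user.yml, for a complete report:
    max_parallel_tasks: 1

    # then, from the shell:
    esmvaltool run --dry-run recipe_example.yml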

Fix structure
=============
14 changes: 11 additions & 3 deletions esmvalcore/_main.py
@@ -46,7 +46,7 @@
""" + __doc__


def process_recipe(recipe_file, config_user):
def process_recipe(recipe_file, config_user, dry_run):
"""Process recipe."""
import datetime
import os
@@ -97,7 +97,7 @@ def process_recipe(recipe_file, config_user):
shutil.copy2(recipe_file, config_user['run_dir'])

# parse recipe
recipe = read_recipe_file(recipe_file, config_user)
recipe = read_recipe_file(recipe_file, config_user, dry_run)
logger.debug("Recipe summary:\n%s", recipe)

# run
@@ -311,6 +311,7 @@ def run(recipe,
skip_nonexistent=False,
synda_download=False,
diagnostics=None,
dry_run=False,
check_level='default',
**kwargs):
"""Execute an ESMValTool recipe.
@@ -339,6 +340,11 @@
Only run the selected diagnostics from the recipe. To provide more
than one diagnostic to filter use the syntax 'diag1 diag2/script1'
or '("diag1", "diag2/script1")' and pay attention to the quotes.
dry_run: bool, optional
Check data availability and CMOR compliance and return a report
without running the full recipe. This feature works fully only with
"max_parallel_tasks: 1" in config-user.yml; however, it is very fast,
so even on a single thread it should take much less time than a full
run.
check_level: str, optional
Configure the sensitivity of the CMOR check. Possible values are:
`ignore` (all errors will be reported as warnings),
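
As a usage sketch for the API above (the recipe name is a placeholder, and
passing max_parallel_tasks through the keyword arguments is an assumption
based on the **kwargs in the signature):

    from esmvalcore._main import run

    # Scan the input data and run the CMOR checks and fixes without
    # producing any output; see the docstring above for the caveats.
    run('recipe_example.yml',
        dry_run=True,
        check_level='default',
        max_parallel_tasks=1)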
@@ -407,7 +413,9 @@ def _check_limit(limit, value):
resource_log = os.path.join(cfg['run_dir'], 'resource_usage.txt')
from ._task import resource_usage_logger
with resource_usage_logger(pid=os.getpid(), filename=resource_log):
process_recipe(recipe_file=recipe, config_user=cfg)
process_recipe(recipe_file=recipe,
config_user=cfg,
dry_run=dry_run)

if os.path.exists(cfg["preproc_dir"]) and cfg["remove_preproc_dir"]:
logger.info("Removing preproc containing preprocessed data")
88 changes: 67 additions & 21 deletions esmvalcore/_recipe.py
@@ -46,15 +46,18 @@
TASKSEP = os.sep


def read_recipe_file(filename, config_user, initialize_tasks=True):
def read_recipe_file(filename,
config_user,
dry_run=False,
initialize_tasks=True):
"""Read a recipe from file."""
check.recipe_with_schema(filename)
with open(filename, 'r') as file:
raw_recipe = yaml.safe_load(file)

return Recipe(raw_recipe,
config_user,
initialize_tasks,
dry_check=dry_run,
recipe_file=filename)


@@ -211,7 +214,12 @@ def _dataset_to_file(variable, config_user):
if files:
variable = required_var
break
check.data_availability(files, variable, dirnames, filenames)
dryrun = config_user.get('dry-run')
check.data_availability(files,
variable,
dirnames,
filenames,
dryrun=dryrun)
return files[0]


@@ -293,14 +301,20 @@ def _get_default_settings(variable, config_user, derive=False):
'units': variable['units'],
}

# In dry-run mode, CMOR check failures are reported instead of raised
raise_exception = not config_user.get('dry-run', False)

# Configure CMOR metadata check
settings['cmor_check_metadata'] = {
'cmor_table': variable['project'],
'mip': variable['mip'],
'short_name': variable['short_name'],
'frequency': variable['frequency'],
'raise_exception': raise_exception,
'check_level': config_user.get('check_level', CheckLevels.DEFAULT)
}

# Configure final CMOR data check
settings['cmor_check_data'] = dict(settings['cmor_check_metadata'])

@@ -311,7 +325,14 @@
}

# Configure saving cubes to file
settings['save'] = {'compress': config_user['compress_netcdf']}
if config_user.get('dry-run'):
    settings['save'] = {'dryrun': True}
else:
    settings['save'] = {'compress': config_user['compress_netcdf']}

if variable['short_name'] != variable['original_short_name']:
settings['save']['alias'] = variable['short_name']

@@ -532,7 +553,15 @@ def _get_ancestors(variable, config_user):
logger.info("Using input files for variable %s of dataset %s:\n%s",
variable['short_name'], variable['dataset'],
'\n'.join(input_files))
check.data_availability(input_files, variable, dirnames, filenames)

if (not config_user.get('skip-nonexistent')
or variable['dataset'] == variable.get('reference_dataset')):
dryrun = config_user.get('dry-run')
check.data_availability(input_files,
variable,
dirnames,
filenames,
dryrun=dryrun)

# Set up provenance tracking
for i, filename in enumerate(input_files):
@@ -722,11 +751,14 @@ def _get_preprocessor_products(variables, profile, order, ancestor_products,
try:
ancestors = _get_ancestors(variable, config_user)
except RecipeError as ex:
if _allow_skipping(ancestors, variable, config_user):
if (_allow_skipping(ancestors, variable, config_user)
        or (config_user.get('dry-run') and not ancestors)):
logger.info("Skipping: %s", ex.message)
else:
missing_vars.add(ex.message)
continue

product = PreprocessorFile(
attributes=variable,
settings=settings,
@@ -790,21 +822,25 @@ def _get_single_preprocessor_task(variables,
name=name,
)

if not products:
if not products and not config_user.get('dry-run'):
raise RecipeError(
"Did not find any input data for task {}".format(name))

task = PreprocessingTask(
products=products,
ancestors=ancestor_tasks,
name=name,
order=order,
debug=config_user['save_intermediary_cubes'],
write_ncl_interface=config_user['write_ncl_interface'],
)
if not products and config_user.get('dry-run'):
task = []
else:
task = PreprocessingTask(
products=products,
ancestors=ancestor_tasks,
name=name,
order=order,
debug=config_user['save_intermediary_cubes'],
write_ncl_interface=config_user['write_ncl_interface'],
)

logger.info("PreprocessingTask %s created. It will create the files:\n%s",
task.name, '\n'.join(p.filename for p in task.products))
logger.info("PreprocessingTask %s created. \
It will create the files:\n%s",
task.name, '\n'.join(p.filename for p in task.products))

return task

@@ -886,7 +922,8 @@ def append(group_prefix, var):
return derive_input


def _get_preprocessor_task(variables, profiles, config_user, task_name):
def _get_preprocessor_task(variables, profiles, config_user, task_name,
dry_check):
"""Create preprocessor task(s) for a set of datasets."""
# First set up the preprocessor profile
variable = variables[0]
@@ -895,7 +932,10 @@ def _get_preprocessor_task(variables, profiles, config_user, task_name):
raise RecipeError(
"Unknown preprocessor {} in variable {} of diagnostic {}".format(
preproc_name, variable['short_name'], variable['diagnostic']))
profile = deepcopy(profiles[variable['preprocessor']])
if not dry_check:
profile = deepcopy(profiles[variable['preprocessor']])
else:
profile = deepcopy(profiles['default'])
logger.info("Creating preprocessor '%s' task for variable '%s'",
variable['preprocessor'], variable['short_name'])
variables = _limit_datasets(variables, profile,
@@ -945,6 +985,7 @@ def __init__(self,
raw_recipe,
config_user,
initialize_tasks=True,
dry_check=False,
recipe_file=None):
"""Parse a recipe file into an object."""
self._cfg = deepcopy(config_user)
@@ -959,6 +1000,7 @@ def __init__(self,
raw_recipe['diagnostics'], raw_recipe.get('datasets', []))
self.entity = self._initialize_provenance(
raw_recipe.get('documentation', {}))
self.dry_check = dry_check
try:
self.tasks = self.initialize_tasks() if initialize_tasks else None
except RecipeError as ex:
@@ -1340,6 +1382,7 @@ def initialize_tasks(self):
[variable_group],
profiles=self._preprocessors,
config_user=self._cfg,
dry_check=self.dry_check,
task_name=task_name,
)
except RecipeError as ex:
@@ -1350,7 +1393,8 @@
tasks.add(task)
priority += 1

# Create diagnostic tasks
# Create diagnostic tasks
if not self.dry_check:
for script_name, script_cfg in diagnostic['scripts'].items():
task_name = diagnostic_name + TASKSEP + script_name
logger.info("Creating diagnostic task %s", task_name)
Expand All @@ -1367,10 +1411,12 @@ def initialize_tasks(self):
recipe_error = RecipeError('Could not create all tasks')
recipe_error.failed_tasks.extend(failed_tasks)
raise recipe_error

check.tasks_valid(tasks)

# Resolve diagnostic ancestors
self._resolve_diagnostic_ancestors(tasks)
if not self.dry_check:
self._resolve_diagnostic_ancestors(tasks)

# Select only requested tasks
tasks = tasks.flatten()
58 changes: 34 additions & 24 deletions esmvalcore/_recipe_checks.py
@@ -100,31 +100,36 @@ def variable(var, required_keys):
missing, var.get('short_name'), var.get('diagnostic')))


def data_availability(input_files, var, dirnames, filenames):
def data_availability(input_files, var, dirnames, filenames, dryrun=False):
"""Check if the required input data is available."""
var = dict(var)
if not input_files:
var.pop('filename', None)
logger.error("No input files found for variable %s", var)
if dirnames and filenames:
patterns = itertools.product(dirnames, filenames)
patterns = [os.path.join(d, f) for (d, f) in patterns]
if len(patterns) == 1:
msg = f': {patterns[0]}'
else:
msg = '\n{}'.format('\n'.join(patterns))
logger.error("Looked for files matching%s", msg)
elif dirnames and not filenames:
logger.error(
"Looked for files in %s, but did not find any file pattern "
"to match against", dirnames)
elif filenames and not dirnames:
logger.error(
"Looked for files matching %s, but did not find any existing "
"input directory", filenames)
logger.error("Set 'log_level' to 'debug' to get more information")
raise RecipeError(
f"Missing data for {var['alias']}: {var['short_name']}")
if not dryrun:
var.pop('filename', None)
logger.error("No input files found for variable %s", var)
if dirnames and filenames:
patterns = itertools.product(dirnames, filenames)
patterns = [os.path.join(d, f) for (d, f) in patterns]
if len(patterns) == 1:
msg = f': {patterns[0]}'
else:
msg = '\n{}'.format('\n'.join(patterns))
logger.error("Looked for files matching%s", msg)
elif dirnames and not filenames:
logger.error(
"Looked for files in %s, but did not find any file "
"pattern to match against", dirnames)
elif filenames and not dirnames:
logger.error(
"Looked for files matching %s, but did not find any "
"existing input directory", filenames)
logger.error("Set 'log_level' to 'debug' to get more information")
raise RecipeError(
f"Missing data for {var['alias']}: {var['short_name']}")
else:
logger.info("DRYRUN: MISSING DATA: \
no data found for {}".format(var))
return

# check time avail only for non-fx variables
if var['frequency'] == 'fx':
Expand All @@ -139,8 +144,13 @@ def data_availability(input_files, var, dirnames, filenames):

missing_years = required_years - available_years
if missing_years:
raise RecipeError(
"No input data available for years {} in files {}".format(
if not dryrun:
raise RecipeError(
"No input data available for years {} in files {}".format(
", ".join(str(year) for year in missing_years),
input_files))
else:
logger.info("DRYRUN: MISSING DATA for years {} in files {}".format(
", ".join(str(year) for year in missing_years), input_files))


10 changes: 8 additions & 2 deletions esmvalcore/_task.py
@@ -245,11 +245,17 @@ def run(self, input_files=None):
if input_files is None:
input_files = []
for task in self.ancestors:
input_files.extend(task.run())
out_file = task.run()
# dryrun outputs no physical run output file
if out_file:
input_files.extend(out_file)
logger.info("Starting task %s in process [%s]", self.name,
os.getpid())
start = datetime.datetime.now()
self.output_files = self._run(input_files)
# skip tasks that have ancestors but received no files (e.g. dry run)
if (self.ancestors and input_files) or \
(not self.ancestors and not input_files):
self.output_files = self._run(input_files)
runtime = datetime.datetime.now() - start
logger.info("Successfully completed task %s (priority %s) in %s",
self.name, self.priority, runtime)
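
Reading the guard above as written, the behaviour is (a comment-only sketch):

    # ancestors | input_files | _run executed?
    # ----------+-------------+----------------------------------------------
    #    yes    |  non-empty  | yes (normal chained task)
    #    yes    |   empty     | no  (ancestors produced nothing, e.g. dry run)
    #    no     |   empty     | yes (root task discovers its own input files)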