Skip to content
69 changes: 65 additions & 4 deletions esmvalcore/esgf/_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import logging
import os
import random
import re
import shutil
from pathlib import Path
from statistics import median
Expand Down Expand Up @@ -178,21 +179,24 @@ class ESGFFile:

Attributes
----------
urls : :class:`list` of :class:`str`
The URLs where the file can be downloaded.
dataset : str
The name of the dataset that the file is part of.
facets : dict[str,str]
Facets describing the file.
name : str
The name of the file.
size : int
The size of the file in bytes.
urls : list[str]
The URLs where the file can be downloaded.
"""

def __init__(self, results):
results = list(results)
self.name = str(Path(results[0].filename).with_suffix('.nc'))
self.size = results[0].size
self.dataset = self._get_dataset_id(results)
self.facets = self._get_facets(results)
self.urls = []
self._checksums = []
for result in results:
Expand Down Expand Up @@ -225,10 +229,64 @@ def same_file(result):
logger.debug(
"Ignoring file(s) %s containing wrong variable '%s' in"
" found in search for variable '%s'", file.urls, variable,
facets['variable'])
facets.get('variable', facets.get('variable_id', '?')))

return files

@staticmethod
def _get_facets(results):
    """Derive the facets of a file from its `dataset_id`.

    The facets provided by ESGF are unreliable, so the facet values are
    taken from the dot-separated components of the `dataset_id`, and the
    facet names from the placeholders in `dataset_id_template_`.

    Parameters
    ----------
    results
        Indexable collection of search results. Every item must provide
        a ``json`` mapping with a ``dataset_id`` entry; the first item
        must also provide ``project``, ``dataset_id_template_`` and
        ``title``.

    Returns
    -------
    dict
        The facets, always including ``project``, ``version`` and
        ``short_name``.

    Raises
    ------
    IndexError
        If the `dataset_id` has fewer components than the template
        has placeholders.
    """
    # Example dataset_id_template_ values:
    # CMIP3: '%(project)s.%(institute)s.%(model)s.%(experiment)s.
    #         %(time_frequency)s.%(realm)s.%(ensemble)s.%(variable)s'
    # CMIP5: 'cmip5.%(product)s.%(valid_institute)s.%(model)s.
    #         %(experiment)s.%(time_frequency)s.%(realm)s.%(cmor_table)s.
    #         %(ensemble)s'
    # CMIP6: '%(mip_era)s.%(activity_drs)s.%(institution_id)s.
    #         %(source_id)s.%(experiment_id)s.%(member_id)s.%(table_id)s.
    #         %(variable_id)s.%(grid_label)s'
    # CORDEX: 'cordex.%(product)s.%(domain)s.%(institute)s.
    #          %(driving_model)s.%(experiment)s.%(ensemble)s.%(rcm_name)s.
    #          %(rcm_version)s.%(time_frequency)s.%(variable)s'
    # obs4MIPs: '%(project)s.%(institute)s.%(source_id)s.%(realm)s.
    #            %(time_frequency)s'
    first = results[0].json
    project = first['project'][0]

    # Placeholder names in the template, in order of appearance.
    template = first['dataset_id_template_'][0]
    esgf_names = re.findall(r"%\((.*?)\)s", template)

    # Translation table from ESGF facet names to the names used here.
    translate = {esgf: ours for ours, esgf in FACETS[project].items()}
    translate.update({
        'mip_era': 'project',  # CMIP6 oddity
        'variable_id': 'short_name',  # CMIP6 oddity
        'valid_institute': 'institute',  # CMIP5 oddity
    })

    names = [translate.get(name, name) for name in esgf_names]
    # The version is the last component of the dataset_id, but it is not
    # part of the template.
    names.append('version')
    if names[0] == 'project':
        # The first component of the dataset_id is skipped below because
        # some templates hardcode the project there (all lowercase), so
        # the matching name has to be dropped too.
        del names[0]

    # Strip the data node that follows the '|' and, if the results differ
    # (e.g. in case), use the dataset_id that sorts first.
    dataset_id = min(r.json['dataset_id'].split('|')[0] for r in results)
    _, *values = dataset_id.split('.')

    facets = {'project': project}
    facets.update(
        (name, values[position]) for position, name in enumerate(names)
    )

    # Not every project has the short_name in the dataset_id; fall back
    # to the first '_'-separated part of the file title.
    if 'short_name' not in facets:
        facets['short_name'] = results[0].json['title'].split('_')[0]

    return facets

@staticmethod
def _get_dataset_id(results):
"""Simplify dataset_id so it is always composed of the same facets."""
Expand Down Expand Up @@ -256,7 +314,10 @@ def __repr__(self):

def __eq__(self, other):
    """Compare `self` to `other`.

    Two objects are equal when `other` is an instance of the class of
    `self` and both have the same `dataset` and `name`.

    Returns
    -------
    bool
        ``True`` if both describe the same file, ``False`` otherwise,
        including when `other` is of an unrelated type.
    """
    # Check the type first, so that comparing with an unrelated object
    # yields False instead of raising AttributeError on a missing
    # `dataset` or `name` attribute.
    return (
        isinstance(other, self.__class__)
        and (self.dataset, self.name) == (other.dataset, other.name)
    )

def __lt__(self, other):
"""Compare `self` to `other`."""
Expand Down
50 changes: 29 additions & 21 deletions esmvalcore/esgf/_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,7 @@
import pyesgf.search
import requests.exceptions

from .._data_finder import (
_get_timerange_from_years,
_parse_period,
_truncate_dates,
get_start_end_date,
)
from .._data_finder import _parse_period, _truncate_dates, get_start_end_date
from ..config._esgf_pyclient import get_esgf_config
from ._download import ESGFFile
from .facets import DATASET_MAP, FACETS
Expand All @@ -26,6 +21,9 @@ def get_esgf_facets(variable):
for our_name, esgf_name in FACETS[project].items():
if our_name in variable:
values = variable[our_name]
if values == '*':
# Wildcards can be specified on ESGF by omitting the facet
continue

if isinstance(values, (tuple, list)):
values = list(values)
Expand All @@ -42,7 +40,7 @@ def get_esgf_facets(variable):
return facets


def select_latest_versions(files):
def select_latest_versions(files, versions):
"""Select only the latest version of files."""
result = []

Expand All @@ -52,14 +50,24 @@ def same_file(file):
dataset = file.dataset.rsplit('.', 1)[0]
return (dataset, file.name)

if isinstance(versions, str):
versions = (versions, )

files = sorted(files, key=same_file)
for _, versions in itertools.groupby(files, key=same_file):
versions = sorted(versions, reverse=True)
latest_version = versions[0]
for _, group in itertools.groupby(files, key=same_file):
group = sorted(group, reverse=True)
if versions:
selection = [f for f in group if f.facets['version'] in versions]
if not selection:
raise FileNotFoundError(
f"Requested versions {', '.join(versions)} of file not "
f"found. Available files: {group}")
group = selection
latest_version = group[0]
result.append(latest_version)
if len(versions) > 1:
if len(group) > 1:
logger.debug("Only using the latest version %s, not %s",
latest_version, versions[1:])
latest_version, group[1:])
Comment thread
valeriupredoi marked this conversation as resolved.

return result

Expand Down Expand Up @@ -100,7 +108,6 @@ def _search_index_nodes(facets):
context = connection.new_context(
pyesgf.search.context.FileSearchContext,
**facets,
latest=True,
)
logger.debug("Searching %s for datasets using facets=%s", url, facets)
try:
Expand Down Expand Up @@ -134,8 +141,6 @@ def esgf_search_files(facets):

files = ESGFFile._from_results(results, facets)

files = select_latest_versions(files)

msg = 'none' if not files else '\n' + '\n'.join(str(f) for f in files)
logger.debug("Found the following files matching facets %s: %s", facets,
msg)
Expand All @@ -145,6 +150,10 @@ def esgf_search_files(facets):

def select_by_time(files, timerange):
"""Select files containing data between a timerange."""
if '*' in timerange:
# TODO: support * combined with a period
return files
Comment thread
schlunma marked this conversation as resolved.

selection = []

for file in files:
Expand Down Expand Up @@ -231,8 +240,7 @@ def find_files(*, project, short_name, dataset, **facets):
... ensemble='r1i1p1',
... domain='EUR-11',
... driver='MPI-M-MPI-ESM-LR',
... start_year=1990,
... end_year=2000,
... timerange='1990/2000',
... ) # doctest: +SKIP
[ESGFFile:cordex/output/EUR-11/CLMcom-ETH/MPI-M-MPI-ESM-LR/historical/r1i1p1/COSMO-crCLIM-v1-1/v1/mon/tas/v20191219/tas_EUR-11_MPI-M-MPI-ESM-LR_historical_r1i1p1_CLMcom-ETH-COSMO-crCLIM-v1-1_v1_mon_198101-199012.nc,
ESGFFile:cordex/output/EUR-11/CLMcom-ETH/MPI-M-MPI-ESM-LR/historical/r1i1p1/COSMO-crCLIM-v1-1/v1/mon/tas/v20191219/tas_EUR-11_MPI-M-MPI-ESM-LR_historical_r1i1p1_CLMcom-ETH-COSMO-crCLIM-v1-1_v1_mon_199101-200012.nc]
Expand Down Expand Up @@ -282,10 +290,10 @@ def cached_search(**facets):
"""
esgf_facets = get_esgf_facets(facets)
files = esgf_search_files(esgf_facets)
_get_timerange_from_years(facets)
filter_timerange = (facets.get('frequency', '') != 'fx'
and 'timerange' in facets)
if filter_timerange:

files = select_latest_versions(files, facets.get('version'))

if 'timerange' in facets:
files = select_by_time(files, facets['timerange'])
logger.debug("Selected files:\n%s", '\n'.join(str(f) for f in files))

Expand Down
5 changes: 4 additions & 1 deletion esmvalcore/esgf/facets.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@
'dataset': 'model',
'ensemble': 'ensemble',
'exp': 'experiment',
'frequency': 'time_frequency',
'mip': 'cmor_table',
'product': 'product',
'short_name': 'variable',
},
'CMIP6': {
'activity': 'activity_drs',
'dataset': 'source_id',
'ensemble': 'variant_label',
'ensemble': 'member_id',
'exp': 'experiment_id',
'institute': 'institution_id',
'grid': 'grid_label',
'mip': 'table_id',
'short_name': 'variable',
Expand Down
29 changes: 19 additions & 10 deletions tests/integration/esgf/test_search_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
'frequency': 'mon',
'project': 'CMIP3',
'short_name': 'tas',
'version': 'v1',
}, {
'dataset': 'inmcm4',
'ensemble': 'r1i1p1',
'exp': ['historical', 'rcp85'],
'mip': 'Amon',
'project': 'CMIP5',
'short_name': 'tas',
'version': 'v20130207',
}, {
'dataset': 'FIO-ESM',
'ensemble': 'r1i1p1',
Expand All @@ -36,17 +38,15 @@
'mip': 'Amon',
'project': 'CMIP5',
'short_name': 'tas',
'start_year': 2080,
'end_year': 2100,
'timerange': '2080/2100',
Comment thread
schlunma marked this conversation as resolved.
}, {
'dataset': 'EC-EARTH',
'ensemble': 'r1i1p1',
'exp': 'historical',
'mip': 'Amon',
'project': 'CMIP5',
'short_name': 'tas',
'start_year': 1990,
'end_year': 1999,
'timerange': '1990/1999',
}, {
'dataset': 'AWI-ESM-1-1-LR',
'ensemble': 'r1i1p1f1',
Expand All @@ -55,8 +55,8 @@
'mip': 'Amon',
'project': 'CMIP6',
'short_name': 'tas',
'start_year': 2000,
'end_year': 2001,
'timerange': '2000/2001',
'version': 'v20200212',
}, {
'dataset': 'RACMO22E',
'driver': 'MOHC-HadGEM2-ES',
Expand All @@ -66,13 +66,14 @@
'frequency': 'mon',
'project': 'CORDEX',
'short_name': 'tas',
'start_year': 1950,
'end_year': 1952,
'timerange': '1950/1952',
'version': 'v20160620',
}, {
'dataset': 'CERES-EBAF',
'frequency': 'mon',
'project': 'obs4MIPs',
'short_name': 'rsutcs',
'version': 'v20160610',
}]


Expand All @@ -84,7 +85,6 @@ def search(self, **kwargs):

class MockConnection:
def new_context(self, *args, **kwargs):
assert kwargs.pop('latest')
assert kwargs == facets
return MockFileSearchContext()

Expand Down Expand Up @@ -209,7 +209,7 @@ def test_real_search_many():
],
[
'cmip5.output1.ICHEC.EC-EARTH.historical.mon.atmos.Amon.r1i1p1'
'.v20121115',
'.v20131231',
],
[
'CMIP6.CMIP.AWI.AWI-ESM-1-1-LR.historical.r1i1p1f1.Amon.tas.gn'
Expand Down Expand Up @@ -238,6 +238,15 @@ def test_real_search_many():
print(found_datasets)
print(datasets)
assert found_datasets == datasets
print(result[0].facets)
for file in result:
for key, value in variable.items():
if key in ('start_year', 'end_year', 'timerange'):
continue
if isinstance(value, list):
assert file.facets.get(key) in value
else:
assert file.facets.get(key) == value


@pytest.mark.skip(reason="This will actually download the data")
Expand Down
3 changes: 2 additions & 1 deletion tests/integration/test_recipe_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ def test_data_availability_nonexistent(tmp_path):
}
result = pyesgf.search.results.FileResult(
json={
'dataset_id': 'ABC',
'dataset_id': 'CMIP6.ABC.v1|something.org',
'dataset_id_template_': ["%(mip_era)s.%(source_id)s"],
'project': ['CMIP6'],
'size': 10,
'title': 'tas_1990-1992.nc',
Expand Down
Loading