Skip to content
69 changes: 65 additions & 4 deletions esmvalcore/esgf/_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import logging
import os
import random
import re
import shutil
from pathlib import Path
from statistics import median
Expand Down Expand Up @@ -178,21 +179,24 @@ class ESGFFile:

Attributes
----------
urls : :class:`list` of :class:`str`
The URLs where the file can be downloaded.
dataset : str
The name of the dataset that the file is part of.
facets : dict[str,str]
Facets describing the file.
name : str
The name of the file.
size : int
The size of the file in bytes.
urls : list[str]
The URLs where the file can be downloaded.
"""

def __init__(self, results):
results = list(results)
self.name = str(Path(results[0].filename).with_suffix('.nc'))
self.size = results[0].size
self.dataset = self._get_dataset_id(results)
self.facets = self._get_facets(results)
self.urls = []
self._checksums = []
for result in results:
Expand Down Expand Up @@ -225,10 +229,64 @@ def same_file(result):
logger.debug(
"Ignoring file(s) %s containing wrong variable '%s' in"
" found in search for variable '%s'", file.urls, variable,
facets['variable'])
facets.get('variable', facets.get('variable_id', '?')))

return files

@staticmethod
def _get_facets(results):
"""Read the facets from the `dataset_id`."""
# This reads the facets from the dataset_id because the facets
# provided by ESGF are unreliable.
#
# Example dataset_id_template_ values:
# CMIP3: '%(project)s.%(institute)s.%(model)s.%(experiment)s.
# %(time_frequency)s.%(realm)s.%(ensemble)s.%(variable)s'
# CMIP5: 'cmip5.%(product)s.%(valid_institute)s.%(model)s.
# %(experiment)s.%(time_frequency)s.%(realm)s.%(cmor_table)s.
# %(ensemble)s'
# CMIP6: '%(mip_era)s.%(activity_drs)s.%(institution_id)s.
# %(source_id)s.%(experiment_id)s.%(member_id)s.%(table_id)s.
# %(variable_id)s.%(grid_label)s'
# CORDEX: 'cordex.%(product)s.%(domain)s.%(institute)s.
# %(driving_model)s.%(experiment)s.%(ensemble)s.%(rcm_name)s.
# %(rcm_version)s.%(time_frequency)s.%(variable)s'
# obs4MIPs: '%(project)s.%(institute)s.%(source_id)s.%(realm)s.
# %(time_frequency)s'
project = results[0].json['project'][0]

# Read the keys from `dataset_id_template_` and translate to our keys
template = results[0].json['dataset_id_template_'][0]
keys = re.findall(r"%\((.*?)\)s", template)
reverse_facet_map = {v: k for k, v in FACETS[project].items()}
reverse_facet_map['mip_era'] = 'project' # CMIP6 oddity
reverse_facet_map['variable_id'] = 'short_name' # CMIP6 oddity
reverse_facet_map['valid_institute'] = 'institute' # CMIP5 oddity
keys = [reverse_facet_map.get(k, k) for k in keys]
keys.append('version')
if keys[0] == 'project':
# The project is sometimes hardcoded all lowercase in the template
keys = keys[1:]

# Read values from dataset_id
# Pick the first dataset_id if there are differences in case
dataset_id = sorted(r.json['dataset_id'].split('|')[0]
for r in results)[0]
values = dataset_id.split('.')[1:]

# Compose facets
facets = {
'project': project,
}
for idx, key in enumerate(keys):
facets[key] = values[idx]
# The dataset_id does not contain the short_name for all projects,
# so get it from the filename if needed:
if 'short_name' not in facets:
facets['short_name'] = results[0].json['title'].split('_')[0]

return facets

@staticmethod
def _get_dataset_id(results):
"""Simplify dataset_id so it is always composed of the same facets."""
Expand Down Expand Up @@ -256,7 +314,10 @@ def __repr__(self):

def __eq__(self, other):
"""Compare `self` to `other`."""
return (self.dataset, self.name) == (other.dataset, other.name)
return (
isinstance(other, self.__class__)
and (self.dataset, self.name) == (other.dataset, other.name)
)

def __lt__(self, other):
"""Compare `self` to `other`."""
Expand Down
97 changes: 78 additions & 19 deletions esmvalcore/esgf/_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ def get_esgf_facets(variable):
for our_name, esgf_name in FACETS[project].items():
if our_name in variable:
values = variable[our_name]
if values == '*':
# Wildcards can be specified on ESGF by omitting the facet
continue

if isinstance(values, (tuple, list)):
values = list(values)
Expand All @@ -42,7 +45,7 @@ def get_esgf_facets(variable):
return facets


def select_latest_versions(files):
def select_latest_versions(files, versions):
"""Select only the latest version of files."""
result = []

Expand All @@ -52,14 +55,24 @@ def same_file(file):
dataset = file.dataset.rsplit('.', 1)[0]
return (dataset, file.name)

if isinstance(versions, str):
versions = (versions, )

files = sorted(files, key=same_file)
for _, versions in itertools.groupby(files, key=same_file):
versions = sorted(versions, reverse=True)
latest_version = versions[0]
for _, group in itertools.groupby(files, key=same_file):
group = sorted(group, reverse=True)
if versions:
selection = [f for f in group if f.facets['version'] in versions]
if not selection:
raise FileNotFoundError(
f"Requested versions {', '.join(versions)} of file not "
f"found. Available files: {group}")
group = selection
latest_version = group[0]
result.append(latest_version)
if len(versions) > 1:
if len(group) > 1:
logger.debug("Only using the latest version %s, not %s",
latest_version, versions[1:])
latest_version, group[1:])

return result

Expand Down Expand Up @@ -100,7 +113,6 @@ def _search_index_nodes(facets):
context = connection.new_context(
pyesgf.search.context.FileSearchContext,
**facets,
latest=True,
)
logger.debug("Searching %s for datasets using facets=%s", url, facets)
try:
Expand Down Expand Up @@ -139,8 +151,6 @@ def esgf_search_files(facets):

files = ESGFFile._from_results(results, facets)

files = select_latest_versions(files)

msg = 'none' if not files else '\n' + '\n'.join(str(f) for f in files)
logger.debug("Found the following files matching facets %s: %s", facets,
msg)
Expand All @@ -150,6 +160,10 @@ def esgf_search_files(facets):

def select_by_time(files, timerange):
"""Select files containing data between a timerange."""
if '*' in timerange:
# TODO: support * combined with a period
return files

selection = []

for file in files:
Expand Down Expand Up @@ -180,9 +194,11 @@ def find_files(*, project, short_name, dataset, **facets):
The name of the variable.
dataset : str
The name of the dataset.
**facets:
Any other search facets. Values can be strings, list of strings, or
'start_year' and 'end_year' with values of type :obj:`int`.
**facets : typing.Union[str, list[str]]
Any other search facets. The special value ``'*'`` will match anything.
If no ``version`` facet is specified, the function returns only the
latest version of each file, while other omitted facets will default
to ``'*'``.

Examples
--------
Expand Down Expand Up @@ -236,13 +252,12 @@ def find_files(*, project, short_name, dataset, **facets):
... ensemble='r1i1p1',
... domain='EUR-11',
... driver='MPI-M-MPI-ESM-LR',
... start_year=1990,
... end_year=2000,
... timerange='1990/2000',
... ) # doctest: +SKIP
[ESGFFile:cordex/output/EUR-11/CLMcom-ETH/MPI-M-MPI-ESM-LR/historical/r1i1p1/COSMO-crCLIM-v1-1/v1/mon/tas/v20191219/tas_EUR-11_MPI-M-MPI-ESM-LR_historical_r1i1p1_CLMcom-ETH-COSMO-crCLIM-v1-1_v1_mon_198101-199012.nc,
ESGFFile:cordex/output/EUR-11/CLMcom-ETH/MPI-M-MPI-ESM-LR/historical/r1i1p1/COSMO-crCLIM-v1-1/v1/mon/tas/v20191219/tas_EUR-11_MPI-M-MPI-ESM-LR_historical_r1i1p1_CLMcom-ETH-COSMO-crCLIM-v1-1_v1_mon_199101-200012.nc]
ESGFFile:cordex/output/EUR-11/CLMcom-ETH/MPI-M-MPI-ESM-LR/historical/r1i1p1/COSMO-crCLIM-v1-1/v1/mon/tas/v20191219/tas_EUR-11_MPI-M-MPI-ESM-LR_historical_r1i1p1_CLMcom-ETH-COSMO-crCLIM-v1-1_v1_mon_199101-200012.nc]

Search for a obs4MIPs dataset:
Search for an obs4MIPs dataset:

>>> find_files(
... project='obs4MIPs',
Expand All @@ -252,6 +267,48 @@ def find_files(*, project, short_name, dataset, **facets):
... ) # doctest: +SKIP
[ESGFFile:obs4MIPs/NASA-LaRC/CERES-EBAF/atmos/mon/v20160610/rsutcs_CERES-EBAF_L3B_Ed2-8_200003-201404.nc]

Search for any ensemble member:

>>> find_files(
... project='CMIP6',
... mip='Amon',
... short_name='tas',
... dataset='BCC-CSM2-MR',
... exp='historical',
... ensemble='*',
... ) # doctest: +SKIP
[ESGFFile:CMIP6/CMIP/BCC/BCC-CSM2-MR/historical/r1i1p1f1/Amon/tas/gn/v20181126/tas_Amon_BCC-CSM2-MR_historical_r1i1p1f1_gn_185001-201412.nc,
ESGFFile:CMIP6/CMIP/BCC/BCC-CSM2-MR/historical/r2i1p1f1/Amon/tas/gn/v20181115/tas_Amon_BCC-CSM2-MR_historical_r2i1p1f1_gn_185001-201412.nc,
ESGFFile:CMIP6/CMIP/BCC/BCC-CSM2-MR/historical/r3i1p1f1/Amon/tas/gn/v20181119/tas_Amon_BCC-CSM2-MR_historical_r3i1p1f1_gn_185001-201412.nc]

Search for all available versions of a file:

>>> find_files(
... project='CMIP5',
... mip='Amon',
... short_name='tas',
... dataset='CCSM4',
... exp='historical',
... ensemble='r1i1p1',
... version='*',
... ) # doctest: +SKIP
[ESGFFile:cmip5/output1/NCAR/CCSM4/historical/mon/atmos/Amon/r1i1p1/v20121031/tas_Amon_CCSM4_historical_r1i1p1_185001-200512.nc,
ESGFFile:cmip5/output1/NCAR/CCSM4/historical/mon/atmos/Amon/r1i1p1/v20130425/tas_Amon_CCSM4_historical_r1i1p1_185001-200512.nc,
ESGFFile:cmip5/output1/NCAR/CCSM4/historical/mon/atmos/Amon/r1i1p1/v20160829/tas_Amon_CCSM4_historical_r1i1p1_185001-200512.nc]

Search for a specific version of a file:

>>> find_files(
... project='CMIP5',
... mip='Amon',
... short_name='tas',
... dataset='CCSM4',
... exp='historical',
... ensemble='r1i1p1',
... version='v20130425',
... ) # doctest: +SKIP
[ESGFFile:cmip5/output1/NCAR/CCSM4/historical/mon/atmos/Amon/r1i1p1/v20130425/tas_Amon_CCSM4_historical_r1i1p1_185001-200512.nc]

Returns
-------
:obj:`list` of :obj:`ESGFFile`
Expand Down Expand Up @@ -287,10 +344,12 @@ def cached_search(**facets):
"""
esgf_facets = get_esgf_facets(facets)
files = esgf_search_files(esgf_facets)

if 'version' not in facets or facets['version'] != '*':
files = select_latest_versions(files, facets.get('version'))

_get_timerange_from_years(facets)
filter_timerange = (facets.get('frequency', '') != 'fx'
and 'timerange' in facets)
if filter_timerange:
if 'timerange' in facets:
files = select_by_time(files, facets['timerange'])
logger.debug("Selected files:\n%s", '\n'.join(str(f) for f in files))

Expand Down
5 changes: 4 additions & 1 deletion esmvalcore/esgf/facets.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@
'dataset': 'model',
'ensemble': 'ensemble',
'exp': 'experiment',
'frequency': 'time_frequency',
'mip': 'cmor_table',
'product': 'product',
'short_name': 'variable',
},
'CMIP6': {
'activity': 'activity_drs',
'dataset': 'source_id',
'ensemble': 'variant_label',
'ensemble': 'member_id',
'exp': 'experiment_id',
'institute': 'institution_id',
'grid': 'grid_label',
'mip': 'table_id',
'short_name': 'variable',
Expand Down
Loading