Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
c129966
Add recognised intake-esm datastores on NCI systems to config_develop…
charles-turner-1 Feb 4, 2025
b1b76fb
Skeleton
charles-turner-1 Feb 5, 2025
dd73d1d
Playing around
charles-turner-1 Feb 5, 2025
ed1676b
Almost at a working IntakeDataset.load()
charles-turner-1 Feb 12, 2025
fa1ea2e
Working intake-esm implementation - probably still some kinks to iron…
charles-turner-1 Feb 25, 2025
648f119
Working with multiple catalogues per project
charles-turner-1 Mar 12, 2025
2b91fec
Cleanup - mypy & ruff errors
charles-turner-1 Mar 13, 2025
c7b8ffb
Remove WIP
charles-turner-1 Mar 13, 2025
31b35cb
Update depenencies & dev environment
charles-turner-1 Mar 13, 2025
a8532a5
Pre-commit modifications
charles-turner-1 Mar 13, 2025
7e56959
Merge branch 'main' into intake-esm
charles-turner-1 Mar 13, 2025
568cb8d
Fixed most of codacy (mypy-strict?) gripes
charles-turner-1 Mar 13, 2025
91fee56
Fix typo
charles-turner-1 Mar 13, 2025
9d894b9
Beginning to work on Bouwe's comments (WIP)
charles-turner-1 Apr 2, 2025
59d0d02
Updates - restructured esmvalcore/data/intake following Bouwe's sugge…
charles-turner-1 Apr 3, 2025
2050081
Reorder imports (ruff maybe?)
charles-turner-1 May 6, 2025
59e4205
Add `_read_facets` to intake configuration: see https://github.com/in…
charles-turner-1 May 12, 2025
2527059
Add `merge_intake_seach_history` function (see https://github.com/int…
charles-turner-1 May 13, 2025
4641965
Merge branch 'main' into intake-esm
charles-turner-1 May 13, 2025
1b26148
Merge branch 'main' into intake-esm
valeriupredoi Dec 4, 2025
b77d194
readd intake
valeriupredoi Dec 4, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ dependencies:
- fire
- geopy
- humanfriendly
- intake >=2.0.0
- intake-esgf >=2025.10.22
- intake-esm
- intake-esm >=2025.2.3
- iris >=3.12.2 # https://github.com/SciTools/iris/issues/6417
- iris-esmf-regrid >=0.11.0
- iris-grib >=0.20.0 # github.com/ESMValGroup/ESMValCore/issues/2535
Expand Down
171 changes: 171 additions & 0 deletions esmvalcore/config/_intake.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
"""esgf-pyclient configuration.

The configuration is read from the file ~/.esmvaltool/esgf-pyclient.yml.
"""

import logging
import os
import stat
from functools import lru_cache
from pathlib import Path
from typing import Any

import yaml

logger = logging.getLogger(__name__)

CONFIG_FILE = Path.home() / ".esmvaltool" / "data-intake.yml"


def read_config_file() -> dict:
"""Read the configuration from file."""
if CONFIG_FILE.exists():
logger.info("Loading Intake-ESM configuration from %s", CONFIG_FILE)
mode = os.stat(CONFIG_FILE).st_mode
if mode & stat.S_IRWXG or mode & stat.S_IRWXO:
logger.warning("Correcting unsafe permissions on %s", CONFIG_FILE)
os.chmod(CONFIG_FILE, stat.S_IRUSR | stat.S_IWUSR)
with CONFIG_FILE.open(encoding="utf-8") as file:
cfg = yaml.safe_load(file)
else:
logger.info(
"Using default Intake-ESM configuration, configuration "
"file %s not present.",
CONFIG_FILE,
)
cfg = {}

return cfg


def load_intake_config():
"""Load the intake-esm configuration."""
cfg = {
"CMIP6": {
"catalogs": {
"NCI": [
{
"file": "/g/data/fs38/catalog/v2/esm/catalog.json",
"facets": {
"activity": "activity_id",
"dataset": "source_id",
"ensemble": "member_id",
"exp": "experiment_id",
"grid": "grid_label",
"institute": "institution_id",
"mip": "table_id",
"short_name": "variable_id",
"version": "version",
"frequency": "frequency",
},
},
{
"file": "/g/data/oi10/catalog/v2/esm/catalog.json",
"facets": {
"activity": "activity_id",
"dataset": "source_id",
"ensemble": "member_id",
"exp": "experiment_id",
"grid": "grid_label",
"institute": "institution_id",
"mip": "table_id",
"short_name": "variable_id",
"version": "version",
"frequency": "frequency",
},
},
]
}
},
"CMIP5": {
"catalogs": {
"NCI": [
{
"file": "/g/data/rr3/catalog/v2/esm/catalog.json",
"facets": {
"activity": "activity_id",
"dataset": "source_id",
"ensemble": "ensemble",
"exp": "experiment",
"grid": "grid_label",
"institute": "institution_id",
"mip": "table_id",
"short_name": "variable",
"version": "version",
},
},
{
"file": "/g/data/al33/catalog/v2/esm/catalog.json",
"facets": {
"activity": "activity_id",
"dataset": "source_id",
"ensemble": "ensemble",
"exp": "experiment",
"institute": "institute",
"mip": "table",
"short_name": "variable",
"version": "version",
"timerange": "time_range",
},
},
]
}
},
}

file_cfg = read_config_file()
cfg.update(file_cfg)

return cfg


@lru_cache()
def get_intake_config():
"""Get the esgf-pyclient configuration."""
return load_intake_config()


def _read_facets(
cfg: dict,
fhandle: str | None,
project: str | None = None,
) -> tuple[dict[str, Any], str]:
"""
Extract facet mapping from ESMValCore configuration for a given catalog file handle.

Recursively traverses the ESMValCore configuration structure to find the
facet mapping that corresponds to the specified file handle.

Parameters
----------
cfg : dict
The ESMValCore intake configuration dictionary.
fhandle : str
The file handle/path of the intake-esm catalog to match.
project : str, optional
The current project name in the configuration hierarchy.

Returns
-------
tuple
A tuple containing:
- dict: Facet mapping between ESMValCore facets and catalog columns
- str: The project name associated with the catalog file
"""
if fhandle is None:
raise ValueError(
"Unable to ascertain facets without valid file handle."
)

for _project, val in cfg.items():
if not (isinstance(val, list)):
return _read_facets(val, fhandle, project or _project)
for facet_info in val:
file, facets = facet_info.get("file"), facet_info.get("facets")
if file == fhandle:
return facets, project # type: ignore[return-value]
else:
raise ValueError(
f"No facets found for {fhandle} in the config file. "
"Please check the config file and ensure it is valid."
)
10 changes: 10 additions & 0 deletions esmvalcore/data/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""Find files using an intake-esm catalog and load them."""

from .intake._intake_dataset import clear_catalog_cache, load_catalogs
from .intake._interface import merge_intake_search_history

__all__ = [
"load_catalogs",
"clear_catalog_cache",
"merge_intake_search_history",
]
132 changes: 132 additions & 0 deletions esmvalcore/data/intake/_intake_dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
"""Import datasets using Intake-ESM."""

import logging
from pathlib import Path
from typing import Any, Sequence

import intake
import intake_esm

from esmvalcore.config._intake import get_intake_config
from esmvalcore.local import LocalFile

__all__ = ["load_catalogs", "clear_catalog_cache"]

logger = logging.getLogger(__name__)

_CACHE: dict[Path, intake_esm.core.esm_datastore] = {}


def clear_catalog_cache() -> None:
"""Clear the catalog cache."""
_CACHE.clear()


def load_catalogs(
project: str, drs: dict[str, Any]
) -> tuple[list[intake_esm.core.esm_datastore], list[dict[str, str]]]:
"""Load all intake-esm catalogs for a project and their associated facet mappings.

Parameters
----------
project : str
The project name, eg. 'CMIP6'.
drs : dict
The DRS configuration. Can be obtained from the global configuration drs
field, eg. CFG['drs'].

Returns
-------
intake_esm.core.esm_datastore
The catalog.
dict
The facet mapping - a dictionary mapping ESMVlCore dataset facet names
to the fields in the intake-esm datastore.
"""
catalog_info: dict[str, Any] = (
get_intake_config().get(project, {}).get("catalogs", {})
)

site = drs.get(project, "default")
if site not in catalog_info:
return [None], [{}]

catalog_urls = [
Path(catalog.get("file")).expanduser()
for catalog in catalog_info[site]
]
facet_list = [catalog.get("facets") for catalog in catalog_info[site]]

for catalog_url in catalog_urls:
if catalog_url not in _CACHE:
logger.info(
"Loading intake-esm catalog (this may take some time): %s",
catalog_url,
)
_CACHE[catalog_url] = intake.open_esm_datastore(catalog_url)
logger.info("Successfully loaded catalog %s", catalog_url)

return ([_CACHE[cat_url] for cat_url in catalog_urls], facet_list)


def find_files(
*, project: str, drs: dict, facets: dict
) -> Sequence[LocalFile]:
"""Find files for variable in all intake-esm catalogs associated with a project.

Parameters
----------
facet_map : dict
A dict mapping the variable names used to initialise the IntakeDataset
object to their ESMValCore facet names. For example,
```
ACCESS_ESM1_5 = IntakeDataset(
short_name='tos',
project='CMIP6',
)
```
would result in a variable dict of {'short_name': 'tos', 'project': 'CMIP6'}.
drs : dict
The DRS configuration. Can be obtained from the global configuration drs
field, eg. CFG['drs'].
"""
catalogs, facet_maps = load_catalogs(project, drs)

if not catalogs:
return []

files = []

for catalog, facet_map in zip(catalogs, facet_maps, strict=False):
query = {facet_map.get(key): val for key, val in facets.items()}
query.pop(None, None)

_unused_facets = {
key: val for key, val in facets.items() if key not in facet_map
}

logger.info(
"Unable to refine datastore search on catalog %s with the following facets %s",
catalog.esmcat.catalog_file,
_unused_facets,
)

selection = catalog.search(**query)

if not selection:
continue

# Select latest version
if "version" in facet_map and "version" not in facets:
latest_version = max(
selection.unique().version
) # These are strings - need to double check the sorting here.
query = {
**query,
facet_map["version"]: latest_version,
}
selection = selection.search(**query)

files += [LocalFile(f) for f in selection.unique().path]

return files
54 changes: 54 additions & 0 deletions esmvalcore/data/intake/_interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import typing


def merge_intake_search_history(
search_history: list[dict[str, list[typing.Any]]],
) -> dict[str, typing.Any]:
"""Create a facet mapping from an intake-esm search history.

This takes an intake-esm search history, which typically looks something like
```python
[
{'variable_id': ['tos']},
{'table_id': ['Omon']},
{'experiment_id': ['historical']},
{'member_id': ['r1i1p1f1']},
{'source_id': ['ACCESS-ESM1-5']},
{'grid_label': ['gn']},
{'version': ['v.*']},
]
```
and turns it into something like
```python
{
'variable_id': 'tos',
'table_id': 'Omon',
'experiment_id': 'historical',
'member_id': 'r1i1p1f1',
'source_id': 'ACCESS-ESM1-5',
'grid_label': 'gn',
'version': 'v.*',
}
```

Notes
-----
This function is really quite ugly & could probably be improved.
"""
merged: dict[str, typing.Any] = {}

for entry in search_history:
for key, value in entry.items():
if key in merged:
if isinstance(merged[key], list):
merged[key].extend(value)
else:
merged[key] = [merged[key]] + value
else:
merged[key] = value

for key, val in merged.items():
if isinstance(val, list) and len(val) == 1:
merged[key] = val[0]

return merged
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ dependencies = [
"fire",
"geopy",
"humanfriendly",
"intake>=2.0.0",
"intake-esgf>=2025.10.22",
"intake-esm",
"intake-esm>=2025.2.3",
"iris-grib>=0.20.0", # github.com/ESMValGroup/ESMValCore/issues/2535
"isodate>=0.7.0",
"jinja2",
Expand Down