Let XarrayZarrRecipe use fsspec references for opening netCDF inputs #218

Merged · 3 commits · Oct 15, 2021
2 changes: 1 addition & 1 deletion ci/py3.8.yml
@@ -40,4 +40,4 @@ dependencies:
- pip
- pip:
- pytest-timeout
- fsspec-reference-maker
- fsspec-reference-maker>=0.0.4
2 changes: 1 addition & 1 deletion ci/py3.9.yml
@@ -39,4 +39,4 @@ dependencies:
- zarr>=2.6.0
- pip:
- pytest-timeout
- fsspec-reference-maker
- fsspec-reference-maker>=0.0.4
148 changes: 122 additions & 26 deletions pangeo_forge_recipes/recipes/xarray_zarr.py
@@ -14,12 +14,14 @@
from typing import Callable, Dict, Hashable, Iterator, List, Optional, Sequence, Set, Tuple

import dask
import fsspec
import numpy as np
import xarray as xr
import zarr

from ..chunk_grid import ChunkGrid
from ..patterns import CombineOp, DimIndex, FilePattern, Index
from ..reference import create_hdf5_reference, unstrip_protocol
Contributor comment: Note that _unstrip_protocol is now available in fsspec.utils. A more thorough one for chained FSs would be good, but not necessary for the work here.
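
For readers unfamiliar with the helper, a minimal sketch of what unstrip_protocol does, assuming a plain (non-chained) filesystem with a single string protocol; the actual implementation lives in pangeo_forge_recipes.reference:

```python
def unstrip_protocol(name: str, protocol: str) -> str:
    # fsspec filesystems usually strip the protocol from paths; re-attach
    # it so the result is a fully qualified URL, e.g.
    # unstrip_protocol("bucket/data.nc", "s3") -> "s3://bucket/data.nc"
    if name.startswith(f"{protocol}://"):
        return name
    return f"{protocol}://{name}"
```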

from ..storage import AbstractTarget, CacheFSSpecTarget, MetadataTarget, file_opener
from ..utils import calc_subsets, fix_scalar_attr_encoding, lock_for_conflicts
from .base import BaseRecipe, FilePatternRecipeMixin
@@ -45,11 +47,16 @@
# (e.g. {'time': 5, 'depth': 2})


def _input_metadata_fname(input_key):
def _input_metadata_fname(input_key: InputKey) -> str:
key_str = "-".join([f"{k.name}_{k.index}" for k in input_key])
return "input-meta-" + key_str + ".json"


def _input_reference_fname(input_key: InputKey) -> str:
key_str = "-".join([f"{k.name}_{k.index}" for k in input_key])
return "input-reference-" + key_str + ".json"


def inputs_for_chunk(
chunk_key: ChunkKey, inputs_per_chunk: int, ninputs: int
) -> Sequence[InputKey]:
@@ -147,6 +154,7 @@ def cache_input_metadata(
xarray_open_kwargs: dict,
delete_input_encoding: bool,
process_input: Optional[Callable[[xr.Dataset, str], xr.Dataset]],
open_input_with_fsspec_reference: bool,
) -> None:
if metadata_cache is None:
raise ValueError("metadata_cache is not set.")
@@ -160,11 +168,41 @@
xarray_open_kwargs=xarray_open_kwargs,
delete_input_encoding=delete_input_encoding,
process_input=process_input,
open_input_with_fsspec_reference=open_input_with_fsspec_reference,
metadata_cache=metadata_cache,
) as ds:
input_metadata = ds.to_dict(data=False)
metadata_cache[_input_metadata_fname(input_key)] = input_metadata


def make_input_reference(
input_key: InputKey,
metadata_cache: MetadataTarget,
file_pattern: FilePattern,
input_cache: Optional[CacheFSSpecTarget],
copy_input_to_local_file: bool,
) -> None:

fname = file_pattern[input_key]
if input_cache is None:
protocol = fsspec.utils.get_protocol(fname)
url = unstrip_protocol(fname, protocol)
else:
url = unstrip_protocol(input_cache._full_path(fname), input_cache.fs.protocol)
with file_opener(
fname,
cache=input_cache,
copy_to_local=copy_input_to_local_file,
bypass_open=False,
secrets=file_pattern.query_string_secrets,
**file_pattern.fsspec_open_kwargs,
) as fp:
ref_data = create_hdf5_reference(fp, url, fname)

ref_fname = _input_reference_fname(input_key)
metadata_cache[ref_fname] = ref_data
Contributor comment: Maybe this is a recipe detail, but it would be good to figure out where we can store these reference files for the future, perhaps together with a hash/uid of the file they were created from.
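
A sketch of what that could look like — generating the reference dict and keying it by a digest of the source bytes. The URL and the hashing scheme are illustrative assumptions, not part of this PR; only SingleHdf5ToZarr comes from fsspec-reference-maker:

```python
import hashlib

import fsspec
from fsspec_reference_maker.hdf import SingleHdf5ToZarr

url = "s3://bucket/data_0.nc"  # hypothetical input URL
with fsspec.open(url, "rb") as f:
    source_bytes = f.read()  # read once to fingerprint the input
    f.seek(0)
    # translate() produces a JSON-serializable dict of byte-range references
    refs = SingleHdf5ToZarr(f, url).translate()

# Keying the stored reference file by a content hash would let later runs
# detect whether a cached reference still matches its source file.
key = hashlib.sha256(source_bytes).hexdigest()
store_name = f"input-reference-{key}.json"  # hypothetical naming scheme
```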



def cache_input(
input_key: InputKey,
cache_inputs: bool,
@@ -176,6 +214,7 @@ def cache_input(
delete_input_encoding: bool,
process_input: Optional[Callable[[xr.Dataset, str], xr.Dataset]],
metadata_cache: Optional[MetadataTarget],
open_input_with_fsspec_reference: bool,
) -> None:
if cache_inputs:
if file_pattern.is_opendap:
@@ -188,8 +227,18 @@
fname, file_pattern.query_string_secrets, **file_pattern.fsspec_open_kwargs
)

if open_input_with_fsspec_reference:
cache = input_cache if cache_inputs else None
if file_pattern.is_opendap:
raise ValueError("Can't make references for opendap inputs")
if metadata_cache is None:
raise ValueError("Can't make references; no metadata_cache assigned")
make_input_reference(
input_key, metadata_cache, file_pattern, cache, copy_input_to_local_file,
)

if cache_metadata:
return cache_input_metadata(
cache_input_metadata(
input_key,
file_pattern=file_pattern,
input_cache=input_cache,
Expand All @@ -199,6 +248,7 @@ def cache_input(
delete_input_encoding=delete_input_encoding,
process_input=process_input,
metadata_cache=metadata_cache,
open_input_with_fsspec_reference=open_input_with_fsspec_reference,
)


Expand Down Expand Up @@ -259,6 +309,8 @@ def open_input(
xarray_open_kwargs: dict,
delete_input_encoding: bool,
process_input: Optional[Callable[[xr.Dataset, str], xr.Dataset]],
open_input_with_fsspec_reference: bool,
metadata_cache: Optional[MetadataTarget],
) -> xr.Dataset:
fname = file_pattern[input_key]
logger.info(f"Opening input with Xarray {input_key!s}: '{fname}'")
@@ -268,35 +320,56 @@
raise ValueError("Can't cache opendap inputs")
if copy_input_to_local_file:
raise ValueError("Can't copy opendap inputs to local file")
if open_input_with_fsspec_reference:
raise ValueError("Can't open opendap inputs with fsspec-reference-maker")

if open_input_with_fsspec_reference:
if metadata_cache is None:
raise ValueError("metadata_cache is not set.")
from fsspec.implementations.reference import ReferenceFileSystem

reference_data = metadata_cache[_input_reference_fname(input_key)]
# TODO: figure out how to set this for the cache target
remote_protocol = fsspec.utils.get_protocol(next(file_pattern.items())[1])
ref_fs = ReferenceFileSystem(
reference_data, remote_protocol=remote_protocol, skip_instance_cache=True
)

cache = input_cache if cache_inputs else None
mapper = ref_fs.get_mapper("/")
# Doesn't really need to be a context manager, but that's how this function works
with xr.open_dataset(mapper, engine="zarr", chunks={}, consolidated=False) as ds:
yield ds

with file_opener(
fname,
cache=cache,
copy_to_local=copy_input_to_local_file,
bypass_open=file_pattern.is_opendap,
secrets=file_pattern.query_string_secrets,
**file_pattern.fsspec_open_kwargs,
) as f:
with dask.config.set(scheduler="single-threaded"): # make sure we don't use a scheduler
kw = xarray_open_kwargs.copy()
if "engine" not in kw:
kw["engine"] = "h5netcdf"
logger.debug(f"about to enter xr.open_dataset context on {f}")
with xr.open_dataset(f, **kw) as ds:
logger.debug("successfully opened dataset")
ds = fix_scalar_attr_encoding(ds)
else:

if delete_input_encoding:
for var in ds.variables:
ds[var].encoding = {}
cache = input_cache if cache_inputs else None

if process_input is not None:
ds = process_input(ds, str(fname))
with file_opener(
fname,
cache=cache,
copy_to_local=copy_input_to_local_file,
bypass_open=file_pattern.is_opendap,
secrets=file_pattern.query_string_secrets,
**file_pattern.fsspec_open_kwargs,
) as f:
with dask.config.set(scheduler="single-threaded"): # make sure we don't use a scheduler
Contributor comment: Why do we not want a scheduler? Because we are dealing with local files?

kw = xarray_open_kwargs.copy()
if "engine" not in kw:
kw["engine"] = "h5netcdf"
logger.debug(f"about to enter xr.open_dataset context on {f}")
with xr.open_dataset(f, **kw) as ds:
logger.debug("successfully opened dataset")
ds = fix_scalar_attr_encoding(ds)

logger.debug(f"{ds}")
yield ds
if delete_input_encoding:
for var in ds.variables:
ds[var].encoding = {}

if process_input is not None:
ds = process_input(ds, str(fname))

logger.debug(f"{ds}")
yield ds
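
Standalone, the reference branch above amounts to the following. The reference JSON filename is an assumption following _input_reference_fname for a single concat dim named "time", and "s3" is a placeholder protocol:

```python
import json

import xarray as xr
from fsspec.implementations.reference import ReferenceFileSystem

# Load a reference set previously written by make_input_reference.
with open("input-reference-time_0.json") as f:
    refs = json.load(f)

fs = ReferenceFileSystem(refs, remote_protocol="s3", skip_instance_cache=True)

# The references present the netCDF file as a virtual Zarr store, so
# Xarray opens it with the zarr engine and no consolidated metadata.
ds = xr.open_dataset(fs.get_mapper("/"), engine="zarr", chunks={}, consolidated=False)
```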


def subset_dataset(ds: xr.Dataset, subset_spec: DimIndex) -> xr.Dataset:
@@ -327,6 +400,8 @@ def open_chunk(
xarray_open_kwargs: dict,
delete_input_encoding: bool,
process_input: Optional[Callable[[xr.Dataset, str], xr.Dataset]],
open_input_with_fsspec_reference: bool,
metadata_cache: Optional[MetadataTarget],
) -> xr.Dataset:
logger.info(f"Opening inputs for chunk {chunk_key!s}")
ninputs = file_pattern.dims[file_pattern.concat_dims[0]]
@@ -345,6 +420,8 @@
xarray_open_kwargs=xarray_open_kwargs,
delete_input_encoding=delete_input_encoding,
process_input=process_input,
open_input_with_fsspec_reference=open_input_with_fsspec_reference,
metadata_cache=metadata_cache,
)
)
for i in inputs
@@ -440,6 +517,7 @@ def prepare_target(
delete_input_encoding: bool,
process_input: Optional[Callable[[xr.Dataset, str], xr.Dataset]],
metadata_cache: Optional[MetadataTarget],
open_input_with_fsspec_reference: bool,
) -> None:
try:
ds = open_target(target)
@@ -469,6 +547,8 @@
xarray_open_kwargs=xarray_open_kwargs,
delete_input_encoding=delete_input_encoding,
process_input=process_input,
open_input_with_fsspec_reference=open_input_with_fsspec_reference,
metadata_cache=metadata_cache,
) as ds:
# ds is already chunked

@@ -544,6 +624,7 @@ def store_chunk(
delete_input_encoding: bool,
process_input: Optional[Callable[[xr.Dataset, str], xr.Dataset]],
metadata_cache: Optional[MetadataTarget],
open_input_with_fsspec_reference: bool,
) -> None:
if target is None:
raise ValueError("target has not been set.")
@@ -562,6 +643,8 @@
xarray_open_kwargs=xarray_open_kwargs,
delete_input_encoding=delete_input_encoding,
process_input=process_input,
open_input_with_fsspec_reference=open_input_with_fsspec_reference,
metadata_cache=metadata_cache,
) as ds_chunk:
# writing a region means that all the variables MUST have concat_dim
to_drop = [v for v in ds_chunk.variables if concat_dim not in ds_chunk[v].dims]
@@ -715,6 +798,9 @@ class XarrayZarrRecipe(BaseRecipe, FilePatternRecipeMixin):
along dimension according to the specified mapping. For example,
``{'time': 5}`` would split each input file into 5 chunks along the
time dimension. Multiple dimensions are allowed.
:param open_input_with_fsspec_reference: If True, use fsspec-reference-maker
to generate a reference filesystem for each input, to be used when opening
the file with Xarray as a virtual Zarr dataset.
"""

inputs_per_chunk: int = 1
Expand All @@ -733,6 +819,7 @@ class XarrayZarrRecipe(BaseRecipe, FilePatternRecipeMixin):
process_chunk: Optional[Callable[[xr.Dataset], xr.Dataset]] = None
lock_timeout: Optional[int] = None
subset_inputs: SubsetSpec = field(default_factory=dict)
open_input_with_fsspec_reference: bool = False

# internal attributes not meant to be seen or accessed by user
_concat_dim: str = field(default_factory=str, repr=False, init=False)
@@ -758,6 +845,8 @@ def __post_init__(self):
raise ValueError("Can't cache opendap inputs.")
else:
self.cache_inputs = False
if self.open_input_with_fsspec_reference:
raise ValueError("Can't generate references on opendap inputs")
if "engine" in self.xarray_open_kwargs:
if self.xarray_open_kwargs["engine"] != "netcdf4":
raise ValueError(
@@ -848,6 +937,7 @@ def prepare_target(self) -> Callable[[], None]:
delete_input_encoding=self.delete_input_encoding,
process_input=self.process_input,
metadata_cache=self.metadata_cache,
open_input_with_fsspec_reference=self.open_input_with_fsspec_reference,
)

@property
@@ -863,6 +953,7 @@ def cache_input(self) -> Callable[[Hashable], None]:
delete_input_encoding=self.delete_input_encoding,
process_input=self.process_input,
metadata_cache=self.metadata_cache,
open_input_with_fsspec_reference=self.open_input_with_fsspec_reference,
)

@property
@@ -887,6 +978,7 @@ def store_chunk(self) -> Callable[[Hashable], None]:
delete_input_encoding=self.delete_input_encoding,
process_input=self.process_input,
metadata_cache=self.metadata_cache,
open_input_with_fsspec_reference=self.open_input_with_fsspec_reference,
)

@property
@@ -952,6 +1044,8 @@ def open_input(self, input_key):
xarray_open_kwargs=self.xarray_open_kwargs,
delete_input_encoding=self.delete_input_encoding,
process_input=self.process_input,
open_input_with_fsspec_reference=self.open_input_with_fsspec_reference,
metadata_cache=self.metadata_cache,
) as ds:
yield ds

@@ -971,5 +1065,7 @@ def open_chunk(self, chunk_key):
xarray_open_kwargs=self.xarray_open_kwargs,
delete_input_encoding=self.delete_input_encoding,
process_input=self.process_input,
open_input_with_fsspec_reference=self.open_input_with_fsspec_reference,
metadata_cache=self.metadata_cache,
) as ds:
yield ds
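
End to end, the new option is exercised like any other recipe flag. A minimal sketch, assuming two hypothetical netCDF files on S3 (pattern_from_file_sequence is the existing helper in pangeo_forge_recipes.patterns):

```python
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.recipes import XarrayZarrRecipe

pattern = pattern_from_file_sequence(
    ["s3://bucket/data_0.nc", "s3://bucket/data_1.nc"],  # hypothetical inputs
    concat_dim="time",
    nitems_per_file=10,
)

# With the flag set, cache_input also writes an fsspec reference file for
# each input, and open_input reads through it with the zarr engine.
recipe = XarrayZarrRecipe(pattern, open_input_with_fsspec_reference=True)
```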
2 changes: 1 addition & 1 deletion setup.cfg
@@ -38,7 +38,7 @@ install_requires =
zarr >= 2.6.0
numcodecs >= 0.9.0
fsspec[http] >= 2021.6.0
fsspec-reference-maker >= 0.0.2
fsspec-reference-maker >= 0.0.4

[options.extras_require]
dev =
11 changes: 11 additions & 0 deletions tests/recipe_tests/test_XarrayZarrRecipe.py
@@ -132,6 +132,17 @@ def test_recipe(recipe_fixture, execute_recipe):
pass


@pytest.mark.parametrize("recipe_fixture", all_recipes)
def test_recipe_with_references(recipe_fixture, execute_recipe):
"""Same as above, but use fsspec references for opening the files."""

RecipeClass, file_pattern, kwargs, ds_expected, target = recipe_fixture
rec = RecipeClass(file_pattern, open_input_with_fsspec_reference=True, **kwargs)
execute_recipe(rec)
ds_actual = xr.open_zarr(target.get_mapper()).load()
xr.testing.assert_identical(ds_actual, ds_expected)


@pytest.mark.parametrize("recipe_fixture", all_recipes)
@pytest.mark.parametrize("nkeep", [1, 2])
def test_prune_recipe(recipe_fixture, execute_recipe, nkeep):