Commit 5d77d49

Merge pull request #595 from pangeo-forge/dynamic-chunks-interface

Dynamic chunking interface for StoreToZarr

2 parents: dc125ab + 2afd727

File tree: 7 files changed, +172 −13 lines

.github/workflows/test-integration.yaml

+4 −1

@@ -57,8 +57,11 @@ jobs:
       - name: 🌈 Install pangeo-forge-recipes & pangeo-forge-runner
         shell: bash -l {0}
         run: |
-          python -m pip install -e ".[test,minio]"
           python -m pip install ${{ matrix.runner-version }}
+          python -m pip install -e ".[test,minio]"
+
+          # order reversed to fix https://github.com/pangeo-forge/pangeo-forge-recipes/pull/595#issuecomment-1811630921
+          # this should however be fixed in the runner itself
       - name: 🏄‍♂️ Run Tests
         shell: bash -l {0}
         run: |

docs/release_notes.md

+4

@@ -1,5 +1,9 @@
 # Release Notes
 
+## v0.10.4 - 2023-11-15
+
+- Add `dynamic_chunking_fn`/`dynamic_chunking_fn_kwargs` keywords to `StoreToZarr`. This allows users to pass a function that will be called at runtime to determine the target chunks for the resulting dataset, based on the in-memory representation/size of the recipe dataset. {pull}`595`
+
 ## v0.10.3 - 2023-10-03
 
 - Assign injection spec values for command line interface {pull}`566`
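
A minimal sketch of the new interface (`set_time_chunks` and its return value are illustrative, and `pattern` is assumed to be a `FilePattern` like the ones in the examples below; the full runnable recipe added by this PR is examples/feedstock/gpcp_from_gcs_dynamic_chunks.py):

import xarray as xr

from pangeo_forge_recipes.transforms import StoreToZarr


def set_time_chunks(template_ds: xr.Dataset) -> dict:
    # Called once at runtime with a lazy "template" of the full aggregate dataset;
    # returns a dict mapping dimension names to chunk sizes.
    return {"time": 2}


store_to_zarr = StoreToZarr(
    store_name="output.zarr",
    combine_dims=pattern.combine_dim_keys,  # assumes `pattern` is a FilePattern, as in the examples below
    dynamic_chunking_fn=set_time_chunks,
)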

examples/feedstock/gpcp_from_gcs.py

+3

@@ -29,6 +29,9 @@ def test_ds(store: zarr.storage.FSStore) -> zarr.storage.FSStore:
     assert ds.title == (
         "Global Precipitation Climatatology Project (GPCP) " "Climate Data Record (CDR), Daily V1.3"
     )
+    # Making sure that the native chunking is different from the dynamic chunking
+    assert ds.chunks["time"][0] == 1
+
     return store
 
 

examples/feedstock/gpcp_from_gcs_dynamic_chunks.py (new file)

+54

@@ -0,0 +1,54 @@
+from typing import Dict
+
+import apache_beam as beam
+import pandas as pd
+import xarray as xr
+import zarr
+
+from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
+from pangeo_forge_recipes.transforms import OpenURLWithFSSpec, OpenWithXarray, StoreToZarr
+
+dates = [
+    d.to_pydatetime().strftime("%Y%m%d")
+    for d in pd.date_range("1996-10-01", "1999-02-01", freq="D")
+]
+
+
+def make_url(time):
+    url_base = "https://storage.googleapis.com/pforge-test-data"
+    return f"{url_base}/gpcp/v01r03_daily_d{time}.nc"
+
+
+concat_dim = ConcatDim("time", dates, nitems_per_file=1)
+pattern = FilePattern(make_url, concat_dim)
+
+
+def test_ds(store: zarr.storage.FSStore) -> zarr.storage.FSStore:
+    # This fails integration test if not imported here
+    # TODO: see if --setup-file option for runner fixes this
+    import xarray as xr
+
+    ds = xr.open_dataset(store, engine="zarr", chunks={})
+    assert ds.title == (
+        "Global Precipitation Climatatology Project (GPCP) " "Climate Data Record (CDR), Daily V1.3"
+    )
+
+    assert ds.chunks["time"][0] == 2
+    return store
+
+
+def chunk_func(ds: xr.Dataset) -> Dict[str, int]:
+    return {"time": 2}
+
+
+recipe = (
+    beam.Create(pattern.items())
+    | OpenURLWithFSSpec()
+    | OpenWithXarray(file_type=pattern.file_type, xarray_open_kwargs={"decode_coords": "all"})
+    | StoreToZarr(
+        dynamic_chunking_fn=chunk_func,
+        store_name="gpcp.zarr",
+        combine_dims=pattern.combine_dim_keys,
+    )
+    | "Test dataset" >> beam.Map(test_ds)
+)
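
A note on the design of this example: `chunk_func` deliberately ignores the template dataset it receives and always returns `{"time": 2}`, so that `test_ds` can distinguish the dynamically set chunking (two timesteps per chunk) from the native chunking asserted in gpcp_from_gcs.py above (one timestep per chunk, i.e. one per source file).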

examples/feedstock/meta.yaml

+2

@@ -1,6 +1,8 @@
 recipes:
   - id: "gpcp-from-gcs"
     object: "gpcp_from_gcs:recipe"
+  - id: "gpcp-from-gcs-dynamic-chunks"
+    object: "gpcp_from_gcs_dynamic_chunks:recipe"
   - id: "noaa-oisst"
     object: "noaa_oisst:recipe"
   - id: "terraclimate"

pangeo_forge_recipes/transforms.py

+27 −5

@@ -17,7 +17,7 @@
 import xarray as xr
 import zarr
 
-from .aggregation import XarraySchema, dataset_to_schema, schema_to_zarr
+from .aggregation import XarraySchema, dataset_to_schema, schema_to_template_ds, schema_to_zarr
 from .combiners import CombineMultiZarrToZarr, CombineXarraySchemas
 from .openers import open_url, open_with_kerchunk, open_with_xarray
 from .patterns import CombineOp, Dimension, FileType, Index, augment_index_with_start_stop

@@ -505,7 +505,14 @@ class StoreToZarr(beam.PTransform, ZarrWriterMixin):
     :param target_root: Root path the Zarr store will be created inside;
         `store_name` will be appended to this prefix to create a full path.
     :param target_chunks: Dictionary mapping dimension names to chunks sizes.
-        If a dimension is a not named, the chunks will be inferred from the data.
+        If a dimension is not named, the chunks will be inferred from the data.
+    :param dynamic_chunking_fn: Optionally provide a function that takes an ``xarray.Dataset``
+        template dataset as its first argument and returns a dynamically generated chunking dict.
+        If provided, ``target_chunks`` cannot also be passed. You can use this to determine chunking
+        based on the full dataset (e.g. divide along a certain dimension based on a desired chunk
+        size in memory). For more advanced chunking strategies, check
+        out https://github.com/jbusecke/dynamic_chunks
+    :param dynamic_chunking_fn_kwargs: Optional keyword arguments for ``dynamic_chunking_fn``.
     :param attrs: Extra group-level attributes to inject into the dataset.
     """

@@ -517,19 +524,34 @@ class StoreToZarr(beam.PTransform, ZarrWriterMixin):
         default_factory=RequiredAtRuntimeDefault
     )
     target_chunks: Dict[str, int] = field(default_factory=dict)
+    dynamic_chunking_fn: Optional[Callable[[xr.Dataset], dict]] = None
+    dynamic_chunking_fn_kwargs: Optional[dict] = field(default_factory=dict)
     attrs: Dict[str, str] = field(default_factory=dict)
 
+    def __post_init__(self):
+        if self.target_chunks and self.dynamic_chunking_fn:
+            raise ValueError("Passing both `target_chunks` and `dynamic_chunking_fn` not allowed.")
+
     def expand(
         self,
         datasets: beam.PCollection[Tuple[Index, xr.Dataset]],
     ) -> beam.PCollection[zarr.storage.FSStore]:
         schema = datasets | DetermineSchema(combine_dims=self.combine_dims)
         indexed_datasets = datasets | IndexItems(schema=schema)
-        rechunked_datasets = indexed_datasets | Rechunk(
-            target_chunks=self.target_chunks, schema=schema
+        target_chunks = (
+            self.target_chunks
+            if not self.dynamic_chunking_fn
+            else beam.pvalue.AsSingleton(
+                schema
+                | beam.Map(schema_to_template_ds)
+                | beam.Map(self.dynamic_chunking_fn, **self.dynamic_chunking_fn_kwargs)
+            )
         )
+        rechunked_datasets = indexed_datasets | Rechunk(target_chunks=target_chunks, schema=schema)
         target_store = schema | PrepareZarrTarget(
-            target=self.get_full_target(), target_chunks=self.target_chunks, attrs=self.attrs
+            target=self.get_full_target(),
+            target_chunks=target_chunks,
+            attrs=self.attrs,
         )
         n_target_stores = rechunked_datasets | StoreDatasetFragments(target_store=target_store)
         singleton_target_store = (
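
The use case suggested in the docstring above, picking chunks from a desired in-memory size, might look like the following sketch. This is a hypothetical helper, not part of this commit, and it assumes the dataset has a "time" dimension; see the linked dynamic_chunks repository for maintained strategies:

import xarray as xr


def chunks_for_target_size(template_ds: xr.Dataset, target_bytes: int = 128 * 2**20) -> dict:
    # Estimate the in-memory footprint of one timestep from the lazy template dataset.
    bytes_per_timestep = template_ds.nbytes / template_ds.sizes["time"]
    # Fit as many timesteps as possible into ~target_bytes, clamped to the dimension length.
    n_steps = max(1, int(target_bytes // bytes_per_timestep))
    return {"time": min(n_steps, template_ds.sizes["time"])}

It would then be passed as `dynamic_chunking_fn=chunks_for_target_size`, optionally with `dynamic_chunking_fn_kwargs={"target_bytes": ...}` to override the default.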
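
For context on the `beam.pvalue.AsSingleton` call in the new `expand` body: it wraps a one-element PCollection (here, the chunking dict computed from the template dataset) as a side input that is computed once and broadcast to the downstream `Rechunk` and `PrepareZarrTarget` transforms. A standalone sketch with toy data in place of the schema pipeline:

import apache_beam as beam

with beam.Pipeline() as p:
    items = p | "Items" >> beam.Create([1, 2, 3])
    # One-element PCollection standing in for the dynamically computed chunks.
    factor = p | "Factor" >> beam.Create([10])
    scaled = items | "Scale" >> beam.Map(
        lambda x, f: x * f,
        f=beam.pvalue.AsSingleton(factor),
    )
    scaled | "Print" >> beam.Map(print)  # prints 10, 20, 30 (in some order)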

tests/test_transforms.py

+78 −7

@@ -234,18 +234,20 @@ def _check_chunks(actual):
     assert_that(rechunked, correct_chunks())
 
 
+class OpenZarrStore(beam.PTransform):
+    @staticmethod
+    def _open_zarr(store):
+        return xr.open_dataset(store, engine="zarr", chunks={})
+
+    def expand(self, pcoll):
+        return pcoll | beam.Map(self._open_zarr)
+
+
 def test_StoreToZarr_emits_openable_fsstore(
     pipeline,
     netcdf_local_file_pattern_sequential,
     tmp_target_url,
 ):
-    def _open_zarr(store):
-        return xr.open_dataset(store, engine="zarr")
-
-    class OpenZarrStore(beam.PTransform):
-        def expand(self, pcoll):
-            return pcoll | beam.Map(_open_zarr)
-
     def is_xrdataset():
         def _is_xr_dataset(actual):
             assert len(actual) == 1

@@ -266,6 +268,75 @@ def _is_xr_dataset(actual):
     assert_that(open_store, is_xrdataset())
 
 
+@pytest.mark.parametrize("with_kws", [True, False])
+def test_StoreToZarr_dynamic_chunking_interface(
+    pipeline: beam.Pipeline,
+    netcdf_local_file_pattern_sequential: FilePattern,
+    tmp_target_url: str,
+    daily_xarray_dataset: xr.Dataset,
+    with_kws: bool,
+):
+    def has_dynamically_set_chunks():
+        def _has_dynamically_set_chunks(actual):
+            assert len(actual) == 1
+            item = actual[0]
+            assert isinstance(item, xr.Dataset)
+            if not with_kws:
+                # we've dynamically set the number of timesteps per chunk to be equal to
+                # the length of the full time dimension of the aggregate dataset, therefore
+                # if this worked, there should only be one chunk
+                assert len(item.chunks["time"]) == 1
+            else:
+                # in this case, we've passed the kws {"divisor": 2}, so we expect two time chunks
+                assert len(item.chunks["time"]) == 2
+
+        return _has_dynamically_set_chunks
+
+    pattern: FilePattern = netcdf_local_file_pattern_sequential
+
+    time_len = len(daily_xarray_dataset.time)
+
+    def dynamic_chunking_fn(template_ds: xr.Dataset, divisor: int = 1):
+        assert isinstance(template_ds, xr.Dataset)
+        return {"time": int(time_len / divisor)}
+
+    kws = {} if not with_kws else {"dynamic_chunking_fn_kwargs": {"divisor": 2}}
+
+    with pipeline as p:
+        datasets = p | beam.Create(pattern.items()) | OpenWithXarray()
+        target_store = datasets | StoreToZarr(
+            target_root=tmp_target_url,
+            store_name="test.zarr",
+            combine_dims=pattern.combine_dim_keys,
+            attrs={},
+            dynamic_chunking_fn=dynamic_chunking_fn,
+            **kws,
+        )
+        open_store = target_store | OpenZarrStore()
+        assert_that(open_store, has_dynamically_set_chunks())
+
+
+def test_StoreToZarr_dynamic_chunking_with_target_chunks_raises(
+    netcdf_local_file_pattern_sequential: FilePattern,
+):
+    def fn(template_ds):
+        pass
+
+    pattern: FilePattern = netcdf_local_file_pattern_sequential
+
+    with pytest.raises(
+        ValueError,
+        match="Passing both `target_chunks` and `dynamic_chunking_fn` not allowed",
+    ):
+        _ = StoreToZarr(
+            target_root="target_root",
+            store_name="test.zarr",
+            combine_dims=pattern.combine_dim_keys,
+            target_chunks={"time": 1},
+            dynamic_chunking_fn=fn,
+        )
+
+
 def test_StoreToZarr_target_root_default_unrunnable(
     pipeline,
     netcdf_local_file_pattern_sequential,
