diff --git a/design-docs/005-manifest-split.md b/design-docs/005-manifest-split.md
index 901593b77..cd7284180 100644
--- a/design-docs/005-manifest-split.md
+++ b/design-docs/005-manifest-split.md
@@ -152,7 +152,7 @@ chunk-manifests:
       # that will be useful for sparse arrays
       target: coord1  # arrays that match will go to this manifest set
-    
+
   - metadata-chunks: [0, 200]  # arrays < 200 chunks will go to coord2
       # but only if they didn't get assigned by the rule above
@@ -185,7 +185,7 @@ chunk-manifests:
   # of course, we'll have to tune all these numbers
   - coordinates:  # we intend coordinate arrays to be assigned here
       max-manifest-size: 50000
       cardinality: 1
-      max-arrays-per-manifest: null 
+      max-arrays-per-manifest: null
       overflow-to: default
   - default:
@@ -267,7 +267,7 @@ def closure({a}):
   found = {}
   to_process = {a}
   manifests_seen = {}
-  
+
   while x in to_process.peek():
     for manifest in manifests(x) if manifest not in manifests_seen:
       manifests_seen.add(manifest)
@@ -327,7 +327,7 @@ def manifest_assignments(modified_arrays: set[array]) -> list[[array]]:
         # this is the default (last) manifest set
         result.push([array])
       else:
-        # the array is too large for this manifest set, but it may fit 
+        # the array is too large for this manifest set, but it may fit
         # in the overflow set
         desired_assignments[manifest_set.overflow_to].push(array)
     else:
@@ -403,4 +403,3 @@ has no dependencies.
 We need a bin-pack: [this one](https://crates.io/crates/rpack/0.2.2/dependencies)
 is fast at our scale and has no dependencies.
-
diff --git a/docs/docs/icechunk-python/dask.md b/docs/docs/icechunk-python/dask.md
index 35273deb9..7c4dd67d0 100644
--- a/docs/docs/icechunk-python/dask.md
+++ b/docs/docs/icechunk-python/dask.md
@@ -68,14 +68,47 @@ Finally commit your changes!
 icechunk_session.commit("wrote a dask array!")
 ```
 
-## Icechunk + Dask + Xarray
-### Simple
+## Distributed
+
+In distributed contexts where the Session and Zarr Array objects are sent across the network,
+you must opt in to pickling of a writable store.
+
+[`icechunk.dask.store_dask`](./reference.md#icechunk.dask.store_dask) takes care of the hard part of
+merging Sessions, but you must opt in to pickling before creating the target Zarr array objects.
+
+Here is an example:
+```python
+import icechunk.dask
+
+zarr_chunks = (10, 10)
+with icechunk_session.allow_pickling():
+    group = zarr.group(store=icechunk_session.store, overwrite=True)
+
+    zarray = group.create_array(
+        "array",
+        shape=shape,
+        chunks=zarr_chunks,
+        dtype="f8",
+        fill_value=float("nan"),
+    )
+    icechunk.dask.store_dask(icechunk_session, sources=[dask_array], targets=[zarray])
+icechunk_session.commit("wrote a dask array!")
+```
+
+## Icechunk + Dask + Xarray
 
 The [`icechunk.xarray.to_icechunk`](./reference.md#icechunk.xarray.to_icechunk) is functionally identical to Xarray's
 [`Dataset.to_zarr`](https://docs.xarray.dev/en/stable/generated/xarray.Dataset.to_zarr.html), including many of the same keyword arguments.
 Notably the ``compute`` kwarg is not supported.
 
+!!! warning
+
+    When using Xarray and Icechunk in a Dask Distributed context, you *must* use `to_icechunk` so that the Session has a record
+    of the writes that are executed remotely. Using `to_zarr` in such cases will result in the local Session having no
+    record of remote writes, and a meaningless commit.
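+
+A minimal sketch of this pattern (assuming a dask-backed `xarray.Dataset` named `dataset`,
+a running `dask.distributed.Client`, and a writable Session named `icechunk_session`):
+
+```python
+import distributed
+
+import icechunk.xarray
+
+client = distributed.Client()  # start or connect to a cluster
+
+# `to_icechunk` merges the Sessions returned by the remote write tasks
+# back into `icechunk_session`, so the commit records every remote write
+icechunk.xarray.to_icechunk(dataset, icechunk_session)
+icechunk_session.commit("wrote a dask-backed dataset")
+```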
+
+Now roundtrip an Xarray dataset:
 ```python
 import icechunk.xarray
@@ -84,7 +117,7 @@ import xarray as xr
 
 # Assuming you have a valid writable Session named icechunk_session
 dataset = xr.tutorial.open_dataset("rasm", chunks={"time": 1}).isel(time=slice(24))
 
-icechunk.xarray.to_icechunk(dataset, store=icechunk_session.store))
+icechunk.xarray.to_icechunk(dataset, icechunk_session)
 
 roundtripped = xr.open_zarr(icechunk_session.store, consolidated=False)
 dataset.identical(roundtripped)
diff --git a/docs/docs/icechunk-python/faq.md b/docs/docs/icechunk-python/faq.md
new file mode 100644
index 000000000..41c0f56ec
--- /dev/null
+++ b/docs/docs/icechunk-python/faq.md
@@ -0,0 +1,5 @@
+# Frequently Asked Questions
+
+**Why do I have to opt in to pickling an IcechunkStore or a Session?**
+
+Icechunk is different from normal Zarr stores because it is stateful. In a distributed setting, you have to be careful to communicate the Session objects back from remote write tasks, merge them, and commit them. The opt-in to pickling is a way for us to hint to the user that they need to be sure about what they are doing. We use pickling as the hook because these operations only become tricky once you cross a process boundary. More pragmatically, `to_zarr(session.store)` fails spectacularly in distributed contexts (e.g. [this issue](https://github.com/earth-mover/icechunk/issues/383)), and we do not want the user to be surprised.
diff --git a/docs/docs/icechunk-python/parallel.md b/docs/docs/icechunk-python/parallel.md
new file mode 100644
index 000000000..26ae6370a
--- /dev/null
+++ b/docs/docs/icechunk-python/parallel.md
@@ -0,0 +1,129 @@
+# Parallel Writes
+
+A common pattern with large distributed write jobs is to first initialize the dataset on disk
+with all appropriate metadata and any coordinate variables. Following this, a large write job
+is kicked off in a distributed setting, where each worker is responsible for an independent
+"region" of the output.
+
+
+## Why is Icechunk different from any other Zarr store?
+
+The reason is that unlike Zarr, Icechunk is a "stateful" store. The Session object keeps a record of all writes, which are then
+bundled together in a commit. Thus `Session.commit` must be executed on a Session object that knows about all writes,
+including those executed remotely in a multi-processing or any other remote execution context.
+
+## Example
+
+Here is how you can execute such writes with Icechunk, illustrated with a `ThreadPoolExecutor`.
+First read some example data, and create an Icechunk Repository.
+```python
+import xarray as xr
+import tempfile
+from icechunk import Repository, local_filesystem_storage
+
+ds = xr.tutorial.open_dataset("rasm").isel(time=slice(24))
+repo = Repository.create(local_filesystem_storage(tempfile.mkdtemp()))
+session = repo.writable_session("main")
+```
+We will orchestrate so that each task writes one timestep.
+This is an arbitrary choice but determines what we set for the Zarr chunk size.
+```python
+chunks = tuple(1 if dim == "time" else ds.sizes[dim] for dim in ds.Tair.dims)
+```
+
+Initialize the dataset using [`Dataset.to_zarr`](https://docs.xarray.dev/en/stable/generated/xarray.Dataset.to_zarr.html)
+and `compute=False`. This will NOT write any chunked array data, but will write all array metadata, and any
+in-memory arrays (only `time` in this case).
+```python
+ds.to_zarr(session.store, compute=False, encoding={"Tair": {"chunks": chunks}}, mode="w")
+# this commit is optional, but may be useful in your workflow
+session.commit("initialize store")
+```
+
+## Multi-threading
+
+First define a function that constitutes one "write task".
+```python
+from icechunk import Session
+
+def write_timestamp(*, itime: int, session: Session) -> None:
+    # pass a list to isel to preserve the time dimension
+    ds = xr.tutorial.open_dataset("rasm").isel(time=[itime])
+    # region="auto" tells Xarray to infer which "region" of the output arrays to write to.
+    ds.to_zarr(session.store, region="auto", consolidated=False)
+```
+
+Now execute the writes.
+```python
+from concurrent.futures import ThreadPoolExecutor, wait
+
+session = repo.writable_session("main")
+with ThreadPoolExecutor() as executor:
+    # submit the writes
+    futures = [executor.submit(write_timestamp, itime=i, session=session) for i in range(ds.sizes["time"])]
+    wait(futures)
+
+session.commit("finished writes")
+```
+
+Verify that the writes worked as expected:
+```python
+ondisk = xr.open_zarr(repo.readonly_session(branch="main").store, consolidated=False)
+xr.testing.assert_identical(ds, ondisk)
+```
+
+## Distributed writes
+
+Any task execution framework (e.g. `ProcessPoolExecutor`, Joblib, Lithops, Dask Distributed, Ray, etc.)
+can be used instead of the `ThreadPoolExecutor`. However, such workloads should account for
+Icechunk being a "stateful" store that records changes executed in a write session.
+
+There are three key points to keep in mind:
+1. The `write_timestamp` function *must* return the `Session`. It contains a record of the changes executed by this task.
+   These changes *must* be manually communicated back to the coordinating process, since each of the distributed processes
+   is working with its own independent `Session` instance.
+2. Icechunk requires that users opt in to pickling a *writable* `Session` using the `Session.allow_pickling()` context manager,
+   to remind the user that distributed writes with Icechunk require care.
+3. The user *must* manually merge the Session objects to create a meaningful commit.
+
+First we modify `write_timestamp` to return the `Session`:
+```python
+from icechunk import Session
+
+def write_timestamp(*, itime: int, session: Session) -> Session:
+    # pass a list to isel to preserve the time dimension
+    ds = xr.tutorial.open_dataset("rasm").isel(time=[itime])
+    # region="auto" tells Xarray to infer which "region" of the output arrays to write to.
+    ds.to_zarr(session.store, region="auto", consolidated=False)
+    return session
+```
+
+Now we issue write tasks within the [`session.allow_pickling()`](./reference.md#icechunk.Session.allow_pickling) context, gather the Sessions from individual tasks,
+merge them, and make a successful commit.
+
+```python
+from concurrent.futures import ProcessPoolExecutor
+from icechunk.distributed import merge_sessions
+
+session = repo.writable_session("main")
+with ProcessPoolExecutor() as executor:
+    # opt in to pickling of a writable session
+    with session.allow_pickling():
+        # submit the writes
+        futures = [
+            executor.submit(write_timestamp, itime=i, session=session)
+            for i in range(ds.sizes["time"])
+        ]
+        # grab the Session objects from each individual write task
+        sessions = [f.result() for f in futures]
+
+# manually merge the remote sessions into the local session
+session = merge_sessions(session, *sessions)
+session.commit("finished writes")
+```
+
+Verify that the writes worked as expected:
+```python
+ondisk = xr.open_zarr(repo.readonly_session(branch="main").store, consolidated=False)
+xr.testing.assert_identical(ds, ondisk)
+```
diff --git a/docs/docs/icechunk-python/xarray.md b/docs/docs/icechunk-python/xarray.md
index 5e50f00be..5f6469150 100644
--- a/docs/docs/icechunk-python/xarray.md
+++ b/docs/docs/icechunk-python/xarray.md
@@ -2,7 +2,7 @@
 
 Icechunk was designed to work seamlessly with Xarray. Xarray users can read and
 write data to Icechunk using [`xarray.open_zarr`](https://docs.xarray.dev/en/latest/generated/xarray.open_zarr.html#xarray.open_zarr)
-and [`xarray.Dataset.to_zarr`](https://docs.xarray.dev/en/latest/generated/xarray.Dataset.to_zarr.html#xarray.Dataset.to_zarr).
+and [`icechunk.xarray.to_icechunk`](./reference.md#icechunk.xarray.to_icechunk).
 
 !!! warning
 
@@ -12,6 +12,22 @@ and [`xarray.Dataset.to_zarr`](https://docs.xarray.dev/en/latest/generated/xarra
     pip install "xarray>=2025.1.1"
     ```
 
+!!! note "`to_icechunk` vs `to_zarr`"
+
+    [`xarray.Dataset.to_zarr`](https://docs.xarray.dev/en/latest/generated/xarray.Dataset.to_zarr.html#xarray.Dataset.to_zarr)
+    and [`to_icechunk`](./reference.md#icechunk.xarray.to_icechunk) are nearly functionally identical.
+
+    In a distributed context, e.g.
+    writes orchestrated with `multiprocessing` or a `dask.distributed.Client` and `dask.array`, you *must* use `to_icechunk`.
+    This will ensure that you can execute a commit that successfully records all remote writes.
+    See [these docs on orchestrating parallel writes](./parallel.md) and [these docs on dask.array with distributed](./dask.md#icechunk-dask-xarray)
+    for more.
+
+    If using `to_zarr`, remember to set `zarr_format=3, consolidated=False`. Consolidated metadata
+    is unnecessary (and unsupported) in Icechunk. Icechunk already organizes the dataset metadata
+    in a way that makes it very fast to fetch from storage.
+
+
 In this example, we'll explain how to create a new Icechunk repo, write some
 sample data to it, and append data a second block of data using Icechunk's
 version control features.
@@ -72,19 +88,13 @@ Create a new writable session on the `main` branch to get the `IcechunkStore`:
 session = repo.writable_session("main")
 ```
 
-Writing Xarray data to Icechunk is as easy as calling `Dataset.to_zarr`:
+Writing Xarray data to Icechunk is as easy as calling `to_icechunk`:
 
 ```python
-ds1.to_zarr(session.store, zarr_format=3, consolidated=False)
-```
-
-!!! note
+from icechunk.xarray import to_icechunk
 
-    1. [Consolidated metadata](https://docs.xarray.dev/en/latest/user-guide/io.html#consolidated-metadata)
-       is unnecessary (and unsupported) in Icechunk.
-       Icechunk already organizes the dataset metadata in a way that makes it very
-       fast to fetch from storage.
-    2. `zarr_format=3` is required until the default Zarr format changes in Xarray.
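+# `to_icechunk` takes the Session itself (not `session.store`) so it can record the writes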
+to_icechunk(ds1, session)
+```
 
 After writing, we commit the changes using the session:
 
@@ -101,7 +111,7 @@ this reason. Again, we'll use `Dataset.to_zarr`, this time with `append_dim='tim
 ```python
 # we have to get a new session after committing
 session = repo.writable_session("main")
-ds2.to_zarr(session.store, append_dim='time')
+to_icechunk(ds2, session, append_dim='time')
 ```
 
 And then we'll commit the changes:
diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml
index 345f9853e..e911548ab 100644
--- a/docs/mkdocs.yml
+++ b/docs/mkdocs.yml
@@ -181,7 +181,9 @@ nav:
       - Icechunk Python:
           - Quickstart: icechunk-python/quickstart.md
           - Configuration: icechunk-python/configuration.md
+          - FAQ: icechunk-python/faq.md
           - Xarray: icechunk-python/xarray.md
+          - Parallel Writes: icechunk-python/parallel.md
           - Dask: icechunk-python/dask.md
           - Version Control: icechunk-python/version-control.md
           - Virtual Datasets: icechunk-python/virtual.md
diff --git a/icechunk-python/benchmarks/create_era5.py b/icechunk-python/benchmarks/create_era5.py
index 6bc192824..5f9d3d8b2 100644
--- a/icechunk-python/benchmarks/create_era5.py
+++ b/icechunk-python/benchmarks/create_era5.py
@@ -69,9 +69,8 @@ def write_era5(dataset: Dataset, *, ref, arrays_to_write):
         f"reports/era5-ingest-{ref}-{datetime.datetime.now()}.html"
     ):
         logger.info(f"Started writing {arrays_to_write=}.")
-        # FIXME: switch to region="auto" when GH606 is closed
         to_icechunk(
-            towrite, session.store, region=SELECTOR, **zarr_kwargs, split_every=32
+            towrite, session=session, region="auto", **zarr_kwargs, split_every=32
         )
         session.commit("ingest!")
         logger.info(f"Finished writing {arrays_to_write=}.")
diff --git a/icechunk-python/pyproject.toml b/icechunk-python/pyproject.toml
index f62963415..d1ff95be8 100644
--- a/icechunk-python/pyproject.toml
+++ b/icechunk-python/pyproject.toml
@@ -39,7 +39,7 @@ test = [
     "ruff",
     "dask>=2024.11.0",
     "distributed>=2024.11.0",
-    "xarray>=2025.01.1",
+    "xarray>=2025.01.2",
     "hypothesis",
     "pandas-stubs",
     "boto3-stubs[s3]",
diff --git a/icechunk-python/python/icechunk/dask.py b/icechunk-python/python/icechunk/dask.py
index ba3d57a50..a51dd21dc 100644
--- a/icechunk-python/python/icechunk/dask.py
+++ b/icechunk-python/python/icechunk/dask.py
@@ -48,6 +48,10 @@ def store_dask(
     merge the changesets corresponding to each write task. The `store` object
     passed in will be updated in-place with the fully merged changeset.
 
+    For distributed or multi-processing writes, this function must be called within
+    the `Session.allow_pickling()` context. All Zarr arrays in `targets` must also
+    be created within this context since they contain a reference to the Session.
+
     Parameters
     ----------
     store: IcechunkStore
diff --git a/icechunk-python/python/icechunk/xarray.py b/icechunk-python/python/icechunk/xarray.py
index a6fd10d53..276e6bede 100644
--- a/icechunk-python/python/icechunk/xarray.py
+++ b/icechunk-python/python/icechunk/xarray.py
@@ -1,19 +1,18 @@
-#!/usr/bin/env python3
 import importlib
 from collections.abc import Hashable, Mapping, MutableMapping
 from dataclasses import dataclass, field
-from typing import Any, Literal
+from typing import Any, Literal, overload
 
 import numpy as np
 from packaging.version import Version
 
 import xarray as xr
 import zarr
-from icechunk import IcechunkStore
+from icechunk import IcechunkStore, Session
 from icechunk.dask import stateful_store_reduce
 from icechunk.distributed import extract_session, merge_sessions
 from icechunk.vendor.xarray import _choose_default_mode
-from xarray import Dataset
+from xarray import DataArray, Dataset
 from xarray.backends.common import ArrayWriter
 from xarray.backends.zarr import ZarrStore
 
@@ -81,8 +80,6 @@ class XarrayDatasetWriter:
     store: IcechunkStore = field(kw_only=True)
     safe_chunks: bool = field(kw_only=True, default=True)
 
-    # TODO: uncomment when Zarr has support
-    # write_empty_chunks: bool = field(kw_only=True, default=True)
     _initialized: bool = field(default=False, repr=False)
 
@@ -115,13 +112,12 @@ def _open_group(
             append_dim=append_dim,
             write_region=region,
             safe_chunks=self.safe_chunks,
-            # TODO: uncomment when Zarr has support
-            # write_empty=self.write_empty_chunks,
             synchronizer=None,
             consolidated=False,
             consolidate_on_close=False,
             zarr_version=None,
         )
+        self.dataset = self.xarray_store._validate_and_autodetect_region(self.dataset)
 
     def write_metadata(self, encoding: Mapping[Any, Any] | None = None) -> None:
         """
@@ -194,28 +190,27 @@ def write_lazy(
 
 def to_icechunk(
-    dataset: Dataset,
-    store: IcechunkStore,
+    obj: DataArray | Dataset,
+    session: Session,
     *,
     group: str | None = None,
     mode: ZarrWriteModes | None = None,
-    # TODO: uncomment when Zarr has support
-    # write_empty_chunks: bool | None = None,
     safe_chunks: bool = True,
    append_dim: Hashable | None = None,
     region: Region = None,
     encoding: Mapping[Any, Any] | None = None,
     chunkmanager_store_kwargs: MutableMapping[Any, Any] | None = None,
     split_every: int | None = None,
-    **kwargs: Any,
 ) -> None:
     """
-    Write an Xarray Dataset to a group of an icechunk store.
+    Write an Xarray object to a group of an Icechunk store.
 
     Parameters
     ----------
-    store : MutableMapping, str or path-like, optional
-        Store or path to directory in local or remote file system.
+    obj : DataArray or Dataset
+        The Xarray object to write.
+    session : icechunk.Session
+        The writable Icechunk Session.
     mode : {"w", "w-", "a", "a-", r+", None}, optional
         Persistence mode: "w" means create (overwrite if exists);
         "w-" means create (fail if exists);
@@ -284,18 +279,50 @@ def to_icechunk(
 
     - If ``region`` is set, _all_ variables in a dataset must have at
       least one dimension in common with the region. Other variables
-      should be written in a separate single call to ``to_zarr()``.
+      should be written in a separate single call to ``to_icechunk()``.
     - Dimensions cannot be included in both ``region`` and
       ``append_dim`` at the same time. To create empty arrays to fill
       in with ``region``, use the `XarrayDatasetWriter` directly.
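+
+    Examples
+    --------
+    A minimal sketch (assuming ``ds`` is an ``xarray.Dataset`` and ``session``
+    is a writable ``icechunk.Session``)::
+
+        from icechunk.xarray import to_icechunk
+
+        to_icechunk(ds, session, mode="w")
+        session.commit("wrote an xarray dataset")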
""" - writer = XarrayDatasetWriter(dataset, store=store) - writer._open_group(group=group, mode=mode, append_dim=append_dim, region=region) - - # write metadata - writer.write_metadata(encoding) - # write in-memory arrays - writer.write_eager() - # eagerly write dask arrays - writer.write_lazy(chunkmanager_store_kwargs=chunkmanager_store_kwargs) + as_dataset = make_dataset(obj) + with session.allow_pickling(): + store = session.store + writer = XarrayDatasetWriter(as_dataset, store=store, safe_chunks=safe_chunks) + + writer._open_group(group=group, mode=mode, append_dim=append_dim, region=region) + + # write metadata + writer.write_metadata(encoding) + # write in-memory arrays + writer.write_eager() + # eagerly write dask arrays + writer.write_lazy(chunkmanager_store_kwargs=chunkmanager_store_kwargs) + + +@overload +def make_dataset(obj: DataArray) -> Dataset: ... +@overload +def make_dataset(obj: Dataset) -> Dataset: ... +def make_dataset(obj: DataArray | Dataset) -> Dataset: + """Copied from DataArray.to_zarr""" + DATAARRAY_NAME = "__xarray_dataarray_name__" + DATAARRAY_VARIABLE = "__xarray_dataarray_variable__" + + if isinstance(obj, Dataset): + return obj + + assert isinstance(obj, DataArray) + + if obj.name is None: + # If no name is set then use a generic xarray name + dataset = obj.to_dataset(name=DATAARRAY_VARIABLE) + elif obj.name in obj.coords or obj.name in obj.dims: + # The name is the same as one of the coords names, which the netCDF data model + # does not support, so rename it but keep track of the old name + dataset = obj.to_dataset(name=DATAARRAY_VARIABLE) + dataset.attrs[DATAARRAY_NAME] = obj.name + else: + # No problems with the name - so we're fine! + dataset = obj.to_dataset() + return dataset diff --git a/icechunk-python/tests/run_xarray_backends_tests.py b/icechunk-python/tests/run_xarray_backends_tests.py index f8ef96543..876ff774f 100644 --- a/icechunk-python/tests/run_xarray_backends_tests.py +++ b/icechunk-python/tests/run_xarray_backends_tests.py @@ -15,6 +15,10 @@ local_filesystem_storage, s3_storage, ) +from icechunk.xarray import to_icechunk +from xarray.tests.test_backends import ( + TestZarrRegionAuto as ZarrRegionAutoTests, +) from xarray.tests.test_backends import ( ZarrBase, default_zarr_format, # noqa: F401; needed otherwise not discovered @@ -91,3 +95,19 @@ def create_zarr_target(self) -> Generator[IcechunkStore]: session = repo.writable_session("main") with session.allow_pickling(): yield session.store + + +@pytest.mark.filterwarnings("ignore:Failed to open:RuntimeWarning") +class TestIcechunkRegionAuto(ZarrRegionAutoTests): + @contextlib.contextmanager + def create_zarr_target(self) -> Generator[IcechunkStore]: + if zarr.config.config["default_zarr_format"] == 2: + pytest.skip("v2 not supported") + repo = Repository.create(in_memory_storage()) + session = repo.writable_session("main") + yield session.store + + def save(self, target, ds, **kwargs): + # not really important here + kwargs.pop("compute", None) + to_icechunk(ds, session=target.session, **kwargs) diff --git a/icechunk-python/tests/test_dask.py b/icechunk-python/tests/test_dask.py index dff7cb5e4..6138e413d 100644 --- a/icechunk-python/tests/test_dask.py +++ b/icechunk-python/tests/test_dask.py @@ -12,19 +12,12 @@ def test_distributed() -> None: with distributed.Client(): # type: ignore [no-untyped-call] ds = create_test_data().chunk(dim1=3, dim2=4) - with roundtrip(ds, allow_pickling=True) as actual: + with roundtrip(ds) as actual: assert_identical(actual, ds) - # FIXME: this should be 
nicer! this TypeError is from distributed - with pytest.raises(TypeError): - with roundtrip(ds, allow_pickling=False) as actual: - pass - def test_threaded() -> None: with dask.config.set(scheduler="threads"): ds = create_test_data().chunk(dim1=3, dim2=4) with roundtrip(ds) as actual: assert_identical(actual, ds) - with roundtrip(ds, allow_pickling=False) as actual: - assert_identical(actual, ds) diff --git a/icechunk-python/tests/test_xarray.py b/icechunk-python/tests/test_xarray.py index 2be06fa72..75ff9411b 100644 --- a/icechunk-python/tests/test_xarray.py +++ b/icechunk-python/tests/test_xarray.py @@ -57,14 +57,8 @@ def roundtrip( repo = Repository.create(local_filesystem_storage(tmpdir)) session = repo.writable_session("main") - if allow_pickling: - with session.allow_pickling(): - to_icechunk(data, store=session.store, mode="w") - with xr.open_zarr(session.store, consolidated=False) as ds: - yield ds - - else: - to_icechunk(data, store=session.store, mode="w") + to_icechunk(data, session=session, mode="w") + with session.allow_pickling(): with xr.open_zarr(session.store, consolidated=False) as ds: yield ds