
# Upgrade xarray.to_icechunk and related docs #633

Merged · 38 commits merged into main from region-auto on Jan 31, 2025
Changes from 31 commits
Commits (38):
4a1628f
Support region="auto" in to_xarray
dcherian Jan 27, 2025
2354738
fix
dcherian Jan 27, 2025
5df083f
fix agian
dcherian Jan 27, 2025
76d1362
lint
dcherian Jan 27, 2025
a1c9479
Support dataarray
dcherian Jan 27, 2025
4471081
Support region="auto"
dcherian Jan 27, 2025
75fa55a
fix
dcherian Jan 27, 2025
cc9791d
write_empty_chunks
dcherian Jan 27, 2025
39594d7
Revert "write_empty_chunks"
dcherian Jan 27, 2025
0628f73
Remove write_empty_chunks
dcherian Jan 27, 2025
d7042d1
lint
dcherian Jan 27, 2025
5f1a9e0
type
dcherian Jan 27, 2025
69495d9
Support safe_chunks
dcherian Jan 27, 2025
cd8ac54
skip one import
dcherian Jan 27, 2025
e7f5eeb
remove unnecessary **kwargs
dcherian Jan 27, 2025
bd25331
full compute support
dcherian Jan 27, 2025
4e9ad54
Minimal support for compute kwarg
dcherian Jan 27, 2025
8603947
Revert "Minimal support for compute kwarg"
dcherian Jan 28, 2025
8402014
Add distributed writes docs
dcherian Jan 28, 2025
6672ae9
sidestep compute=False in tests
dcherian Jan 28, 2025
1848b22
Revert "full compute support"
dcherian Jan 28, 2025
29cc317
Move to distributed.md
dcherian Jan 28, 2025
ec8c486
fix
dcherian Jan 28, 2025
91e4d50
Merge branch 'main' into region-auto
dcherian Jan 31, 2025
ee5250f
Merge branch 'main' into region-auto
dcherian Jan 31, 2025
0894777
cleanup after xarray release
dcherian Jan 31, 2025
e2c83de
update docs
dcherian Jan 31, 2025
ad4a77c
Edits
dcherian Jan 31, 2025
4145223
more edits
dcherian Jan 31, 2025
502fab0
few more edits
dcherian Jan 31, 2025
f3f7682
edit
dcherian Jan 31, 2025
d0a3d9e
fix
dcherian Jan 31, 2025
b4a90d6
to_icehunk takes session
dcherian Jan 31, 2025
ad96ba4
more edits
dcherian Jan 31, 2025
7c244f5
add faq
dcherian Jan 31, 2025
eb3aeb3
Update docs/docs/icechunk-python/parallel.md
dcherian Jan 31, 2025
e27c668
Merge branch 'main' into region-auto
dcherian Jan 31, 2025
baf8344
lint
dcherian Jan 31, 2025
45 changes: 42 additions & 3 deletions docs/docs/icechunk-python/dask.md
@@ -68,14 +68,47 @@ Finally commit your changes!
icechunk_session.commit("wrote a dask array!")
```

## Icechunk + Dask + Xarray

### Simple
## Distributed

In distributed contexts where the Session and Zarr Array objects are sent across the network,
you must opt in to pickling of a writable store.

[`icechunk.dask.store_dask`](./reference.md#icechunk.dask.store_dask) takes care of the hard part of
merging Sessions, but you must opt in to pickling before creating the target Zarr array objects.

Here is an example:
```python
import icechunk.dask

zarr_chunks = (10, 10)
with icechunk_session.allow_pickling():
    group = zarr.group(store=icechunk_session.store, overwrite=True)

    zarray = group.create_array(
        "array",
        shape=shape,
        chunks=zarr_chunks,
        dtype="f8",
        fill_value=float("nan"),
    )
    icechunk.dask.store_dask(icechunk_session, sources=[dask_array], targets=[zarray])
icechunk_session.commit("wrote a dask array!")
```

## Icechunk + Dask + Xarray

The [`icechunk.xarray.to_icechunk`](./reference.md#icechunk.xarray.to_icechunk) function is functionally identical to Xarray's
[`Dataset.to_zarr`](https://docs.xarray.dev/en/stable/generated/xarray.Dataset.to_zarr.html), and supports many of the same keyword arguments.
Notably, the ``compute`` kwarg is not supported.
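
For instance, keyword arguments like `group`, `mode`, `append_dim`, and `region` carry over directly. A minimal sketch of the correspondence (assuming the `dataset` and `icechunk_session` objects used in the examples below):

```python
# Dataset.to_zarr style (single process):
# dataset.to_zarr(icechunk_session.store, mode="w", consolidated=False)

# to_icechunk equivalent -- same keywords, but Session-aware:
icechunk.xarray.to_icechunk(dataset, store=icechunk_session.store, mode="w")
```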

!!! warning

When using Xarray and Icechunk in a Dask Distributed context, you *must* use `to_icechunk` so that the Session has a record
of the writes that are executed remotely. Using `to_zarr` in such cases will result in the local Session having no
record of the remote writes, and a meaningless commit.


Now roundtrip an Xarray dataset:
```python
import icechunk.xarray
@@ -84,7 +117,7 @@ import xarray as xr
# Assuming you have a valid writable Session named icechunk_session
dataset = xr.tutorial.open_dataset("rasm", chunks={"time": 1}).isel(time=slice(24))

icechunk.xarray.to_icechunk(dataset, store=icechunk_session.store))
icechunk.xarray.to_icechunk(dataset, store=icechunk_session.store)

roundtripped = xr.open_zarr(icechunk_session.store, consolidated=False)
dataset.identical(roundtripped)
@@ -94,3 +127,9 @@ Finally commit your changes!
```python
icechunk_session.commit("wrote an Xarray dataset!")
```

In a distributed context, you will have to opt in to pickling:
```python
with icechunk_session.allow_pickling():
    icechunk.xarray.to_icechunk(dataset, store=icechunk_session.store)
```
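
A fuller sketch of that pattern, assuming a local `dask.distributed` cluster (the `Client` setup and commit message here are illustrative, not part of the example above):

```python
from dask.distributed import Client

client = Client()  # workers receive pickled copies of the writable store

with icechunk_session.allow_pickling():
    icechunk.xarray.to_icechunk(dataset, store=icechunk_session.store)

# the local Session has a record of the remote writes, so this commit is meaningful
icechunk_session.commit("distributed write of an Xarray dataset")
```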
129 changes: 129 additions & 0 deletions docs/docs/icechunk-python/parallel.md
@@ -0,0 +1,129 @@
# Parallel Writes

A common pattern with large distributed write jobs is to first initialize the dataset on disk
with all appropriate metadata and any coordinate variables. Following this, a large write job
is kicked off in a distributed setting, where each worker is responsible for an independent
"region" of the output.


## Why is Icechunk different from any other Zarr store?

Unlike other Zarr stores, Icechunk is a "stateful" store. The Session object keeps a record of all writes, which is then
bundled together in a commit. Thus `Session.commit` must be executed on a Session object that knows about all writes,
including those executed remotely in a multi-processing or other remote execution context.

## Example
Here is how you can execute such writes with Icechunk, illustrated with a `ThreadPoolExecutor`.
First, read some example data and create an Icechunk Repository.
```python
import xarray as xr
import tempfile
from icechunk import Repository, local_filesystem_storage

ds = xr.tutorial.open_dataset("rasm").isel(time=slice(24))
repo = Repository.create(local_filesystem_storage(tempfile.mkdtemp()))
session = repo.writable_session("main")
```
We will orchestrate the writes so that each task writes one timestep.
This is an arbitrary choice, but it determines what we set for the Zarr chunk size.
```python
# one chunk per timestep, spanning the full spatial domain
chunks = tuple(1 if dim == "time" else ds.sizes[dim] for dim in ds.Tair.dims)
```

Initialize the dataset using [`Dataset.to_zarr`](https://docs.xarray.dev/en/stable/generated/xarray.Dataset.to_zarr.html)
and `compute=False`. This will NOT write any chunked array data, but will write all array metadata and any
in-memory arrays (only `time` in this case).
```python
ds.to_zarr(session.store, compute=False, encoding={"Tair": {"chunks": chunks}}, mode="w")
# this commit is optional, but may be useful in your workflow
session.commit("initialize store")
```
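
If you want to sanity-check the initialization (and made the optional commit above), a quick sketch along these lines should show that the array metadata exists even though no chunk data has been written yet (exact shapes depend on the data):

```python
import zarr

# re-open the committed snapshot read-only and inspect the declared metadata
group = zarr.open_group(repo.readonly_session(branch="main").store, mode="r")
print(group["Tair"].shape)   # full shape declared up front, e.g. (24, 205, 275)
print(group["Tair"].chunks)  # (1, 205, 275): one timestep per chunk
```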

## Multi-threading

First define a function that constitutes one "write task".
```python
from icechunk import Session

def write_timestamp(*, itime: int, session: Session) -> None:
    # pass a list to isel to preserve the time dimension
    ds = xr.tutorial.open_dataset("rasm").isel(time=[itime])
    # region="auto" tells Xarray to infer which "region" of the output arrays to write to.
    ds.to_zarr(session.store, region="auto", consolidated=False)
```

Now execute the writes.
```python
from concurrent.futures import ThreadPoolExecutor, wait
from icechunk.distributed import merge_sessions

session = repo.writable_session("main")
with ThreadPoolExecutor() as executor:
    # submit the writes
    futures = [executor.submit(write_timestamp, itime=i, session=session) for i in range(ds.sizes["time"])]
    wait(futures)

session.commit("finished writes")
```

Verify that the writes worked as expected:
```python
ondisk = xr.open_zarr(repo.readonly_session(branch="main").store, consolidated=False)
xr.testing.assert_identical(ds, ondisk)
```

## Distributed writes

Any task execution framework (e.g. `ProcessPoolExecutor`, joblib, lithops, dask, ray, etc.)
can be used instead of the `ThreadPoolExecutor`. However, such workloads should account for
Icechunk being a "stateful" store that records changes executed in a write session.

There are three key points to keep in mind:

1. The `write_timestamp` function *must* return the `Session`. It contains a record of the changes executed by this task.
   These changes *must* be manually communicated back to the coordinating process, since each of the distributed processes
   is working with its own independent `Session` instance.
2. Icechunk requires that users opt in to pickling a *writable* `Session` using the `Session.allow_pickling()` context manager,
   as a reminder that distributed writes with Icechunk require care.
3. The user *must* manually merge the Session objects to create a meaningful commit.

First we modify `write_timestamp` to return the `Session`:
```python
from icechunk import Session

def write_timestamp(*, itime: int, session: Session) -> Session:
    # pass a list to isel to preserve the time dimension
    ds = xr.tutorial.open_dataset("rasm").isel(time=[itime])
    # region="auto" tells Xarray to infer which "region" of the output arrays to write to.
    ds.to_zarr(session.store, region="auto", consolidated=False)
    return session
```

Now we issue write tasks within the [`session.allow_pickling()`](./reference.md#icechunk.Session.allow_pickling) context, gather the Sessions from individual tasks,
merge them, and make a successful commit.

```python
from concurrent.futures import ProcessPoolExecutor
from icechunk.distributed import merge_sessions

session = repo.writable_session("main")
with ProcessPoolExecutor() as executor:
    # opt-in to successful pickling of a writable session
    with session.allow_pickling():
        # submit the writes
        futures = [
            executor.submit(write_timestamp, itime=i, session=session)
            for i in range(ds.sizes["time"])
        ]
        # grab the Session objects from each individual write task
        sessions = [f.result() for f in futures]

# manually merge the remote sessions into the local session
session = merge_sessions(session, *sessions)
session.commit("finished writes")
```

Verify that the writes worked as expected:
```python
ondisk = xr.open_zarr(repo.readonly_session(branch="main").store, consolidated=False)
xr.testing.assert_identical(ds, ondisk)
```
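
The same pattern maps onto other frameworks. As a rough sketch with Ray (assuming `ray` is installed; this reuses the `write_timestamp` and `merge_sessions` definitions above and is untested):

```python
import ray

ray.init()

# wrap the existing task so Ray can schedule it remotely
@ray.remote
def write_timestamp_task(itime: int, session: Session) -> Session:
    return write_timestamp(itime=itime, session=session)

session = repo.writable_session("main")
with session.allow_pickling():
    # Ray pickles the session when shipping arguments to workers
    futures = [write_timestamp_task.remote(i, session) for i in range(ds.sizes["time"])]
    sessions = ray.get(futures)

session = merge_sessions(session, *sessions)
session.commit("finished writes with ray")
```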
16 changes: 13 additions & 3 deletions docs/docs/icechunk-python/xarray.md
@@ -2,7 +2,7 @@

Icechunk was designed to work seamlessly with Xarray. Xarray users can read and
write data to Icechunk using [`xarray.open_zarr`](https://docs.xarray.dev/en/latest/generated/xarray.open_zarr.html#xarray.open_zarr)
and [`xarray.Dataset.to_zarr`](https://docs.xarray.dev/en/latest/generated/xarray.Dataset.to_zarr.html#xarray.Dataset.to_zarr).
and `icechunk.xarray.to_icechunk` methods.

!!! warning

@@ -12,6 +12,16 @@ and [`xarray.Dataset.to_zarr`](https://docs.xarray.dev/en/latest/generated/xarra
pip install "xarray>=2025.1.1"
```

!!! note "`to_icechunk` vs `to_zarr`"

[`xarray.Dataset.to_zarr`](https://docs.xarray.dev/en/latest/generated/xarray.Dataset.to_zarr.html#xarray.Dataset.to_zarr)
and [`to_icechunk`](./reference.md#icechunk.xarray.to_icechunk) are nearly functionally identical. In a distributed context, e.g.
writes orchestrated with `multiprocessing` or a `dask.distributed.Client` and `dask.array`, you *must* use `to_icechunk`.
This will ensure that you can execute a commit that successfully records all remote writes.
See [these docs on orchestrating parallel writes](./parallel.md) and [these docs on dask.array with distributed](./dask.md#icechunk-dask-xarray)
for more.


In this example, we'll explain how to create a new Icechunk repo, write some sample data
to it, and append data a second block of data using Icechunk's version control features.

@@ -75,7 +85,7 @@ session = repo.writable_session("main")
Writing Xarray data to Icechunk is as easy as calling `Dataset.to_zarr`:

```python
ds1.to_zarr(session.store, zarr_format=3, consolidated=False)
ds1.to_icechunk(session.store, zarr_format=3, consolidated=False)
```

!!! note
@@ -101,7 +111,7 @@ this reason. Again, we'll use `Dataset.to_zarr`, this time with `append_dim='tim
```python
# we have to get a new session after committing
session = repo.writable_session("main")
ds2.to_zarr(session.store, append_dim='time')
ds2.to_icechunk(session.store, append_dim='time')
```

And then we'll commit the changes:
1 change: 1 addition & 0 deletions docs/mkdocs.yml
@@ -182,6 +182,7 @@ nav:
- Quickstart: icechunk-python/quickstart.md
- Configuration: icechunk-python/configuration.md
- Xarray: icechunk-python/xarray.md
- Parallel Writes: icechunk-python/parallel.md
- Dask: icechunk-python/dask.md
- Version Control: icechunk-python/version-control.md
- Virtual Datasets: icechunk-python/virtual.md
2 changes: 1 addition & 1 deletion icechunk-python/pyproject.toml
@@ -39,7 +39,7 @@ test = [
"ruff",
"dask>=2024.11.0",
"distributed>=2024.11.0",
"xarray>=2025.01.1",
"xarray>=2025.01.2",
"hypothesis",
"pandas-stubs",
"boto3-stubs[s3]",
4 changes: 4 additions & 0 deletions icechunk-python/python/icechunk/dask.py
@@ -48,6 +48,10 @@ def store_dask(
merge the changesets corresponding to each write task. The `store` object
passed in will be updated in-place with the fully merged changeset.

For distributed or multi-processing writes, this method must be called within
the `Session.allow_pickling()` context. All Zarr arrays in `targets` must also
be created within this context since they contain a reference to the Session.

Parameters
----------
store: IcechunkStore
54 changes: 41 additions & 13 deletions icechunk-python/python/icechunk/xarray.py
@@ -1,8 +1,7 @@
#!/usr/bin/env python3
import importlib
from collections.abc import Hashable, Mapping, MutableMapping
from dataclasses import dataclass, field
from typing import Any, Literal
from typing import Any, Literal, overload

import numpy as np
from packaging.version import Version
@@ -13,7 +12,7 @@
from icechunk.dask import stateful_store_reduce
from icechunk.distributed import extract_session, merge_sessions
from icechunk.vendor.xarray import _choose_default_mode
from xarray import Dataset
from xarray import DataArray, Dataset
from xarray.backends.common import ArrayWriter
from xarray.backends.zarr import ZarrStore

@@ -81,8 +80,6 @@ class XarrayDatasetWriter:
store: IcechunkStore = field(kw_only=True)

safe_chunks: bool = field(kw_only=True, default=True)
# TODO: uncomment when Zarr has support
# write_empty_chunks: bool = field(kw_only=True, default=True)
@dcherian (Contributor, Author) commented on Jan 27, 2025: This is now deprecated, and users are supposed to set it in the Zarr config, so we don't need the kwarg.


_initialized: bool = field(default=False, repr=False)

@@ -115,13 +112,12 @@ def _open_group(
append_dim=append_dim,
write_region=region,
safe_chunks=self.safe_chunks,
# TODO: uncomment when Zarr has support
# write_empty=self.write_empty_chunks,
synchronizer=None,
consolidated=False,
consolidate_on_close=False,
zarr_version=None,
)
self.dataset = self.xarray_store._validate_and_autodetect_region(self.dataset)

def write_metadata(self, encoding: Mapping[Any, Any] | None = None) -> None:
"""
@@ -194,26 +190,28 @@ def write_lazy(


def to_icechunk(
dataset: Dataset,
obj: DataArray | Dataset,
store: IcechunkStore,
*,
group: str | None = None,
mode: ZarrWriteModes | None = None,
# TODO: uncomment when Zarr has support
# write_empty_chunks: bool | None = None,
safe_chunks: bool = True,
append_dim: Hashable | None = None,
region: Region = None,
encoding: Mapping[Any, Any] | None = None,
chunkmanager_store_kwargs: MutableMapping[Any, Any] | None = None,
split_every: int | None = None,
**kwargs: Any,
) -> None:
"""
Write an Xarray Dataset to a group of an icechunk store.
Write an Xarray object to a group of an icechunk store.

For distributed or multi-processing writes, this method must be passed an IcechunkStore
object that is created within a `Session.allow_pickling()` context.

Parameters
----------
obj: DataArray or Dataset
Xarray object to write
store : MutableMapping, str or path-like, optional
Store or path to directory in local or remote file system.
mode : {"w", "w-", "a", "a-", r+", None}, optional
@@ -289,7 +287,9 @@ def to_icechunk(
``append_dim`` at the same time. To create empty arrays to fill
in with ``region``, use the `XarrayDatasetWriter` directly.
"""
writer = XarrayDatasetWriter(dataset, store=store)

as_dataset = make_dataset(obj)
writer = XarrayDatasetWriter(as_dataset, store=store, safe_chunks=safe_chunks)

writer._open_group(group=group, mode=mode, append_dim=append_dim, region=region)

@@ -299,3 +299,31 @@
writer.write_eager()
# eagerly write dask arrays
writer.write_lazy(chunkmanager_store_kwargs=chunkmanager_store_kwargs)


@overload
def make_dataset(obj: DataArray) -> Dataset: ...
@overload
def make_dataset(obj: Dataset) -> Dataset: ...
def make_dataset(obj: DataArray | Dataset) -> Dataset:
"""Copied from DataArray.to_zarr"""
DATAARRAY_NAME = "__xarray_dataarray_name__"
DATAARRAY_VARIABLE = "__xarray_dataarray_variable__"

if isinstance(obj, Dataset):
return obj

assert isinstance(obj, DataArray)

if obj.name is None:
# If no name is set then use a generic xarray name
dataset = obj.to_dataset(name=DATAARRAY_VARIABLE)
elif obj.name in obj.coords or obj.name in obj.dims:
# The name is the same as one of the coords names, which the netCDF data model
# does not support, so rename it but keep track of the old name
dataset = obj.to_dataset(name=DATAARRAY_VARIABLE)
dataset.attrs[DATAARRAY_NAME] = obj.name
else:
# No problems with the name - so we're fine!
dataset = obj.to_dataset()
return dataset
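
As the new `make_dataset` helper suggests, `to_icechunk` now accepts a `DataArray` as well as a `Dataset`. A usage sketch (the `session` object and tutorial dataset are assumed for illustration):

```python
import xarray as xr
from icechunk.xarray import to_icechunk

# A named DataArray is stored under its own name; an unnamed one falls back to
# the generic "__xarray_dataarray_variable__" key, as in make_dataset above.
air = xr.tutorial.open_dataset("air_temperature")["air"]
to_icechunk(air, store=session.store, mode="w")
```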