3 changes: 3 additions & 0 deletions doc/whats-new.rst
@@ -34,6 +34,9 @@ Bug fixes
- Fix error when encoding an empty :py:class:`numpy.datetime64` array
(:issue:`10722`, :pull:`10723`). By `Spencer Clark
<https://github.com/spencerkclark>`_.
- Fix error from ``to_netcdf(..., compute=False)`` when using Dask Distributed
(:issue:`10725`).
By `Stephan Hoyer <https://github.com/shoyer>`_.
- Propagate coordinate attrs in :py:meth:`xarray.Dataset.map` (:issue:`9317`, :pull:`10602`).
By `Justus Magin <https://github.com/keewis>`_.

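To illustrate the path this entry covers, a minimal sketch of a delayed write under Dask Distributed (the file name and chunk size are invented for the example, not taken from the PR):

    import numpy as np
    import xarray as xr
    from dask.distributed import Client

    client = Client()  # make the distributed scheduler the active one
    ds = xr.Dataset({"x": ("t", np.arange(1000))}).chunk({"t": 100})

    # compute=False returns a dask Delayed object instead of writing immediately
    delayed_write = ds.to_netcdf("out.nc", compute=False)
    delayed_write.compute()  # the write reported in :issue:`10725` happens here
    client.close()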
27 changes: 16 additions & 11 deletions xarray/backends/api.py
@@ -1858,6 +1858,20 @@ def open_mfdataset(
return combined


def _get_netcdf_autoclose(dataset: Dataset, engine: T_NetcdfEngine) -> bool:
"""Should we close files after each write operations?"""
scheduler = get_dask_scheduler()
have_chunks = any(v.chunks is not None for v in dataset.variables.values())

autoclose = have_chunks and scheduler in ["distributed", "multiprocessing"]
if autoclose and engine == "scipy":
raise NotImplementedError(
f"Writing netCDF files with the {engine} backend "
f"is not currently supported with dask's {scheduler} scheduler"
)
return autoclose


WRITEABLE_STORES: dict[T_NetcdfEngine, Callable] = {
"netcdf4": backends.NetCDF4DataStore.open,
"scipy": backends.ScipyDataStore,
@@ -2064,16 +2078,7 @@ def to_netcdf(
# sanitize unlimited_dims
unlimited_dims = _sanitize_unlimited_dims(dataset, unlimited_dims)

# handle scheduler specific logic
scheduler = get_dask_scheduler()
have_chunks = any(v.chunks is not None for v in dataset.variables.values())

autoclose = have_chunks and scheduler in ["distributed", "multiprocessing"]
if autoclose and engine == "scipy":
raise NotImplementedError(
f"Writing netCDF files with the {engine} backend "
f"is not currently supported with dask's {scheduler} scheduler"
)
autoclose = _get_netcdf_autoclose(dataset, engine)

if path_or_file is None:
if not compute:
@@ -2116,7 +2121,7 @@
writes = writer.sync(compute=compute)

finally:
if not multifile:
if not multifile and not autoclose: # type: ignore[redundant-expr,unused-ignore]
if compute:
store.close()
else:
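One user-visible consequence of the extracted helper is easier to state than to read from the diff: the scipy backend still refuses delayed writes of chunked data under the distributed or multiprocessing schedulers. A hedged sketch of that error path (engine choice and file name are assumptions for the example):

    import numpy as np
    import xarray as xr
    from dask.distributed import Client

    with Client(processes=False):  # in-process distributed scheduler
        ds = xr.Dataset({"x": ("t", np.arange(10))}).chunk({"t": 5})
        try:
            ds.to_netcdf("unsupported.nc", engine="scipy", compute=False)
        except NotImplementedError as err:
            # raised by the autoclose check before any file is opened
            print(err)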
8 changes: 7 additions & 1 deletion xarray/tests/test_distributed.py
@@ -85,11 +85,13 @@ def tmp_netcdf_filename(tmpdir):


@pytest.mark.parametrize("engine,nc_format", ENGINES_AND_FORMATS)
@pytest.mark.parametrize("compute", [True, False])
def test_dask_distributed_netcdf_roundtrip(
loop, # noqa: F811
tmp_netcdf_filename,
engine,
nc_format,
compute,
):
if engine not in ENGINES:
pytest.skip("engine not available")
@@ -107,7 +109,11 @@ def test_dask_distributed_netcdf_roundtrip(
)
return

original.to_netcdf(tmp_netcdf_filename, engine=engine, format=nc_format)
result = original.to_netcdf(
tmp_netcdf_filename, engine=engine, format=nc_format, compute=compute
)
if not compute:
result.compute()

with xr.open_dataset(
tmp_netcdf_filename, chunks=chunks, engine=engine
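Condensed, the new compute=False branch of this roundtrip amounts to something like the following (the fixture-provided engine, format, and temporary path are replaced with fixed values here):

    import numpy as np
    import xarray as xr
    from dask.distributed import Client

    with Client(processes=False):
        original = xr.Dataset({"x": ("t", np.arange(100))}).chunk({"t": 10})
        result = original.to_netcdf("roundtrip.nc", engine="netcdf4", compute=False)
        result.compute()  # the delayed write runs here
        with xr.open_dataset("roundtrip.nc", chunks={"t": 10}, engine="netcdf4") as restored:
            xr.testing.assert_identical(original, restored.compute())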