4 changes: 2 additions & 2 deletions lib/iris/fileformats/netcdf/saver.py
@@ -1742,7 +1742,7 @@ def _create_generic_cf_array_var(
if len(element_dims) == 1:
data_first = data[0]
if is_lazy_data(data_first):
data_first = data_first.compute()
(data_first,) = dask.compute(data_first)
data = list("%- *s" % (string_dimension_depth, data_first))
else:
orig_shape = data.shape
@@ -2380,7 +2380,7 @@ def complete(self) -> None:
raise ValueError(msg)

# Complete the saves now
self.delayed_completion().compute()
dask.compute(self.delayed_completion())


def save(
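A minimal sketch (toy code, not from this PR) of why `dask.compute(...)` is used instead of the `.compute()` method: with newer Dask the object being completed may be a plain list of delayed items rather than a single `Delayed`, and only `dask.compute` traverses containers.

```python
# Toy illustration only; none of these names are Iris API.
import dask
from dask import delayed


@delayed
def write_chunk(i):
    return i * 2


single = write_chunk(1)
many = [write_chunk(i) for i in range(3)]

print(single.compute())      # 2 - a Delayed object has a .compute() method
print(dask.compute(single))  # (2,) - note the 1-tuple result
print(dask.compute(many))    # ([0, 2, 4],) - handles a plain list of Delayed
# many.compute()             # AttributeError: lists have no .compute()
```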
16 changes: 11 additions & 5 deletions lib/iris/tests/integration/netcdf/test_delayed_save.py
@@ -9,7 +9,6 @@
from cf_units import Unit
import dask.array as da
import dask.config
from dask.delayed import Delayed
import distributed
import numpy as np
import pytest
@@ -20,6 +19,13 @@
from iris.tests.stock import realistic_4d


def are_dask_collections(obj):
result = dask.is_dask_collection(obj)
if not result and hasattr(obj, "__len__"):
result = all([dask.is_dask_collection(item) for item in obj])
return result


class Test__lazy_stream_data:
# Ensure all saves are done with split-attribute saving,
# -- because some of these tests are sensitive to unexpected warnings.
@@ -130,7 +136,7 @@ def test_realfile_loadsave_equivalence(self, save_is_delayed, output_path):
result = iris.save(original_cubes, output_path, compute=not save_is_delayed)
if save_is_delayed:
# In this case, must also "complete" the save.
result.compute()
dask.compute(result)
reloaded_cubes = iris.load(output_path)
reloaded_cubes = sorted(reloaded_cubes, key=lambda cube: cube.name())
assert reloaded_cubes == original_cubes
@@ -188,7 +194,7 @@ def test_scheduler_types(self, output_path, scheduler_type, save_is_delayed):
if not save_is_delayed:
assert result is None
else:
assert isinstance(result, Delayed)
assert are_dask_collections(result)

def test_time_of_writing(self, save_is_delayed, output_path, scheduler_type):
# Check when lazy data is *actually* written :
@@ -251,7 +257,7 @@ def fetch_masks():

if save_is_delayed:
# Complete the write.
result.compute()
dask.compute(result)

# Re-fetch the lazy arrays. The data should now **not be masked**.
data_mask, coord_mask, ancil_mask, cm_mask = fetch_masks()
@@ -267,4 +273,4 @@ def test_no_delayed_writes(self, output_path):
cube = self.make_testcube(include_lazy_content=False)
warnings.simplefilter("error")
result = iris.save(cube, output_path, compute=False)
assert isinstance(result, Delayed)
assert are_dask_collections(result)
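A short, hedged sketch (not from the PR) of the behaviour the new `are_dask_collections` helper accommodates: `dask.is_dask_collection` returns False for a plain list, even when every element of that list is itself a Dask collection.

```python
# Toy illustration only; names here are assumptions, not Iris API.
import dask
from dask import delayed

items = [delayed(sum)([1, 2]), delayed(sum)([3, 4])]

print(dask.is_dask_collection(items))                  # False - a list is not a collection
print(all(dask.is_dask_collection(i) for i in items))  # True - each element is
print(dask.compute(items))                             # ([3, 7],) - still computable as one graph
```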
3 changes: 2 additions & 1 deletion lib/iris/tests/integration/netcdf/test_general.py
@@ -16,6 +16,7 @@
from unittest import mock
import warnings

import dask
import numpy as np
import numpy.ma as ma
import pytest
@@ -456,7 +457,7 @@ def test_basic_save(self):
ds.close()

# Complete the delayed saves.
result.compute()
dask.compute(result)

# Check that data now *is* written.
ds = nc.Dataset(filepath_indirect)
43 changes: 27 additions & 16 deletions lib/iris/tests/unit/fileformats/pp/test__interpret_field.py
@@ -8,6 +8,7 @@
# importing anything else.
import iris.tests as tests # isort:skip

import contextlib
from copy import deepcopy
from unittest import mock

@@ -91,7 +92,6 @@ def test_deferred_fix_lbrow_lbnpt(self):
self.assertEqual(f1.lbrow, 3)
self.assertEqual(f1.lbnpt, 4)

@tests.skip_data
def test_landsea_unpacking_uses_dask(self):
# Ensure that the graph of the (lazy) landsea-masked data contains an
# explicit reference to a (lazy) landsea-mask field.
@@ -112,21 +112,32 @@ def test_landsea_unpacking_uses_dask(self):
lazy_mask_array = landsea_mask.core_data()
lazy_soildata_array = soil_temp.core_data()

# Work out the main dask key for the mask data, as used by 'compute()'.
mask_toplev_key = (lazy_mask_array.name,) + (0,) * lazy_mask_array.ndim
# Get the 'main' calculation entry.
mask_toplev_item = lazy_mask_array.dask[mask_toplev_key]
# This should be a task (a simple fetch).
self.assertTrue(callable(mask_toplev_item))
# Get the key (name) of the array that it fetches.
mask_data_name = mask_toplev_item.args[0].key

# Check that the item this refers to is a PPDataProxy.
self.assertIsInstance(lazy_mask_array.dask[mask_data_name], pp.PPDataProxy)

# Check that the soil-temp graph references the *same* lazy element,
# showing that the mask+data calculation is handled by dask.
self.assertIn(mask_data_name, lazy_soildata_array.dask.keys())
def is_pp_layer(obj):
result = False
if hasattr(obj, "values"):
if len(obj) == 1:
(result,) = [hasattr(v, "_lbpack") for v in obj.values()]
return result

# Find the single reference to the PPDataProxy within the dask graph.
(layer,) = [
lay for lay in lazy_mask_array.dask.layers.values() if is_pp_layer(lay)
]
((layer_key, proxy),) = layer.items()
# Replace the PPDataProxy with a mock.
# By replacing directly within the dask graph, we can prove whether an
# operation is relying on the graph in the expected way.
mock_proxy = mock.MagicMock(spec=pp.PPDataProxy, wraps=proxy)
layer.mapping[layer_key] = mock_proxy

self.assertFalse(mock_proxy.__getitem__.called)
with contextlib.suppress(TypeError):
# Expect this to crash - data realisation is complex, and it is not
# worth mocking this fully.
_ = lazy_soildata_array.compute()
# Proves that the soil data graph includes the mask graph - computing
# the soil data has accessed the mask graph object which we mocked.
self.assertTrue(mock_proxy.__getitem__.called)
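For context on the rewritten test above: it relies on `mock.MagicMock(spec=..., wraps=...)`, which records every access while delegating to the real object. Below is a minimal sketch of that pattern using a made-up class and method; the actual test spies on `__getitem__` of a real `PPDataProxy`.

```python
# Minimal sketch of the spec=/wraps= pattern; 'Proxy' and 'fetch' are
# made-up stand-ins, not Iris code.
from unittest import mock


class Proxy:
    def fetch(self, key):
        return 42


real = Proxy()
spy = mock.MagicMock(spec=Proxy, wraps=real)

assert not spy.fetch.called
assert spy.fetch("x") == 42   # the call is delegated to the real object
assert spy.fetch.called       # ...and recorded, so the access can be asserted
```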
Comment on lines +115 to +140
@trexfeathers (Contributor, Author) commented on May 5, 2025:
(see previous thread)

This change in approach is more agnostic to Dask's internals (although still somewhat vulnerable). It now runs successfully, and when it fails it does so as a classic test failure rather than falling over.

But all I have managed to do is prove that Dask >=2025.4.0 breaks this test on a philosophical level - not just because the structures have changed.

Results of updated test

| fileformats/pp.py | Dask version | Pass/Fail | What this proves |
| --- | --- | --- | --- |
| v2.2.0 | 2025.3.0 | Fail ❌ | |
| #3255 | 2025.3.0 | Pass ✔ | The updated test still detects the case it was designed for |
| main | 2025.3.0 | Pass ✔ | |
| main | 2025.4.0 | Fail ❌ | The Dask graph for the data payload no longer incorporates the mask graph |
Understanding whether this is cause for concern - and if so, how to fix things - will be complex. The development team are fully booked on other projects for the foreseeable future, so I would suggest a patch release with a dask < 2025.4 pin. @SciTools/iris-devs what are your thoughts?
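For anyone wanting to reproduce the graph check without depending on Dask internals, here is a rough sketch of asking whether one lazy array's graph shares keys with another's, the property the last table row says no longer holds for the PP mask/data pair under Dask 2025.4. It uses toy in-memory arrays, not PP-backed ones.

```python
# Rough illustration with toy arrays; real Iris arrays are built from PP
# file proxies rather than in-memory data.
import dask.array as da

mask = da.ones((3, 4), chunks=(3, 4))
soil = mask + 1  # derived from 'mask', so its graph embeds the mask tasks

mask_keys = set(mask.__dask_graph__().keys())
soil_keys = set(soil.__dask_graph__().keys())

# A non-empty intersection means the derived graph incorporates the source graph.
print(bool(mask_keys & soil_keys))  # True for this toy example
```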

@trexfeathers (Contributor, Author) commented:
Discussed at length with @pp-mo, who explained that this test was not written to demonstrate the other changes in #3255, but because of a concern that something similar could happen again.

Our increased experience with Dask in the 6 years since has given us the confidence that:

  1. We now write code that is much less vulnerable to things like "Don't nest compute calls" (#3237).
  2. Dask itself has matured, and is less vulnerable to unexpected corner cases.

It looks like the refactoring that made this test fail (dask/dask#11736) was technical debt work to make Dask safer and more robust, and is NOT evidence that we have become vulnerable to nested compute() calls.
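For background, a rough illustration of my own (not taken from #3237) of the nested-compute anti-pattern referred to above: realising one collection from inside a task that is itself being computed, versus passing it in so that a single outer compute resolves everything at once.

```python
# Toy illustration of the anti-pattern; none of this is Iris code.
import dask
import dask.array as da
from dask import delayed

mask = da.ones((4,), chunks=(2,))


@delayed
def apply_mask_badly(values):
    # Nested compute: re-enters the scheduler from inside a running task;
    # with the distributed scheduler this can tie up or deadlock workers.
    return values * mask.compute()


@delayed
def apply_mask_well(values, mask_values):
    # Preferred: make the mask an input, so one outer compute() resolves
    # both graphs together.
    return values * mask_values


(result,) = dask.compute(apply_mask_well(da.arange(4, chunks=2), mask))
print(result)  # [0. 1. 2. 3.]
```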

Based on all of this, we have decided to remove the test.



if __name__ == "__main__":