diff --git a/docs/src/whatsnew/3.12.rst b/docs/src/whatsnew/3.12.rst
index 62741ad65f..81f1c37533 100644
--- a/docs/src/whatsnew/3.12.rst
+++ b/docs/src/whatsnew/3.12.rst
@@ -198,6 +198,9 @@ v3.12.1 (30 Apr 2025)
    versions of dask fix the bug causing the pin. Introduced a minimum pin
    (2025.1.0) to avoid this bug. (:pull:`6342`)
 
+#. `@trexfeathers`_ refactored Iris loading and saving to make it compatible
+   with Dask version ``2025.4.0`` and above. (:pull:`6451`)
+
 
 📚 Documentation
 ================
diff --git a/lib/iris/fileformats/netcdf/saver.py b/lib/iris/fileformats/netcdf/saver.py
index a195fe9d83..4a2474ba9b 100644
--- a/lib/iris/fileformats/netcdf/saver.py
+++ b/lib/iris/fileformats/netcdf/saver.py
@@ -1742,7 +1742,7 @@ def _create_generic_cf_array_var(
         if len(element_dims) == 1:
             data_first = data[0]
             if is_lazy_data(data_first):
-                data_first = data_first.compute()
+                (data_first,) = dask.compute(data_first)
             data = list("%- *s" % (string_dimension_depth, data_first))
         else:
             orig_shape = data.shape
@@ -2380,7 +2380,7 @@ def complete(self) -> None:
             raise ValueError(msg)
 
         # Complete the saves now
-        self.delayed_completion().compute()
+        dask.compute(self.delayed_completion())
 
 
 def save(
diff --git a/lib/iris/tests/integration/netcdf/test_delayed_save.py b/lib/iris/tests/integration/netcdf/test_delayed_save.py
index 2869924dce..ecbe2caf96 100644
--- a/lib/iris/tests/integration/netcdf/test_delayed_save.py
+++ b/lib/iris/tests/integration/netcdf/test_delayed_save.py
@@ -9,7 +9,6 @@
 from cf_units import Unit
 import dask.array as da
 import dask.config
-from dask.delayed import Delayed
 import distributed
 import numpy as np
 import pytest
@@ -20,6 +19,13 @@
 from iris.tests.stock import realistic_4d
 
 
+def are_dask_collections(obj):
+    result = dask.is_dask_collection(obj)
+    if not result and hasattr(obj, "__len__"):
+        result = all(dask.is_dask_collection(item) for item in obj)
+    return result
+
+
 class Test__lazy_stream_data:
     # Ensure all saves are done with split-attribute saving,
     # -- because some of these tests are sensitive to unexpected warnings.
@@ -130,7 +136,7 @@ def test_realfile_loadsave_equivalence(self, save_is_delayed, output_path):
         result = iris.save(original_cubes, output_path, compute=not save_is_delayed)
         if save_is_delayed:
             # In this case, must also "complete" the save.
-            result.compute()
+            dask.compute(result)
         reloaded_cubes = iris.load(output_path)
         reloaded_cubes = sorted(reloaded_cubes, key=lambda cube: cube.name())
         assert reloaded_cubes == original_cubes
@@ -188,7 +194,7 @@ def test_scheduler_types(self, output_path, scheduler_type, save_is_delayed):
         if not save_is_delayed:
             assert result is None
         else:
-            assert isinstance(result, Delayed)
+            assert are_dask_collections(result)
 
     def test_time_of_writing(self, save_is_delayed, output_path, scheduler_type):
         # Check when lazy data is *actually* written :
@@ -251,7 +257,7 @@ def fetch_masks():
 
         if save_is_delayed:
             # Complete the write.
-            result.compute()
+            dask.compute(result)
 
         # Re-fetch the lazy arrays. The data should now **not be masked**.
         data_mask, coord_mask, ancil_mask, cm_mask = fetch_masks()
@@ -267,4 +273,4 @@ def test_no_delayed_writes(self, output_path):
         cube = self.make_testcube(include_lazy_content=False)
         warnings.simplefilter("error")
         result = iris.save(cube, output_path, compute=False)
-        assert isinstance(result, Delayed)
+        assert are_dask_collections(result)
diff --git a/lib/iris/tests/integration/netcdf/test_general.py b/lib/iris/tests/integration/netcdf/test_general.py
index 896f1d5d6c..748a127a9a 100644
--- a/lib/iris/tests/integration/netcdf/test_general.py
+++ b/lib/iris/tests/integration/netcdf/test_general.py
@@ -16,6 +16,7 @@
 from unittest import mock
 import warnings
 
+import dask
 import numpy as np
 import numpy.ma as ma
 import pytest
@@ -456,7 +457,7 @@ def test_basic_save(self):
         ds.close()
 
         # Complete the delayed saves.
-        result.compute()
+        dask.compute(result)
 
         # Check that data now *is* written.
         ds = nc.Dataset(filepath_indirect)
diff --git a/lib/iris/tests/unit/fileformats/pp/test__interpret_field.py b/lib/iris/tests/unit/fileformats/pp/test__interpret_field.py
index 15c3f341f9..68520300b6 100644
--- a/lib/iris/tests/unit/fileformats/pp/test__interpret_field.py
+++ b/lib/iris/tests/unit/fileformats/pp/test__interpret_field.py
@@ -13,7 +13,6 @@
 
 import numpy as np
 
-import iris
 import iris.fileformats.pp as pp
 
 
@@ -91,43 +90,6 @@ def test_deferred_fix_lbrow_lbnpt(self):
         self.assertEqual(f1.lbrow, 3)
         self.assertEqual(f1.lbnpt, 4)
 
-    @tests.skip_data
-    def test_landsea_unpacking_uses_dask(self):
-        # Ensure that the graph of the (lazy) landsea-masked data contains an
-        # explicit reference to a (lazy) landsea-mask field.
-        # Otherwise its compute() will need to invoke another compute().
-        # See https://github.com/SciTools/iris/issues/3237
-
-        # This is too complex to explore in a mock-ist way, so let's load a
-        # tiny bit of real data ...
-        testfile_path = tests.get_data_path(
-            ["FF", "landsea_masked", "testdata_mini_lsm.ff"]
-        )
-        landsea_mask, soil_temp = iris.load_cubes(
-            testfile_path, ("land_binary_mask", "soil_temperature")
-        )
-
-        # Now check that the soil-temp dask graph correctly references the
-        # landsea mask, in its dask graph.
-        lazy_mask_array = landsea_mask.core_data()
-        lazy_soildata_array = soil_temp.core_data()
-
-        # Work out the main dask key for the mask data, as used by 'compute()'.
-        mask_toplev_key = (lazy_mask_array.name,) + (0,) * lazy_mask_array.ndim
-        # Get the 'main' calculation entry.
-        mask_toplev_item = lazy_mask_array.dask[mask_toplev_key]
-        # This should be a task (a simple fetch).
-        self.assertTrue(callable(mask_toplev_item))
-        # Get the key (name) of the array that it fetches.
-        mask_data_name = mask_toplev_item.args[0].key
-
-        # Check that the item this refers to is a PPDataProxy.
-        self.assertIsInstance(lazy_mask_array.dask[mask_data_name], pp.PPDataProxy)
-
-        # Check that the soil-temp graph references the *same* lazy element,
-        # showing that the mask+data calculation is handled by dask.
-        self.assertIn(mask_data_name, lazy_soildata_array.dask.keys())
-
 
 if __name__ == "__main__":
     tests.main()
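
A note on the pattern above: every change swaps the bound .compute() method for
the module-level dask.compute(), which accepts a single Delayed as well as
containers of dask collections. This is also why the tests now check
are_dask_collections(result) rather than isinstance(result, Delayed): the new
helper allows for iris.save(..., compute=False) returning a container of dask
items rather than one Delayed object. A minimal sketch of the calling pattern,
assuming a lazy cube and a scratch output path (example.nc is an illustrative
placeholder, not part of this changeset):

    import dask
    import dask.array as da

    import iris
    from iris.cube import Cube

    # A cube with lazy data, so the save has something to defer.
    cube = Cube(da.zeros((2, 3), chunks=(1, 3)), var_name="example")

    # With compute=False, the save returns an uncompleted result: a single
    # Delayed under older Dask, possibly a container of dask items under
    # newer versions.
    result = iris.save(cube, "example.nc", compute=False)

    # dask.compute() handles both forms; .compute() exists only on the
    # single-object form.
    dask.compute(result)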