Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/iris/fileformats/netcdf/saver.py
Original file line number Diff line number Diff line change
Expand Up @@ -1742,7 +1742,7 @@ def _create_generic_cf_array_var(
if len(element_dims) == 1:
data_first = data[0]
if is_lazy_data(data_first):
data_first = data_first.compute()
data_first = dask.compute(data_first)
data = list("%- *s" % (string_dimension_depth, data_first))
else:
orig_shape = data.shape
Expand Down Expand Up @@ -2380,7 +2380,7 @@ def complete(self) -> None:
raise ValueError(msg)

# Complete the saves now
self.delayed_completion().compute()
dask.compute(self.delayed_completion())


def save(
Expand Down
16 changes: 11 additions & 5 deletions lib/iris/tests/integration/netcdf/test_delayed_save.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from cf_units import Unit
import dask.array as da
import dask.config
from dask.delayed import Delayed
import distributed
import numpy as np
import pytest
Expand All @@ -20,6 +19,13 @@
from iris.tests.stock import realistic_4d


def are_dask_collections(obj):
    """Check whether ``obj`` is a dask collection, or a container of them.

    ``obj`` is first tested directly with ``dask.is_dask_collection``.  If
    that fails and ``obj`` has a ``__len__`` attribute, every item obtained
    by iterating over ``obj`` is tested instead.

    Note that an empty sized container therefore passes, since ``all`` of
    no items is ``True``.
    """
    is_collection = dask.is_dask_collection(obj)
    # Only fall back to a per-item check for sized objects which are not
    # themselves dask collections.
    if is_collection or not hasattr(obj, "__len__"):
        return is_collection
    return all(dask.is_dask_collection(member) for member in obj)


class Test__lazy_stream_data:
# Ensure all saves are done with split-attribute saving,
# -- because some of these tests are sensitive to unexpected warnings.
Expand Down Expand Up @@ -130,7 +136,7 @@ def test_realfile_loadsave_equivalence(self, save_is_delayed, output_path):
result = iris.save(original_cubes, output_path, compute=not save_is_delayed)
if save_is_delayed:
# In this case, must also "complete" the save.
result.compute()
dask.compute(result)
reloaded_cubes = iris.load(output_path)
reloaded_cubes = sorted(reloaded_cubes, key=lambda cube: cube.name())
assert reloaded_cubes == original_cubes
Expand Down Expand Up @@ -188,7 +194,7 @@ def test_scheduler_types(self, output_path, scheduler_type, save_is_delayed):
if not save_is_delayed:
assert result is None
else:
assert isinstance(result, Delayed)
assert are_dask_collections(result)

def test_time_of_writing(self, save_is_delayed, output_path, scheduler_type):
# Check when lazy data is *actually* written :
Expand Down Expand Up @@ -251,7 +257,7 @@ def fetch_masks():

if save_is_delayed:
# Complete the write.
result.compute()
dask.compute(result)

# Re-fetch the lazy arrays. The data should now **not be masked**.
data_mask, coord_mask, ancil_mask, cm_mask = fetch_masks()
Expand All @@ -267,4 +273,4 @@ def test_no_delayed_writes(self, output_path):
cube = self.make_testcube(include_lazy_content=False)
warnings.simplefilter("error")
result = iris.save(cube, output_path, compute=False)
assert isinstance(result, Delayed)
assert are_dask_collections(result)
3 changes: 2 additions & 1 deletion lib/iris/tests/integration/netcdf/test_general.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from unittest import mock
import warnings

import dask
import numpy as np
import numpy.ma as ma
import pytest
Expand Down Expand Up @@ -456,7 +457,7 @@ def test_basic_save(self):
ds.close()

# Complete the delayed saves.
result.compute()
dask.compute(result)

# Check that data now *is* written.
ds = nc.Dataset(filepath_indirect)
Expand Down
38 changes: 0 additions & 38 deletions lib/iris/tests/unit/fileformats/pp/test__interpret_field.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

import numpy as np

import iris
import iris.fileformats.pp as pp


Expand Down Expand Up @@ -91,43 +90,6 @@ def test_deferred_fix_lbrow_lbnpt(self):
self.assertEqual(f1.lbrow, 3)
self.assertEqual(f1.lbnpt, 4)

@tests.skip_data
def test_landsea_unpacking_uses_dask(self):
Comment thread
trexfeathers marked this conversation as resolved.
# Ensure that the graph of the (lazy) landsea-masked data contains an
# explicit reference to a (lazy) landsea-mask field.
# Otherwise its compute() will need to invoke another compute().
# See https://github.com/SciTools/iris/issues/3237

# This is too complex to explore in a mock-ist way, so let's load a
# tiny bit of real data ...
testfile_path = tests.get_data_path(
["FF", "landsea_masked", "testdata_mini_lsm.ff"]
)
landsea_mask, soil_temp = iris.load_cubes(
testfile_path, ("land_binary_mask", "soil_temperature")
)

# Now check that the soil-temp dask graph correctly references the
# landsea mask, in its dask graph.
lazy_mask_array = landsea_mask.core_data()
lazy_soildata_array = soil_temp.core_data()

# Work out the main dask key for the mask data, as used by 'compute()'.
# The key is built as the array name followed by one zero per dimension
# -- presumably the key of the index-0 chunk; TODO confirm.
mask_toplev_key = (lazy_mask_array.name,) + (0,) * lazy_mask_array.ndim
# Get the 'main' calculation entry.
mask_toplev_item = lazy_mask_array.dask[mask_toplev_key]
# This should be a task (a simple fetch).
self.assertTrue(callable(mask_toplev_item))
# Get the key (name) of the array that it fetches.
# NOTE(review): this relies on the graph entry exposing '.args[0].key',
# which looks like dask-internal task structure -- confirm that it holds
# for the dask versions in use.
mask_data_name = mask_toplev_item.args[0].key

# Check that the item this refers to is a PPDataProxy.
self.assertIsInstance(lazy_mask_array.dask[mask_data_name], pp.PPDataProxy)

# Check that the soil-temp graph references the *same* lazy element,
# showing that the mask+data calculation is handled by dask.
self.assertIn(mask_data_name, lazy_soildata_array.dask.keys())


# Run the tests via tests.main() when this module is executed as a script.
if __name__ == "__main__":
tests.main()
Loading