diff --git a/lib/iris/_lazy_data.py b/lib/iris/_lazy_data.py index 3fcda5b1f3..e96f24da86 100644 --- a/lib/iris/_lazy_data.py +++ b/lib/iris/_lazy_data.py @@ -25,6 +25,7 @@ import dask.array as da import numpy as np +import numpy.ma as ma def is_lazy_data(data): @@ -40,11 +41,45 @@ def is_lazy_data(data): return result +# A magic value, borrowed from biggus +_MAX_CHUNK_SIZE = 8 * 1024 * 1024 * 2 + + +def as_lazy_data(data, chunks=_MAX_CHUNK_SIZE): + """ + Convert the input array `data` to a lazy dask array. + + Args: + + * data: + An array. This will be converted to a lazy dask array. + + Kwargs: + + * chunks: + Describes how the created dask array should be split up. Defaults to a + value first defined in biggus (being `8 * 1024 * 1024 * 2`). + For more information see + http://dask.pydata.org/en/latest/array-creation.html#chunks. + + Returns: + The input array converted to a lazy dask array. + + """ + if not is_lazy_data(data): + if isinstance(data, ma.MaskedArray): + data = array_masked_to_nans(data) + data = data.data + data = da.from_array(data, chunks=chunks) + return data + + def array_masked_to_nans(array, mask=None): """ - Convert a masked array to a normal array with NaNs at masked points. + Convert a masked array to an `ndarray` with NaNs at masked points. This is used for dask integration, as dask does not support masked arrays. Note that any fill value will be lost. 
+ """ if mask is None: mask = array.mask diff --git a/lib/iris/_merge.py b/lib/iris/_merge.py index eb6a4811ac..bb943b9415 100644 --- a/lib/iris/_merge.py +++ b/lib/iris/_merge.py @@ -33,7 +33,7 @@ import numpy as np import numpy.ma as ma -from iris._lazy_data import is_lazy_data, array_masked_to_nans +from iris._lazy_data import array_masked_to_nans, as_lazy_data, is_lazy_data import iris.cube import iris.coords import iris.exceptions @@ -1231,11 +1231,7 @@ def merge(self, unique=True): if is_lazy_data(data): all_have_data = False else: - if isinstance(data, ma.MaskedArray): - if ma.is_masked(data): - data = array_masked_to_nans(data) - data = data.data - data = da.from_array(data, chunks=data.shape) + data = as_lazy_data(data, chunks=data.shape) stack[nd_index] = data merged_data = _multidim_daskstack(stack) diff --git a/lib/iris/cube.py b/lib/iris/cube.py index 9b66208490..f59baca313 100644 --- a/lib/iris/cube.py +++ b/lib/iris/cube.py @@ -42,7 +42,7 @@ import iris._concatenate import iris._constraints from iris._deprecation import warn_deprecated -from iris._lazy_data import is_lazy_data, array_masked_to_nans +from iris._lazy_data import array_masked_to_nans, as_lazy_data, is_lazy_data import iris._merge import iris.analysis from iris.analysis.cartography import wrap_lons @@ -1658,10 +1658,7 @@ def lazy_data(self): result = self._dask_array else: data = self._numpy_array - if isinstance(data, ma.masked_array): - data = array_masked_to_nans(data) - data = data.data - result = da.from_array(data, chunks=data.shape) + result = as_lazy_data(data) return result @property diff --git a/lib/iris/fileformats/_pyke_rules/fc_rules_cf.krb b/lib/iris/fileformats/_pyke_rules/fc_rules_cf.krb index b4696518d5..3bbb252887 100644 --- a/lib/iris/fileformats/_pyke_rules/fc_rules_cf.krb +++ b/lib/iris/fileformats/_pyke_rules/fc_rules_cf.krb @@ -1003,7 +1003,6 @@ fc_extras import warnings import cf_units - import dask.array as da import netCDF4 import numpy as np import numpy.ma as 
ma @@ -1018,6 +1017,7 @@ fc_extras import iris.exceptions import iris.std_names import iris.util + from iris._lazy_data import as_lazy_data # @@ -1630,7 +1630,7 @@ fc_extras proxy = iris.fileformats.netcdf.NetCDFDataProxy( cf_var.shape, dtype, engine.filename, cf_var.cf_name, fill_value) - return da.from_array(proxy, chunks=proxy.shape) + return as_lazy_data(proxy, chunks=proxy.shape) # Get any coordinate point data. if isinstance(cf_coord_var, cf.CFLabelVariable): @@ -1701,7 +1701,7 @@ fc_extras proxy = iris.fileformats.netcdf.NetCDFDataProxy( cf_var.shape, dtype, engine.filename, cf_var.cf_name, fill_value) - return da.from_array(proxy, chunks=proxy.shape) + return as_lazy_data(proxy, chunks=proxy.shape) data = cf_var_as_array(cf_cm_attr) diff --git a/lib/iris/fileformats/netcdf.py b/lib/iris/fileformats/netcdf.py index ee68955057..edb54abe74 100644 --- a/lib/iris/fileformats/netcdf.py +++ b/lib/iris/fileformats/netcdf.py @@ -38,7 +38,6 @@ import warnings import biggus -import dask.array as da import netCDF4 import numpy as np import numpy.ma as ma @@ -57,7 +56,7 @@ import iris.fileformats._pyke_rules import iris.io import iris.util -from iris._lazy_data import array_masked_to_nans +from iris._lazy_data import array_masked_to_nans, as_lazy_data # Show Pyke inference engine statistics. DEBUG = False @@ -508,7 +507,7 @@ def _load_cube(engine, cf, cf_var, filename): proxy = NetCDFDataProxy(cf_var.shape, dummy_data.dtype, filename, cf_var.cf_name, fill_value) - data = da.from_array(proxy, chunks=100) + data = as_lazy_data(proxy, chunks=cf_var.shape) cube = iris.cube.Cube(data, fill_value=fill_value, dtype=dummy_data.dtype) # Reset the pyke inference engine. 
diff --git a/lib/iris/fileformats/pp.py b/lib/iris/fileformats/pp.py index 3c3e54f719..f1a7019cb8 100644 --- a/lib/iris/fileformats/pp.py +++ b/lib/iris/fileformats/pp.py @@ -37,14 +37,13 @@ import numpy as np import numpy.ma as ma import netcdftime -import dask.array as da from iris._deprecation import warn_deprecated import iris.config import iris.fileformats.rules import iris.fileformats.pp_rules import iris.coord_systems -from iris._lazy_data import is_lazy_data, array_masked_to_nans +from iris._lazy_data import array_masked_to_nans, as_lazy_data, is_lazy_data try: import mo_pack @@ -1881,7 +1880,7 @@ def _create_field_data(field, data_shape, land_mask): field.boundary_packing, field.bmdi, land_mask) block_shape = data_shape if 0 not in data_shape else (1, 1) - field.data = da.from_array(proxy, block_shape) + field.data = as_lazy_data(proxy, chunks=block_shape) def _field_gen(filename, read_data_bytes, little_ended=False): diff --git a/lib/iris/fileformats/um/_fast_load_structured_fields.py b/lib/iris/fileformats/um/_fast_load_structured_fields.py index b1248ed015..25c7f21059 100644 --- a/lib/iris/fileformats/um/_fast_load_structured_fields.py +++ b/lib/iris/fileformats/um/_fast_load_structured_fields.py @@ -36,6 +36,7 @@ optimal_array_structure from iris.fileformats.pp import PPField3 +from iris._lazy_data import as_lazy_data class FieldCollation(object): @@ -88,7 +89,7 @@ def data(self): if not self._structure_calculated: self._calculate_structure() if self._data_cache is None: - data_arrays = [da.from_array(f._data, f._data.shape) + data_arrays = [as_lazy_data(f._data, chunks=f._data.shape) for f in self.fields] vector_dims_list = list(self.vector_dims_shape) vector_dims_list.reverse() diff --git a/lib/iris/tests/integration/test_trajectory.py b/lib/iris/tests/integration/test_trajectory.py index 40a22a5b87..6a3004fd0b 100644 --- a/lib/iris/tests/integration/test_trajectory.py +++ b/lib/iris/tests/integration/test_trajectory.py @@ -24,11 +24,11 @@ # 
importing anything else import iris.tests as tests -import dask.array as da import numpy as np import iris import iris.tests.stock as istk +from iris._lazy_data import as_lazy_data from iris.analysis.trajectory import (Trajectory, interpolate as traj_interpolate) @@ -233,7 +233,7 @@ class TestLazyData(tests.IrisTest): def test_hybrid_height(self): cube = istk.simple_4d_with_hybrid_height() # Put a biggus array on the cube so we can test deferred loading. - cube.data = da.from_array(cube.data, chunks=cube.data.shape) + cube.data = as_lazy_data(cube.data) traj = (('grid_latitude', [20.5, 21.5, 22.5, 23.5]), ('grid_longitude', [31, 32, 33, 34])) diff --git a/lib/iris/tests/unit/analysis/interpolation/test_RectilinearInterpolator.py b/lib/iris/tests/unit/analysis/interpolation/test_RectilinearInterpolator.py index d4d7e51c58..97433d2f65 100644 --- a/lib/iris/tests/unit/analysis/interpolation/test_RectilinearInterpolator.py +++ b/lib/iris/tests/unit/analysis/interpolation/test_RectilinearInterpolator.py @@ -28,7 +28,7 @@ import datetime -import dask.array as da +from iris._lazy_data import as_lazy_data import numpy as np import iris @@ -481,7 +481,7 @@ def test_src_cube_data_loaded(self): # of loading it again and again. # Modify self.cube to have lazy data. - self.cube.data = da.from_array(self.data, chunks=self.data.shape) + self.cube.data = as_lazy_data(self.data) self.assertTrue(self.cube.has_lazy_data()) # Perform interpolation and check the data has been loaded. diff --git a/lib/iris/tests/unit/analysis/test_MEAN.py b/lib/iris/tests/unit/analysis/test_MEAN.py index 3b93fc58d8..93baafa21f 100644 --- a/lib/iris/tests/unit/analysis/test_MEAN.py +++ b/lib/iris/tests/unit/analysis/test_MEAN.py @@ -23,7 +23,7 @@ # importing anything else. 
import iris.tests as tests -import dask.array as da +from iris._lazy_data import as_lazy_data import numpy as np import numpy.ma as ma @@ -34,7 +34,7 @@ class Test_lazy_aggregate(tests.IrisTest): def setUp(self): self.data = np.arange(12.0).reshape(3, 4) self.data[2, 1:] = np.nan - self.array = da.from_array(self.data, chunks=self.data.shape) + self.array = as_lazy_data(self.data) masked_data = ma.masked_array(self.data, mask=np.isnan(self.data)) self.axis = 0 @@ -66,7 +66,7 @@ def test_mdtol_above(self): def test_multi_axis(self): data = np.arange(24.0).reshape((2, 3, 4)) collapse_axes = (0, 2) - lazy_data = da.from_array(data, chunks=1e6) + lazy_data = as_lazy_data(data) agg = MEAN.lazy_aggregate(lazy_data, axis=collapse_axes) expected = np.mean(data, axis=collapse_axes) self.assertArrayAllClose(agg.compute(), expected) diff --git a/lib/iris/tests/unit/analysis/test_STD_DEV.py b/lib/iris/tests/unit/analysis/test_STD_DEV.py index d52cf11b73..1cd9049b0f 100644 --- a/lib/iris/tests/unit/analysis/test_STD_DEV.py +++ b/lib/iris/tests/unit/analysis/test_STD_DEV.py @@ -23,8 +23,8 @@ # importing anything else. 
import iris.tests as tests +from iris._lazy_data import as_lazy_data import numpy as np -import dask.array as da from iris.analysis import STD_DEV @@ -35,7 +35,7 @@ def test_mdtol(self): array = np.array([[1., 2., 1., 2.], [1., 2., 3., na], [1., 2., na, na]]) - array = da.from_array(array, chunks=1e6) + array = as_lazy_data(array) var = STD_DEV.lazy_aggregate(array, axis=1, mdtol=0.3) result = var.compute() masked_result = np.ma.masked_array(result, mask=np.isnan(result)) @@ -44,12 +44,12 @@ def test_mdtol(self): self.assertMaskedArrayAlmostEqual(masked_result, masked_expected) def test_ddof_one(self): - array = da.from_array(np.arange(8), chunks=1e6) + array = as_lazy_data(np.arange(8)) var = STD_DEV.lazy_aggregate(array, axis=0, ddof=1) self.assertArrayAlmostEqual(var.compute(), np.array(2.449489)) def test_ddof_zero(self): - array = da.from_array(np.arange(8), chunks=1e6) + array = as_lazy_data(np.arange(8)) var = STD_DEV.lazy_aggregate(array, axis=0, ddof=0) self.assertArrayAlmostEqual(var.compute(), np.array(2.291287)) diff --git a/lib/iris/tests/unit/analysis/test_VARIANCE.py b/lib/iris/tests/unit/analysis/test_VARIANCE.py index 9ebac535aa..f858cd7182 100644 --- a/lib/iris/tests/unit/analysis/test_VARIANCE.py +++ b/lib/iris/tests/unit/analysis/test_VARIANCE.py @@ -23,7 +23,7 @@ # importing anything else. 
import iris.tests as tests -import dask.array as da +from iris._lazy_data import as_lazy_data import numpy as np import numpy.ma as ma @@ -69,12 +69,12 @@ def test_ma_ddof1(self): class Test_lazy_aggregate(tests.IrisTest): def test_ddof_one(self): - array = da.from_array(np.arange(8), chunks=1e6) + array = as_lazy_data(np.arange(8)) var = VARIANCE.lazy_aggregate(array, axis=0, ddof=1) self.assertArrayAlmostEqual(var.compute(), np.array(6.0)) def test_ddof_zero(self): - array = da.from_array(np.arange(8), chunks=1e6) + array = as_lazy_data(np.arange(8)) var = VARIANCE.lazy_aggregate(array, axis=0, ddof=0) self.assertArrayAlmostEqual(var.compute(), np.array(5.25)) diff --git a/lib/iris/tests/unit/aux_factory/test_AuxCoordFactory.py b/lib/iris/tests/unit/aux_factory/test_AuxCoordFactory.py index 40a1a84585..25a4f2683a 100644 --- a/lib/iris/tests/unit/aux_factory/test_AuxCoordFactory.py +++ b/lib/iris/tests/unit/aux_factory/test_AuxCoordFactory.py @@ -26,8 +26,7 @@ # importing anything else. import iris.tests as tests -import iris._lazy_data -import dask.array as da +from iris._lazy_data import as_lazy_data, is_lazy_data import numpy as np import iris.coords @@ -58,25 +57,25 @@ def test_numpy_complex(self): def test_lazy_simple(self): raw_points = np.arange(12).reshape(4, 3) - points = da.from_array(raw_points, 1) + points = as_lazy_data(raw_points, 1) coord = iris.coords.AuxCoord(points) - self.assertTrue(iris._lazy_data.is_lazy_data(coord._points)) + self.assertTrue(is_lazy_data(coord._points)) result = AuxCoordFactory._nd_points(coord, (0, 1), 2) # Check we haven't triggered the loading of the coordinate values. 
- self.assertTrue(iris._lazy_data.is_lazy_data(coord._points)) - self.assertTrue(iris._lazy_data.is_lazy_data(result)) + self.assertTrue(is_lazy_data(coord._points)) + self.assertTrue(is_lazy_data(result)) expected = raw_points self.assertArrayEqual(result, expected) def test_lazy_complex(self): raw_points = np.arange(12).reshape(4, 3) - points = da.from_array(raw_points, 1) + points = as_lazy_data(raw_points, 1) coord = iris.coords.AuxCoord(points) - self.assertTrue(iris._lazy_data.is_lazy_data(coord._points)) + self.assertTrue(is_lazy_data(coord._points)) result = AuxCoordFactory._nd_points(coord, (3, 2), 5) # Check we haven't triggered the loading of the coordinate values. - self.assertTrue(iris._lazy_data.is_lazy_data(coord._points)) - self.assertTrue(iris._lazy_data.is_lazy_data(result)) + self.assertTrue(is_lazy_data(coord._points)) + self.assertTrue(is_lazy_data(result)) expected = raw_points.T[np.newaxis, np.newaxis, ..., np.newaxis] self.assertArrayEqual(result, expected) diff --git a/lib/iris/tests/unit/cube/test_Cube.py b/lib/iris/tests/unit/cube/test_Cube.py index f7b69d3c0b..849491fc19 100644 --- a/lib/iris/tests/unit/cube/test_Cube.py +++ b/lib/iris/tests/unit/cube/test_Cube.py @@ -23,7 +23,6 @@ # importing anything else. 
import iris.tests as tests -import dask.array as da import numpy as np import numpy.ma as ma @@ -39,6 +38,7 @@ from iris.exceptions import CoordinateNotFoundError, CellMeasureNotFoundError from iris.tests import mock import iris.tests.stock as stock +from iris._lazy_data import as_lazy_data class Test___init___data(tests.IrisTest): @@ -147,7 +147,7 @@ def test_byteorder_true(self): class Test_collapsed__lazy(tests.IrisTest): def setUp(self): self.data = np.arange(6.0).reshape((2, 3)) - self.lazydata = da.from_array(self.data, chunks=self.data.shape) + self.lazydata = as_lazy_data(self.data) cube = Cube(self.lazydata) for i_dim, name in enumerate(('y', 'x')): npts = cube.shape[i_dim] @@ -545,7 +545,7 @@ def test_nodimension(self): def create_cube(lon_min, lon_max, bounds=False): n_lons = max(lon_min, lon_max) - min(lon_max, lon_min) data = np.arange(4 * 3 * n_lons, dtype='f4').reshape(4, 3, n_lons) - data = da.from_array(data, chunks=data.shape) + data = as_lazy_data(data) cube = Cube(data, standard_name='x_wind', units='ms-1') cube.add_dim_coord(iris.coords.DimCoord([0, 20, 40, 80], long_name='level_height', @@ -1219,7 +1219,7 @@ def test__masked_scalar_arraymask(self): self._check_copy(cube, cube.copy()) def test__lazy(self): - cube = Cube(da.from_array(np.array([1, 0]), chunks=100)) + cube = Cube(as_lazy_data(np.array([1, 0]))) self._check_copy(cube, cube.copy()) @@ -1234,8 +1234,7 @@ def test_float32(self): def test_lazy(self): data = np.arange(6, dtype=np.float32).reshape(2, 3) - lazydata = da.from_array(data, chunks=data.shape) - cube = Cube(lazydata) + cube = Cube(as_lazy_data(data)) self.assertEqual(cube.dtype, np.float32) # Check that accessing the dtype does not trigger loading of the data. 
self.assertTrue(cube.has_lazy_data()) @@ -1414,7 +1413,7 @@ def test_fail_cell_measure_dims(self): class Test_transpose(tests.IrisTest): def test_lazy_data(self): data = np.arange(12).reshape(3, 4) - cube = Cube(da.from_array(data, chunks=data.shape)) + cube = Cube(as_lazy_data(data)) cube.transpose() self.assertTrue(cube.has_lazy_data()) self.assertArrayEqual(data.T, cube.data) diff --git a/lib/iris/tests/unit/fileformats/um/fast_load_structured_fields/test_FieldCollation.py b/lib/iris/tests/unit/fileformats/um/fast_load_structured_fields/test_FieldCollation.py index 5c813a0a69..1d728731f5 100644 --- a/lib/iris/tests/unit/fileformats/um/fast_load_structured_fields/test_FieldCollation.py +++ b/lib/iris/tests/unit/fileformats/um/fast_load_structured_fields/test_FieldCollation.py @@ -27,7 +27,7 @@ # before importing anything else. import iris.tests as tests -import dask.array as da +from iris._lazy_data import as_lazy_data from netcdftime import datetime import numpy as np @@ -70,7 +70,7 @@ def _make_field(lbyr=None, lbyrd=None, lbft=None, def _make_data(fill_value): shape = (10, 10) - return da.from_array(np.ones(shape)*fill_value, chunks=100) + return as_lazy_data(np.ones(shape)*fill_value) class Test_data(tests.IrisTest): diff --git a/lib/iris/tests/unit/lazy_data/test_as_lazy_data.py b/lib/iris/tests/unit/lazy_data/test_as_lazy_data.py new file mode 100644 index 0000000000..45aa0a9833 --- /dev/null +++ b/lib/iris/tests/unit/lazy_data/test_as_lazy_data.py @@ -0,0 +1,65 @@ +# (C) British Crown Copyright 2017, Met Office +# +# This file is part of Iris. +# +# Iris is free software: you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the +# Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. 
+# +# Iris is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Iris. If not, see <http://www.gnu.org/licenses/>. +"""Test the function :func:`iris._lazy_data.as_lazy_data`.""" + +from __future__ import (absolute_import, division, print_function) +from six.moves import (filter, input, map, range, zip) # noqa + +# Import iris.tests first so that some things can be initialised before +# importing anything else. +import iris.tests as tests + +import numpy as np +import dask.array as da + +from iris._lazy_data import as_lazy_data, _MAX_CHUNK_SIZE + + +class Test_as_lazy_data(tests.IrisTest): + def test_lazy(self): + data = da.from_array(np.arange(24).reshape((2, 3, 4)), + chunks=_MAX_CHUNK_SIZE) + result = as_lazy_data(data) + self.assertIsInstance(result, da.core.Array) + + def test_real(self): + data = np.arange(24).reshape((2, 3, 4)) + result = as_lazy_data(data) + self.assertIsInstance(result, da.core.Array) + + def test_masked(self): + data = np.ma.masked_greater(np.arange(24), 10) + result = as_lazy_data(data) + self.assertIsInstance(result, da.core.Array) + + def test_non_default_chunks(self): + data = np.arange(24) + chunks = 12 + lazy_data = as_lazy_data(data, chunks=chunks) + result, = np.unique(lazy_data.chunks) + self.assertEqual(result, chunks) + + def test_non_default_chunks__chunks_already_set(self): + chunks = 12 + data = da.from_array(np.arange(24), chunks=chunks) + lazy_data = as_lazy_data(data) + result, = np.unique(lazy_data.chunks) + self.assertEqual(result, chunks) + + +if __name__ == '__main__': + tests.main() diff --git a/lib/iris/tests/unit/lazy_data/test_is_lazy_data.py b/lib/iris/tests/unit/lazy_data/test_is_lazy_data.py index 90e0ff6aff..1c40d9a475 100644 ---
a/lib/iris/tests/unit/lazy_data/test_is_lazy_data.py +++ b/lib/iris/tests/unit/lazy_data/test_is_lazy_data.py @@ -26,13 +26,13 @@ import numpy as np import dask.array as da -from iris._lazy_data import is_lazy_data +from iris._lazy_data import as_lazy_data, is_lazy_data, _MAX_CHUNK_SIZE class Test_is_lazy_data(tests.IrisTest): def test_lazy(self): - lazy_values = np.arange(30).reshape((2, 5, 3)) - lazy_array = da.from_array(lazy_values, 1e6) + values = np.arange(30).reshape((2, 5, 3)) + lazy_array = da.from_array(values, chunks=_MAX_CHUNK_SIZE) self.assertTrue(is_lazy_data(lazy_array)) def test_real(self): diff --git a/lib/iris/tests/unit/util/test_new_axis.py b/lib/iris/tests/unit/util/test_new_axis.py index 735cb815dc..0f40d92426 100644 --- a/lib/iris/tests/unit/util/test_new_axis.py +++ b/lib/iris/tests/unit/util/test_new_axis.py @@ -25,7 +25,7 @@ import iris.tests.stock as stock import copy -import dask.array as da +from iris._lazy_data import as_lazy_data import numpy as np import unittest @@ -137,7 +137,7 @@ def test_maint_factory(self): self._assert_cube_notis(res, cube) def test_lazy_data(self): - cube = iris.cube.Cube(da.from_array(self.data, chunks=self.data.shape)) + cube = iris.cube.Cube(as_lazy_data(self.data)) cube.add_aux_coord(iris.coords.DimCoord([1], standard_name='time')) res = new_axis(cube, 'time') self.assertTrue(cube.has_lazy_data())