Merged
37 changes: 36 additions & 1 deletion lib/iris/_lazy_data.py
@@ -25,6 +25,7 @@

import dask.array as da
import numpy as np
import numpy.ma as ma


def is_lazy_data(data):
@@ -40,11 +41,45 @@ def is_lazy_data(data):
return result


# A magic value, borrowed from biggus
_MAX_CHUNK_SIZE = 8 * 1024 * 1024 * 2


def as_lazy_data(data, chunks=_MAX_CHUNK_SIZE):
"""
Convert the input array `data` to a lazy dask array.

Args:

* data:
An array. This will be converted to a lazy dask array.

Kwargs:

* chunks:
Describes how the created dask array should be split up. Defaults to a
value first defined in biggus (`8 * 1024 * 1024 * 2`).
Member:
I do wonder whether the references to biggus here and on line 43 are necessary. I want to be able to read this code without needing to also know the history of Iris lazy data handling.
Not that this is a blocker to this PR.

Member Author:
Yeah, I guess this is a temporary note for us to describe the background to choosing this number in particular...
For more information see
http://dask.pydata.org/en/latest/array-creation.html#chunks.

Returns:
The input array converted to a lazy dask array.

"""
if not is_lazy_data(data):
if isinstance(data, ma.MaskedArray):
data = array_masked_to_nans(data)
data = data.data
data = da.from_array(data, chunks=chunks)
return data
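
A quick usage sketch of the above (not part of the diff; it assumes `array_masked_to_nans` leaves the NaN-substituted values on the `.data` attribute, as the code implies):

import numpy as np
import numpy.ma as ma

from iris._lazy_data import as_lazy_data

masked = ma.masked_array(np.arange(4.0), mask=[False, True, False, False])
lazy = as_lazy_data(masked)   # masked points become NaN; the result is lazy
print(lazy.compute())         # -> [ 0. nan  2.  3.]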


def array_masked_to_nans(array, mask=None):
"""
Convert a masked array to a normal array with NaNs at masked points.
Convert a masked array to an `ndarray` with NaNs at masked points.
This is used for dask integration, as dask does not support masked arrays.
Note that any fill value will be lost.

"""
if mask is None:
mask = array.mask
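The body of this function is truncated in this view; judging from its use in `as_lazy_data` above, the return value still carries a `.data` attribute holding the NaN-filled values. A hedged sketch of the explicit-mask form (assumed behaviour, not shown in the diff):

import numpy as np
import numpy.ma as ma

from iris._lazy_data import array_masked_to_nans

masked = ma.masked_array([1.0, 2.0, 3.0], mask=[False, True, False])
# The mask argument defaults to array.mask; an explicit mask overrides it.
filled = array_masked_to_nans(masked, mask=np.array([True, False, False]))
print(filled.data)   # -> [nan  2.  3.], and any fill value is lost
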
8 changes: 2 additions & 6 deletions lib/iris/_merge.py
@@ -33,7 +33,7 @@
import numpy as np
import numpy.ma as ma

from iris._lazy_data import is_lazy_data, array_masked_to_nans
from iris._lazy_data import array_masked_to_nans, as_lazy_data, is_lazy_data
import iris.cube
import iris.coords
import iris.exceptions
@@ -1231,11 +1231,7 @@ def merge(self, unique=True):
if is_lazy_data(data):
all_have_data = False
else:
if isinstance(data, ma.MaskedArray):
if ma.is_masked(data):
data = array_masked_to_nans(data)
data = data.data
data = da.from_array(data, chunks=data.shape)
data = as_lazy_data(data, chunks=data.shape)
stack[nd_index] = data

merged_data = _multidim_daskstack(stack)
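The pattern in this hunk (wrap each realized array as a single-chunk dask array via `chunks=data.shape`, then stack the collection lazily) is roughly the following sketch; `_multidim_daskstack` is not shown in this diff, so `da.stack` stands in for it:

import dask.array as da
import numpy as np

arrays = [np.ones((2, 3)), np.zeros((2, 3))]
lazy = [da.from_array(a, chunks=a.shape) for a in arrays]  # one chunk each
stacked = da.stack(lazy)        # shape (2, 2, 3); nothing computed yet
print(stacked.compute().shape)  # -> (2, 2, 3)
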
7 changes: 2 additions & 5 deletions lib/iris/cube.py
@@ -42,7 +42,7 @@
import iris._concatenate
import iris._constraints
from iris._deprecation import warn_deprecated
from iris._lazy_data import is_lazy_data, array_masked_to_nans
from iris._lazy_data import array_masked_to_nans, as_lazy_data, is_lazy_data
import iris._merge
import iris.analysis
from iris.analysis.cartography import wrap_lons
@@ -1658,10 +1658,7 @@ def lazy_data(self):
result = self._dask_array
else:
data = self._numpy_array
if isinstance(data, ma.masked_array):
data = array_masked_to_nans(data)
data = data.data
result = da.from_array(data, chunks=data.shape)
result = as_lazy_data(data)
return result

@property
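For context, the contract of `lazy_data` after this change is that it never realizes the payload: it returns the existing dask array, or wraps the in-memory one on the fly. A small sketch (behaviour inferred from the hunk above):

import numpy as np

import iris.cube

cube = iris.cube.Cube(np.arange(6.0).reshape(2, 3))
lazy = cube.lazy_data()       # a dask array wrapping the in-memory payload
print(cube.has_lazy_data())   # -> False: the cube itself still holds real data
values = cube.data            # touching .data is what realizes lazy cubes
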
6 changes: 3 additions & 3 deletions lib/iris/fileformats/_pyke_rules/fc_rules_cf.krb
@@ -1003,7 +1003,6 @@ fc_extras
import warnings

import cf_units
import dask.array as da
import netCDF4
import numpy as np
import numpy.ma as ma
@@ -1018,6 +1017,7 @@ fc_extras
import iris.exceptions
import iris.std_names
import iris.util
from iris._lazy_data import as_lazy_data
Member (@lbdreyer, Mar 8, 2017):
Does it matter what order these are in? I just noticed that in some of the other files the private iris modules are imported first.

Member Author:
Ah, I put this here because I'd seen examples where the private modules were imported last 😉



#
@@ -1630,7 +1630,7 @@ fc_extras
proxy = iris.fileformats.netcdf.NetCDFDataProxy(
cf_var.shape, dtype, engine.filename,
cf_var.cf_name, fill_value)
return da.from_array(proxy, chunks=proxy.shape)
return as_lazy_data(proxy, chunks=proxy.shape)

# Get any coordinate point data.
if isinstance(cf_coord_var, cf.CFLabelVariable):
@@ -1701,7 +1701,7 @@ fc_extras
proxy = iris.fileformats.netcdf.NetCDFDataProxy(
cf_var.shape, dtype, engine.filename,
cf_var.cf_name, fill_value)
return da.from_array(proxy, chunks=proxy.shape)
return as_lazy_data(proxy, chunks=proxy.shape)

data = cf_var_as_array(cf_cm_attr)

5 changes: 2 additions & 3 deletions lib/iris/fileformats/netcdf.py
@@ -38,7 +38,6 @@
import warnings

import biggus
import dask.array as da
import netCDF4
import numpy as np
import numpy.ma as ma
@@ -57,7 +56,7 @@
import iris.fileformats._pyke_rules
import iris.io
import iris.util
from iris._lazy_data import array_masked_to_nans
from iris._lazy_data import array_masked_to_nans, as_lazy_data

# Show Pyke inference engine statistics.
DEBUG = False
@@ -508,7 +507,7 @@ def _load_cube(engine, cf, cf_var, filename):

proxy = NetCDFDataProxy(cf_var.shape, dummy_data.dtype,
filename, cf_var.cf_name, fill_value)
data = da.from_array(proxy, chunks=100)
data = as_lazy_data(proxy, chunks=cf_var.shape)
cube = iris.cube.Cube(data, fill_value=fill_value, dtype=dummy_data.dtype)

# Reset the pyke inference engine.
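Both changed call sites hand `as_lazy_data` a `NetCDFDataProxy` rather than a real array, with a single chunk spanning the whole variable. This works because dask only needs an array-like exposing `shape`, `dtype`, `ndim` and `__getitem__`. A minimal sketch of the idea (a dummy stand-in, not the real proxy class):

import dask.array as da
import numpy as np

class DummyProxy(object):
    """Stand-in for NetCDFDataProxy: reads happen only on indexing."""
    def __init__(self, shape, dtype):
        self.shape = shape
        self.dtype = np.dtype(dtype)
        self.ndim = len(shape)

    def __getitem__(self, keys):
        # A real proxy would open the file and read just this slice.
        return np.zeros(self.shape, dtype=self.dtype)[keys]

proxy = DummyProxy((4, 5), 'f8')
lazy = da.from_array(proxy, chunks=proxy.shape)  # one chunk -> one read
print(lazy[0, :2].compute())                     # -> [ 0.  0.]
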
5 changes: 2 additions & 3 deletions lib/iris/fileformats/pp.py
@@ -37,14 +37,13 @@
import numpy as np
import numpy.ma as ma
import netcdftime
import dask.array as da

from iris._deprecation import warn_deprecated
import iris.config
import iris.fileformats.rules
import iris.fileformats.pp_rules
import iris.coord_systems
from iris._lazy_data import is_lazy_data, array_masked_to_nans
from iris._lazy_data import array_masked_to_nans, as_lazy_data, is_lazy_data

try:
import mo_pack
@@ -1881,7 +1880,7 @@ def _create_field_data(field, data_shape, land_mask):
field.boundary_packing,
field.bmdi, land_mask)
block_shape = data_shape if 0 not in data_shape else (1, 1)
field.data = da.from_array(proxy, block_shape)
field.data = as_lazy_data(proxy, chunks=block_shape)


def _field_gen(filename, read_data_bytes, little_ended=False):
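One detail worth noting in this hunk is the `(1, 1)` fallback: a zero-sized dimension presumably cannot serve as a chunk specification, so degenerate fields get a token block shape instead (my reading of the code, not stated in the PR):

# Sketch of the guard in _create_field_data:
data_shape = (0, 5)  # a degenerate (empty) field
block_shape = data_shape if 0 not in data_shape else (1, 1)
print(block_shape)   # -> (1, 1)
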
3 changes: 2 additions & 1 deletion lib/iris/fileformats/um/_fast_load_structured_fields.py
@@ -36,6 +36,7 @@
optimal_array_structure

from iris.fileformats.pp import PPField3
from iris._lazy_data import as_lazy_data


class FieldCollation(object):
@@ -88,7 +89,7 @@ def data(self):
if not self._structure_calculated:
self._calculate_structure()
if self._data_cache is None:
data_arrays = [da.from_array(f._data, f._data.shape)
data_arrays = [as_lazy_data(f._data, chunks=f._data.shape)
for f in self.fields]
vector_dims_list = list(self.vector_dims_shape)
vector_dims_list.reverse()
4 changes: 2 additions & 2 deletions lib/iris/tests/integration/test_trajectory.py
@@ -24,11 +24,11 @@
# importing anything else
import iris.tests as tests

import dask.array as da
import numpy as np

import iris
import iris.tests.stock as istk
from iris._lazy_data import as_lazy_data

from iris.analysis.trajectory import (Trajectory,
interpolate as traj_interpolate)
@@ -233,7 +233,7 @@ class TestLazyData(tests.IrisTest):
def test_hybrid_height(self):
cube = istk.simple_4d_with_hybrid_height()
# Put a biggus array on the cube so we can test deferred loading.
cube.data = da.from_array(cube.data, chunks=cube.data.shape)
cube.data = as_lazy_data(cube.data)
Member:
I'm interested why you have removed the chunking here, and in other tests.
Is there a reason to use the 'magic chunking number' in these cases?

Member Author:
Primarily this has been done for consistency, but there are also performance improvements to be had from setting the chunk size to the magic chunk size.
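
To make the consistency point concrete: the two spellings produce the same lazy array and differ only in chunking (a sketch; the default is the module-level magic number shown in _lazy_data.py):

import numpy as np

from iris._lazy_data import as_lazy_data

data = np.zeros((1000, 1000))
lazy_default = as_lazy_data(data)                    # magic-number chunks
lazy_single = as_lazy_data(data, chunks=data.shape)  # one chunk overall
print(lazy_default.shape == lazy_single.shape)       # -> True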


traj = (('grid_latitude', [20.5, 21.5, 22.5, 23.5]),
('grid_longitude', [31, 32, 33, 34]))
@@ -28,7 +28,7 @@

import datetime

import dask.array as da
from iris._lazy_data import as_lazy_data
import numpy as np

import iris
@@ -481,7 +481,7 @@ def test_src_cube_data_loaded(self):
# of loading it again and again.

# Modify self.cube to have lazy data.
self.cube.data = da.from_array(self.data, chunks=self.data.shape)
self.cube.data = as_lazy_data(self.data)
self.assertTrue(self.cube.has_lazy_data())

# Perform interpolation and check the data has been loaded.
6 changes: 3 additions & 3 deletions lib/iris/tests/unit/analysis/test_MEAN.py
@@ -23,7 +23,7 @@
# importing anything else.
import iris.tests as tests

import dask.array as da
from iris._lazy_data import as_lazy_data
import numpy as np
import numpy.ma as ma

@@ -34,7 +34,7 @@ class Test_lazy_aggregate(tests.IrisTest):
def setUp(self):
self.data = np.arange(12.0).reshape(3, 4)
self.data[2, 1:] = np.nan
self.array = da.from_array(self.data, chunks=self.data.shape)
self.array = as_lazy_data(self.data)
masked_data = ma.masked_array(self.data,
mask=np.isnan(self.data))
self.axis = 0
@@ -66,7 +66,7 @@ def test_mdtol_above(self):
def test_multi_axis(self):
data = np.arange(24.0).reshape((2, 3, 4))
collapse_axes = (0, 2)
lazy_data = da.from_array(data, chunks=1e6)
lazy_data = as_lazy_data(data)
agg = MEAN.lazy_aggregate(lazy_data, axis=collapse_axes)
expected = np.mean(data, axis=collapse_axes)
self.assertArrayAllClose(agg.compute(), expected)
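For readers unfamiliar with `mdtol`: it is the tolerated fraction of missing input points per output point, and an output point is masked once that fraction is exceeded. The setUp data makes the fractions easy to see (plain numpy; a sketch of the semantics, not the aggregator itself):

import numpy as np

data = np.arange(12.0).reshape(3, 4)
data[2, 1:] = np.nan                 # columns 1-3 each lose 1 of 3 points
frac = np.isnan(data).mean(axis=0)
print(frac)                          # -> [ 0.  0.333...  0.333...  0.333...]
# mdtol=0.5 tolerates 1 missing point in 3, so those columns survive;
# mdtol=0.3 does not, so they come back masked.
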
8 changes: 4 additions & 4 deletions lib/iris/tests/unit/analysis/test_STD_DEV.py
@@ -23,8 +23,8 @@
# importing anything else.
import iris.tests as tests

from iris._lazy_data import as_lazy_data
import numpy as np
import dask.array as da

from iris.analysis import STD_DEV

@@ -35,7 +35,7 @@ def test_mdtol(self):
array = np.array([[1., 2., 1., 2.],
[1., 2., 3., na],
[1., 2., na, na]])
array = da.from_array(array, chunks=1e6)
array = as_lazy_data(array)
var = STD_DEV.lazy_aggregate(array, axis=1, mdtol=0.3)
result = var.compute()
masked_result = np.ma.masked_array(result, mask=np.isnan(result))
@@ -44,12 +44,12 @@
self.assertMaskedArrayAlmostEqual(masked_result, masked_expected)

def test_ddof_one(self):
array = da.from_array(np.arange(8), chunks=1e6)
array = as_lazy_data(np.arange(8))
var = STD_DEV.lazy_aggregate(array, axis=0, ddof=1)
self.assertArrayAlmostEqual(var.compute(), np.array(2.449489))

def test_ddof_zero(self):
array = da.from_array(np.arange(8), chunks=1e6)
array = as_lazy_data(np.arange(8))
var = STD_DEV.lazy_aggregate(array, axis=0, ddof=0)
self.assertArrayAlmostEqual(var.compute(), np.array(2.291287))

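The expected values in these `ddof` tests follow directly from the definition: with `np.arange(8)` the sum of squared deviations from the mean (3.5) is 42, divided by `N - ddof`. The same arithmetic yields the 6.0 and 5.25 expected by the VARIANCE tests below:

import numpy as np

data = np.arange(8)
ss = ((data - data.mean()) ** 2).sum()  # -> 42.0
print(ss / (8 - 1), np.sqrt(ss / 7))    # -> 6.0  2.449489... (ddof=1)
print(ss / (8 - 0), np.sqrt(ss / 8))    # -> 5.25 2.291287... (ddof=0)
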
6 changes: 3 additions & 3 deletions lib/iris/tests/unit/analysis/test_VARIANCE.py
@@ -23,7 +23,7 @@
# importing anything else.
import iris.tests as tests

import dask.array as da
from iris._lazy_data import as_lazy_data
import numpy as np
import numpy.ma as ma

@@ -69,12 +69,12 @@ def test_ma_ddof1(self):

class Test_lazy_aggregate(tests.IrisTest):
def test_ddof_one(self):
array = da.from_array(np.arange(8), chunks=1e6)
array = as_lazy_data(np.arange(8))
var = VARIANCE.lazy_aggregate(array, axis=0, ddof=1)
self.assertArrayAlmostEqual(var.compute(), np.array(6.0))

def test_ddof_zero(self):
array = da.from_array(np.arange(8), chunks=1e6)
array = as_lazy_data(np.arange(8))
var = VARIANCE.lazy_aggregate(array, axis=0, ddof=0)
self.assertArrayAlmostEqual(var.compute(), np.array(5.25))

19 changes: 9 additions & 10 deletions lib/iris/tests/unit/aux_factory/test_AuxCoordFactory.py
@@ -26,8 +26,7 @@
# importing anything else.
import iris.tests as tests

import iris._lazy_data
import dask.array as da
from iris._lazy_data import as_lazy_data, is_lazy_data
import numpy as np

import iris.coords
@@ -58,25 +57,25 @@ def test_numpy_complex(self):

def test_lazy_simple(self):
raw_points = np.arange(12).reshape(4, 3)
points = da.from_array(raw_points, 1)
points = as_lazy_data(raw_points, 1)
coord = iris.coords.AuxCoord(points)
self.assertTrue(iris._lazy_data.is_lazy_data(coord._points))
self.assertTrue(is_lazy_data(coord._points))
result = AuxCoordFactory._nd_points(coord, (0, 1), 2)
# Check we haven't triggered the loading of the coordinate values.
self.assertTrue(iris._lazy_data.is_lazy_data(coord._points))
self.assertTrue(iris._lazy_data.is_lazy_data(result))
self.assertTrue(is_lazy_data(coord._points))
self.assertTrue(is_lazy_data(result))
expected = raw_points
self.assertArrayEqual(result, expected)

def test_lazy_complex(self):
raw_points = np.arange(12).reshape(4, 3)
points = da.from_array(raw_points, 1)
points = as_lazy_data(raw_points, 1)
coord = iris.coords.AuxCoord(points)
self.assertTrue(iris._lazy_data.is_lazy_data(coord._points))
self.assertTrue(is_lazy_data(coord._points))
result = AuxCoordFactory._nd_points(coord, (3, 2), 5)
# Check we haven't triggered the loading of the coordinate values.
self.assertTrue(iris._lazy_data.is_lazy_data(coord._points))
self.assertTrue(iris._lazy_data.is_lazy_data(result))
self.assertTrue(is_lazy_data(coord._points))
self.assertTrue(is_lazy_data(result))
expected = raw_points.T[np.newaxis, np.newaxis, ..., np.newaxis]
self.assertArrayEqual(result, expected)

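The expectation in `test_lazy_complex` is plain numpy broadcasting: coordinate dims (3, 2) of a 5-D cube put the transposed points in positions 3 and 2, padded with length-1 axes elsewhere. Checking the shape with numpy alone:

import numpy as np

raw_points = np.arange(12).reshape(4, 3)
expected = raw_points.T[np.newaxis, np.newaxis, ..., np.newaxis]
print(expected.shape)   # -> (1, 1, 3, 4, 1): dims (3, 2) carry the data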