90 changes: 89 additions & 1 deletion lib/iris/_lazy_data.py
@@ -73,6 +73,30 @@ def as_lazy_data(data, chunks=_MAX_CHUNK_SIZE):
return data


def as_concrete_data(data, nans_replacement=None, result_dtype=None):
"""
Return the actual content of a lazy array, as a numpy array.

If the data is a NumPy array, return it unchanged.

If the data is lazy, return the realised result.

Where the realised data contains NaNs, these are handled by filling or by
conversion to masked data, using the :func:`convert_nans_array` function.
See there for the usage of the 'nans_replacement' and 'result_dtype'
keywords.

"""
if is_lazy_data(data):
# Realise dask array.
data = data.compute()
# Convert any missing data as requested.
data = convert_nans_array(data,
nans_replacement=nans_replacement,
result_dtype=result_dtype)

return data
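
As a rough usage sketch of the new helper (illustrative values only; assumes
numpy, numpy.ma and dask.array are available):

import dask.array as da
import numpy as np
import numpy.ma as ma
from iris._lazy_data import as_concrete_data

lazy = da.from_array(np.array([1.0, np.nan, 3.0]), chunks=2)
# Realise the lazy array, converting the NaN to a masked point.
result = as_concrete_data(lazy, nans_replacement=ma.masked)
assert ma.is_masked(result)
# A plain numpy array without NaNs passes through unchanged.
same = as_concrete_data(np.arange(3.0))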


def array_masked_to_nans(array):
"""
Convert a masked array to a NumPy `ndarray` filled with NaN values. Input
@@ -118,7 +142,7 @@ def multidim_lazy_stack(stack):
Args:

* stack:
- An ndarray of dask arrays.
+ An ndarray of dask arrays.

Returns:
The input array converted to a lazy dask array.
@@ -135,3 +159,67 @@ def multidim_lazy_stack(stack):
result = da.stack([multidim_lazy_stack(subarray)
for subarray in stack])
return result


def convert_nans_array(array, nans_replacement=None, result_dtype=None):
"""
Convert a :class:`~numpy.ndarray` that may contain one or more NaN values
to either a :class:`~numpy.ma.core.MaskedArray` or a
:class:`~numpy.ndarray` with the NaN values filled.

Args:

* array:
The :class:`~numpy.ndarray` to be converted.

Kwargs:

* nans_replacement:
If `nans_replacement` is None, then raise an exception if the `array`
contains any NaN values (default behaviour).
If `nans_replacement` is `numpy.ma.masked`, then convert the `array`
to a :class:`~numpy.ma.core.MaskedArray`.
Otherwise, use the specified `nans_replacement` value as the `array`
fill value.

* result_dtype:
Cast the resultant array to this target :class:`~numpy.dtype`.

Returns:
An :class:`numpy.ndarray`.

.. note::
An input array that is either a :class:`~numpy.ma.core.MaskedArray`
or has an integral dtype will be returned unaltered.

"""
if not ma.isMaskedArray(array) and array.dtype.kind == 'f':
# First, calculate the mask.
mask = np.isnan(array)
# Now, cast the dtype, if required.
if result_dtype is not None:
result_dtype = np.dtype(result_dtype)
if array.dtype != result_dtype:
array = array.astype(result_dtype)
# Finally, mask or fill the data, as required or raise an exception
# if we detect there are NaNs present and we didn't expect any.
if np.any(mask):
if nans_replacement is None:
emsg = 'Array contains unexpected NaNs.'
raise ValueError(emsg)
elif nans_replacement is ma.masked:
# Mask the array with the default fill_value.
array = ma.masked_array(array, mask=mask)
else:
# Check the fill value is appropriate for the
# result array dtype.
try:
[fill_value] = np.asarray([nans_replacement],
dtype=array.dtype)
except OverflowError:
emsg = 'Fill value of {!r} invalid for array result {!r}.'
raise ValueError(emsg.format(nans_replacement,
array.dtype))
# Fill the array.
array[mask] = fill_value
return array
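
A quick sketch of the three behaviours described above (illustrative values
only; note that filling modifies the input array in place, hence the copies):

import numpy as np
import numpy.ma as ma
from iris._lazy_data import convert_nans_array

arr = np.array([1.0, np.nan, 3.0])
# nans_replacement=ma.masked: NaN points become masked points.
masked = convert_nans_array(arr.copy(), nans_replacement=ma.masked)
assert ma.is_masked(masked)
# A plain value: NaN points are filled with it.
filled = convert_nans_array(arr.copy(), nans_replacement=-999.0)
assert filled[1] == -999.0
# Default (None): any NaN present raises a ValueError.
try:
    convert_nans_array(arr.copy())
except ValueError:
    pass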
10 changes: 6 additions & 4 deletions lib/iris/_merge.py
@@ -33,7 +33,8 @@
import numpy as np
import numpy.ma as ma

- from iris._lazy_data import as_lazy_data, is_lazy_data, multidim_lazy_stack
+ from iris._lazy_data import (as_lazy_data, is_lazy_data, multidim_lazy_stack,
+                              as_concrete_data)
import iris.cube
import iris.coords
import iris.exceptions
@@ -1217,10 +1218,11 @@ def merge(self, unique=True):
if all_have_data:
# All inputs were concrete, so turn the result back into a
# normal array.
- merged_data = merged_data.compute()
- # Unmask the array only if it is filled.
+ merged_data = as_concrete_data(merged_data,
+                                nans_replacement=ma.masked)
+ # Unmask the array if it has no masked points.
if (ma.isMaskedArray(merged_data) and
- ma.count_masked(merged_data) == 0):
+ not ma.is_masked(merged_data)):
merged_data = merged_data.data
merged_cube = self._get_cube(merged_data)
merged_cubes.append(merged_cube)
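
The new check uses :func:`numpy.ma.is_masked`, which is True only when at
least one point is actually masked. A tiny illustration:

import numpy.ma as ma

data = ma.masked_array([1, 2], mask=[False, False])
# No masked points, so the merge result can safely be unmasked.
assert not ma.is_masked(data)
assert ma.count_masked(data) == 0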
4 changes: 3 additions & 1 deletion lib/iris/analysis/trajectory.py
@@ -1,4 +1,4 @@
- # (C) British Crown Copyright 2010 - 2016, Met Office
+ # (C) British Crown Copyright 2010 - 2017, Met Office
#
# This file is part of Iris.
#
@@ -341,6 +341,8 @@ def interpolate(cube, sample_points, method=None):
# This is **not** proper mask handling, because we cannot produce a
# masked result, but it ensures we use a "filled" version of the
# input in this case.
+ if cube.fill_value is not None:
+     source_data.fill_value = cube.fill_value
source_data = source_data.filled()
new_cube.data[:] = source_data
# NOTE: we assign to "new_cube.data[:]" and *not* just "new_cube.data",
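
A minimal sketch of what the added fill_value handling enables, with a
hypothetical fill value standing in for cube.fill_value:

import numpy.ma as ma

source_data = ma.masked_array([1.0, 2.0], mask=[False, True])
fill_value = -1e9  # hypothetical stand-in for cube.fill_value
source_data.fill_value = fill_value
# filled() now uses the cube's fill value, not the numpy default.
assert source_data.filled()[1] == fill_value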
39 changes: 24 additions & 15 deletions lib/iris/coords.py
@@ -32,14 +32,14 @@
import warnings
import zlib

- import biggus
- import iris._lazy_data
+ from iris._lazy_data import is_lazy_data
+ import dask.array as da
import netcdftime
import numpy as np

import iris.aux_factory
import iris.exceptions
+ from iris._lazy_data import as_concrete_data
import iris.time
import iris.util

@@ -1611,8 +1611,12 @@ def _sanitise_array(self, src, ndmin):
@property
def points(self):
"""Property containing the points values as a numpy array"""
- if iris._lazy_data.is_lazy_data(self._points):
-     self._points = self._points.compute()
+ if is_lazy_data(self._points):
+     self._points = as_concrete_data(self._points,
+                                     nans_replacement=np.ma.masked)
+ # NOTE: we probably don't have full support for masked aux-coords.
+ # We certainly *don't* handle a _FillValue attribute (and possibly
+ # the loader will throw one away ?)
return self._points.view()

@points.setter
@@ -1623,9 +1627,9 @@ def points(self, points):
# of 1 and is either a numpy or lazy array.
# This will avoid Scalar coords with points of shape () rather
# than the desired (1,)
- if iris._lazy_data.is_lazy_data(points):
+ if is_lazy_data(points):
if points.shape == ():
- points = points * np.ones(1)
+ points = da.reshape(points, (1,))
elif not isinstance(points, iris.aux_factory._LazyArray):
points = self._sanitise_array(points, 1)
# If points are already defined for this coordinate,
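
A sketch of the scalar-points handling above (illustrative only): a 0-d lazy
array is reshaped to the desired (1,) shape while remaining lazy.

import dask.array as da
import numpy as np

points = da.asarray(np.array(273.15))
if points.shape == ():
    points = da.reshape(points, (1,))
assert points.shape == (1,)
assert points.compute()[0] == 273.15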
@@ -1649,8 +1653,8 @@ def bounds(self):
"""
if self._bounds is not None:
bounds = self._bounds
- if isinstance(bounds, biggus.Array):
-     bounds = bounds.ndarray()
+ if is_lazy_data(bounds):
+     bounds = as_concrete_data(bounds,
+                               nans_replacement=np.ma.masked)
+ # NOTE: we probably don't fully support masked aux-coords.
+ # We certainly *don't* handle a _FillValue attribute (and
+ # possibly the loader will throw one away ?)
self._bounds = bounds
bounds = bounds.view()
else:
@@ -1662,8 +1670,8 @@ def bounds(self, bounds):
def bounds(self, bounds):
# Ensure the bounds are a compatible shape.
if bounds is not None:
- if not isinstance(bounds, (iris.aux_factory._LazyArray,
-                            biggus.Array)):
+ if not (isinstance(bounds, iris.aux_factory._LazyArray) or
+         is_lazy_data(bounds)):
bounds = self._sanitise_array(bounds, 2)
# NB. Use _points to avoid triggering any lazy array.
if self._points.shape != bounds.shape[:-1]:
@@ -1741,11 +1749,12 @@ def measure(self):
@property
def data(self):
"""Property containing the data values as a numpy array"""
- data = self._data
- if isinstance(data, biggus.Array):
-     data = data.ndarray()
-     self._data = data
- return data.view()
+ if is_lazy_data(self._data):
+     self._data = as_concrete_data(self._data,
+                                   nans_replacement=np.ma.masked)
+ # NOTE: like AuxCoords, we probably don't fully support masks, and
+ # we certainly don't handle any _FillValue attribute.
+ return self._data.view()

@data.setter
def data(self, data):
23 changes: 11 additions & 12 deletions lib/iris/cube.py
@@ -42,7 +42,9 @@
import iris._concatenate
import iris._constraints
from iris._deprecation import warn_deprecated
- from iris._lazy_data import array_masked_to_nans, as_lazy_data, is_lazy_data
+ from iris._lazy_data import (array_masked_to_nans, as_lazy_data,
+                              convert_nans_array, is_lazy_data,
+                              as_concrete_data)
import iris._merge
import iris.analysis
from iris.analysis.cartography import wrap_lons
@@ -1732,17 +1734,14 @@ def data(self):
"""
if self.has_lazy_data():
try:
- data = self._dask_array.compute()
- mask = np.isnan(data)
- if data.dtype != self.dtype:
-     data = data.astype(self.dtype)
- self.dtype = None
- if np.all(~mask):
-     self._numpy_array = data
- else:
-     fv = self.fill_value
-     self._numpy_array = ma.masked_array(data, mask=mask,
-                                         fill_value=fv)
+ # Realise the data, convert from a NaN array to a masked array,
+ # and if appropriate cast to the specified cube result dtype.
+ result = as_concrete_data(self._dask_array,
+                           nans_replacement=ma.masked,
+                           result_dtype=self.dtype)
+ self._numpy_array = result
+ self.dtype = None

except MemoryError:
msg = "Failed to create the cube's data as there was not" \
" enough memory available.\n" \
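
A sketch of the realise-and-cast path above (illustrative values; the
result_dtype here stands in for the dtype the cube has promised):

import dask.array as da
import numpy as np
import numpy.ma as ma
from iris._lazy_data import as_concrete_data

lazy = da.from_array(np.array([1.0, np.nan], dtype=np.float64), chunks=1)
# Realise, mask the NaN point, and cast to the promised dtype.
result = as_concrete_data(lazy, nans_replacement=ma.masked,
                          result_dtype=np.float32)
assert result.dtype == np.float32
assert ma.is_masked(result)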
19 changes: 13 additions & 6 deletions lib/iris/fileformats/netcdf.py
@@ -37,7 +37,7 @@
import string
import warnings

- import biggus
+ import dask.array as da
import netCDF4
import numpy as np
import numpy.ma as ma
@@ -56,7 +56,8 @@
import iris.fileformats._pyke_rules
import iris.io
import iris.util
- from iris._lazy_data import array_masked_to_nans, as_lazy_data
+ from iris._lazy_data import (array_masked_to_nans, as_lazy_data,
+                              convert_nans_array)

# Show Pyke inference engine statistics.
DEBUG = False
@@ -1938,16 +1939,22 @@ def set_packing_ncattrs(cfvar):
# Explicitly assign the fill_value, which will be the type default
# in the case of an unmasked array.
if packing is None:
- fill_value = cube.lazy_data().fill_value
+ fill_value = cube.fill_value
dtype = cube.lazy_data().dtype.newbyteorder('=')

cf_var = self._dataset.createVariable(
cf_name, dtype,
dimension_names, fill_value=fill_value,
**kwargs)
set_packing_ncattrs(cf_var)
- # stream the data
- biggus.save([cube.lazy_data()], [cf_var], masked=True)

+ # Now stream the cube data payload straight to the netCDF
+ # data variable within the netCDF file, where any NaN values
+ # are replaced with the specified cube fill_value.
+ data = da.map_blocks(convert_nans_array, cube.lazy_data(),
+                      nans_replacement=cube.fill_value,
+                      result_dtype=cube.dtype)
+ da.store([data], [cf_var])

if cube.standard_name:
_setncattr(cf_var, 'standard_name', cube.standard_name)
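
A self-contained sketch of the streaming pattern above, with a plain numpy
array standing in for the netCDF variable and a simplified blockwise
NaN-filling function in place of convert_nans_array (hypothetical names and
values):

import dask.array as da
import numpy as np

def fill_nans(block, fill_value):
    # Simplified stand-in for convert_nans_array.
    return np.where(np.isnan(block), fill_value, block)

lazy = da.from_array(np.array([1.0, np.nan, 3.0, np.nan]), chunks=2)
data = da.map_blocks(fill_nans, lazy, fill_value=-1e9)
# da.store streams block-by-block into any target that supports
# __setitem__, such as a netCDF4 variable; here, a numpy array.
target = np.empty(4)
da.store([data], [target])
assert target[1] == -1e9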
Expand Down Expand Up @@ -2045,7 +2052,7 @@ def save(cube, filename, netcdf_format='NETCDF4', local_keys=None,
* Keyword arguments specifying how to save the data are applied
to each cube. To use different settings for different cubes, use
the NetCDF Context manager (:class:`~Saver`) directly.
- * The save process will stream the data payload to the file using biggus,
+ * The save process will stream the data payload to the file using dask,
enabling large data payloads to be saved and maintaining the 'lazy'
status of the cube's data payload, unless the netcdf_format is explicitly
specified to be 'NETCDF3' or 'NETCDF3_CLASSIC'.
9 changes: 5 additions & 4 deletions lib/iris/fileformats/pp.py
@@ -43,7 +43,8 @@
import iris.fileformats.rules
import iris.fileformats.pp_rules
import iris.coord_systems
- from iris._lazy_data import array_masked_to_nans, as_lazy_data, is_lazy_data
+ from iris._lazy_data import (array_masked_to_nans, as_lazy_data, is_lazy_data,
+                              as_concrete_data)

try:
import mo_pack
@@ -1280,11 +1281,11 @@ def data(self):

"""
# Cache the real data on first use
- if is_lazy_data(self._data):
-     self._data = self._data.compute()
-     if self._data.dtype.kind == 'i' and self.bmdi == -1e30:
-         self.bmdi = -9999
-     self._data[np.isnan(self._data)] = self.bmdi
+ if is_lazy_data(self._data):
+     self._data = as_concrete_data(self._data,
+                                   nans_replacement=self.bmdi)
return self._data

@data.setter
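
A sketch of the new PP realisation path (hypothetical BMDI value):

import dask.array as da
import numpy as np
from iris._lazy_data import as_concrete_data

bmdi = -1e30  # hypothetical missing-data indicator from the PP header
lazy = da.from_array(np.array([4.5, np.nan]), chunks=1)
# NaNs introduced on load are filled back with the field's BMDI.
data = as_concrete_data(lazy, nans_replacement=bmdi)
assert data[1] == bmdi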
5 changes: 0 additions & 5 deletions lib/iris/tests/integration/test_netcdf.py
@@ -67,7 +67,6 @@ def test_save(self):
iris.save(self.cube, filename)
self.assertCDL(filename)

- @tests.skip_biggus
def test_save_load_loop(self):
# Tests an issue where the variable names in the formula
# terms changed to the standard_names instead of the variable names
@@ -201,7 +200,6 @@ def test_patching_conventions_attribute(self):
class TestLazySave(tests.IrisTest):

@tests.skip_data
- @tests.skip_biggus
def test_lazy_preserved_save(self):
fpath = tests.get_data_path(('NetCDF', 'label_and_climate',
'small_FC_167_mon_19601101.nc'))
@@ -212,7 +210,6 @@ def test_lazy_preserved_save(self):
saver.write(acube)
self.assertTrue(acube.has_lazy_data())

- @tests.skip_biggus
def test_lazy_mask_preserve_fill_value(self):
cube = iris.cube.Cube(np.ma.array([0, 1], mask=[False, True],
fill_value=-1))
@@ -272,7 +269,6 @@ def test_concatenate_cell_measure_match(self):
self.assertEqual(cubes[0]._cell_measures_and_dims, cm_and_dims)
self.assertEqual(len(cubes), 1)

- @tests.skip_biggus
def test_round_trip(self):
cube, = iris.load(self.fname)
with self.temp_filename(suffix='.nc') as filename:
@@ -430,7 +426,6 @@ def test_multi_packed_single_dtype(self):
# Read PP input file.
self._multi_test('multi_packed_single_dtype.cdl')

- @tests.skip_biggus
def test_multi_packed_multi_dtype(self):
"""Test saving multiple packed cubes with pack_dtype list."""
# Read PP input file.