diff --git a/lib/iris/_lazy_data.py b/lib/iris/_lazy_data.py
index cf6dcc43bf..e1f918c874 100644
--- a/lib/iris/_lazy_data.py
+++ b/lib/iris/_lazy_data.py
@@ -22,35 +22,25 @@
 """
 from __future__ import (absolute_import, division, print_function)
 from six.moves import (filter, input, map, range, zip)  # noqa
+import six

 import dask.array as da
 import numpy as np

-# Whether to recognise biggus arrays as lazy, *as well as* dask.
-# NOTE: in either case, this module will not *make* biggus arrays, only dask.
-_SUPPORT_BIGGUS = True
-
-if _SUPPORT_BIGGUS:
-    import biggus
-

 def is_lazy_data(data):
     """
     Return whether the argument is an Iris 'lazy' data array.

     At present, this means simply a Dask array.
     We determine this by checking for a "compute" property.

-    NOTE: ***for now only*** accept Biggus arrays also.

     """
     result = hasattr(data, 'compute')
-    if not result and _SUPPORT_BIGGUS:
-        result = isinstance(data, biggus.Array)
     return result


-def as_concrete_data(data):
+def as_concrete_data(data, fill_value=None):
     """
     Return the actual content of the argument, as a numpy masked array.

+    If a masked result is produced, any given 'fill_value' is set on it.
+
@@ -58,21 +48,15 @@ def as_concrete_data(data):

     """
     if is_lazy_data(data):
-        if _SUPPORT_BIGGUS and isinstance(data, biggus.Array):
-            # Realise biggus array.
-            # treat all as masked, for standard cube.data behaviour.
-            data = data.masked_array()
-        else:
-            # Grab a fill value, in case this is just a converted masked array.
-            fill_value = getattr(data, 'fill_value', None)
-            # Realise dask array.
-            data = data.compute()
-            # Convert NaN arrays into masked arrays for Iris' consumption.
-            mask = np.isnan(data)
-            if np.all(~mask):
-                mask = None
-            data = np.ma.masked_array(data, mask=mask,
-                                      fill_value=fill_value)
+        # Realise dask array.
+        data = data.compute()
+        # Convert NaN arrays into masked arrays for Iris' consumption.
+        mask = np.isnan(data)
+        if np.all(~mask):
+            mask = None
+        data = np.ma.masked_array(data, mask=mask,
+                                  fill_value=fill_value)
     return data


@@ -94,17 +78,10 @@ def as_lazy_data(data):

     """
     if not is_lazy_data(data):
-        # record the original fill value.
-        fill_value = getattr(data, 'fill_value', None)
         if isinstance(data, np.ma.MaskedArray):
             # Use with NaNs replacing the mask.
             data = array_masked_to_nans(data)
         data = da.from_array(data, chunks=_MAX_CHUNK_SIZE)
-        # Attach any fill value to the dask object.
-        # Note: this is not passed on to dask arrays derived from this one.
-        data.fill_value = fill_value
-    elif not hasattr(data, 'fill_value'):
-        data.fill_value = None  # make it look more like a biggus Array ?
     return data


diff --git a/lib/iris/cube.py b/lib/iris/cube.py
index 54a7369190..20b1f14757 100644
--- a/lib/iris/cube.py
+++ b/lib/iris/cube.py
@@ -33,6 +33,7 @@
 import zlib

 import biggus
+import dask.array as da
 import numpy as np
 import numpy.ma as ma

@@ -64,7 +65,8 @@
 class CubeMetadata(collections.namedtuple('CubeMetadata',
                                           ['standard_name', 'long_name',
                                            'var_name', 'units', 'attributes',
-                                           'cell_methods'])):
+                                           'cell_methods',
+                                           'dtype', 'fill_value'])):
     """
     Represents the phenomenon metadata for a single :class:`Cube`.

@@ -648,7 +650,7 @@ def __init__(self, data, standard_name=None, long_name=None,
                  var_name=None, units=None, attributes=None,
                  cell_methods=None, dim_coords_and_dims=None,
                  aux_coords_and_dims=None, aux_factories=None,
-                 cell_measures_and_dims=None):
+                 cell_measures_and_dims=None, dtype=None, fill_value=None):
         """
         Creates a cube with data and optional metadata.
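+
+        The new 'dtype' and 'fill_value' kwargs record the intended dtype
+        and fill value of the data payload: 'dtype', if given, must match
+        the dtype of 'data', and 'fill_value' is applied when lazy
+        (NaN-masked) data is realised.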
@@ -714,9 +716,15 @@ def __init__(self, data, standard_name=None, long_name=None,
         if isinstance(data, six.string_types):
             raise TypeError('Invalid data type: {!r}.'.format(data))

         if not is_lazy_data(data):
             data = np.asarray(data)
-        self._my_data = data
+
+        self.shape = data.shape
+        if dtype is not None and dtype != data.dtype:
+            raise ValueError('The dtype must match the dtype of the data.')
+        self.dtype = data.dtype
+        self.fill_value = fill_value
+        self.data_graph = as_lazy_data(data)

         #: The "standard name" for the Cube's phenomenon.
         self.standard_name = standard_name

@@ -786,7 +794,8 @@ def metadata(self):

         """
         return CubeMetadata(self.standard_name, self.long_name, self.var_name,
-                            self.units, self.attributes, self.cell_methods)
+                            self.units, self.attributes, self.cell_methods,
+                            self.dtype, self.fill_value)

     @metadata.setter
     def metadata(self, value):

@@ -1589,16 +1598,16 @@ def cell_methods(self):
     def cell_methods(self, cell_methods):
         self._cell_methods = tuple(cell_methods) if cell_methods else tuple()

-    @property
-    def shape(self):
-        """The shape of the data of this cube."""
-        shape = self._my_data.shape
-        return shape
-
-    @property
-    def dtype(self):
-        """The :class:`numpy.dtype` of the data of this cube."""
-        return self._my_data.dtype
+    # NOTE: 'shape' and 'dtype' are now plain attributes, set in __init__
+    # and kept up to date by the methods that change the data.

     @property
     def ndim(self):

@@ -1642,11 +1651,8 @@ def lazy_data(self, array=None):
             if self.shape or array.shape != (1,):
                 raise ValueError('Require cube data with shape %r, got '
                                  '%r.' % (self.shape, array.shape))
-            self._my_data = array
-        else:
-            array = self._my_data
-            array = as_lazy_data(array)
-        return array
+            self.data_graph = array
+        return self.data_graph

     @property
     def data(self):

@@ -1681,26 +1687,29 @@ def data(self):
             (10, 20)

         """
-        data = self._my_data
-        if is_lazy_data(data):
-            try:
-                data = as_concrete_data(data)
-            except MemoryError:
-                msg = "Failed to create the cube's data as there was not" \
-                      " enough memory available.\n" \
-                      "The array shape would have been {0!r} and the data" \
-                      " type {1}.\n" \
-                      "Consider freeing up variables or indexing the cube" \
-                      " before getting its data."
-                msg = msg.format(self.shape, data.dtype)
-                raise MemoryError(msg)
-            # Unmask the array only if it is filled.
-            if (isinstance(data, np.ma.masked_array) and
-                    ma.count_masked(data) == 0):
-                data = data.data
-        # data may be a numeric type, so ensure an np.ndarray is returned
-        self._my_data = np.asanyarray(data)
-        return self._my_data
+        data = self.data_graph
+        chunks = self.data_graph.chunks
+        try:
+            data = as_concrete_data(data, fill_value=self.fill_value)
+        except MemoryError:
+            msg = "Failed to create the cube's data as there was not" \
+                  " enough memory available.\n" \
+                  "The array shape would have been {0!r} and the data" \
+                  " type {1}.\n" \
+                  "Consider freeing up variables or indexing the cube" \
+                  " before getting its data."
+            msg = msg.format(self.shape, data.dtype)
+            raise MemoryError(msg)
+
+        # Unmask the array only if it is filled.
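+        # (as_concrete_data() re-masks any NaN points in the realised
+        # array; if nothing is actually masked, the plain ndarray is
+        # handed back, preserving the old cube.data behaviour.)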
+        if (isinstance(data, np.ma.masked_array) and
+                ma.count_masked(data) == 0):
+            data = data.data
+        # data may be a numeric type, so ensure an np.ndarray is returned
+        data = np.asanyarray(data)
+        # Re-create the dask data_graph from the realised data, keeping
+        # the original chunking; masked points are re-encoded as NaNs.
+        if isinstance(data, np.ma.MaskedArray):
+            self.data_graph = da.from_array(data.filled(np.nan), chunks)
+        else:
+            self.data_graph = da.from_array(data, chunks)
+        return data

     @data.setter
     def data(self, value):
@@ -1713,11 +1722,13 @@ def data(self, value):
             if self.shape or data.shape != (1,):
                 raise ValueError('Require cube data with shape %r, got '
                                  '%r.' % (self.shape, data.shape))

-        self._my_data = data
+        self.dtype = data.dtype
+        self.data_graph = as_lazy_data(data)

     def has_lazy_data(self):
-        return is_lazy_data(self._my_data)
+        # NOTE: the data is now always held as a dask graph, so this is
+        # currently always True - a new pattern is needed to distinguish
+        # deferred from realised data.
+        return is_lazy_data(self.data_graph)

     @property
     def dim_coords(self):
@@ -2182,9 +2193,9 @@ def new_cell_measure_dims(cm_):
             first_slice = None

         if first_slice is not None:
-            data = self._my_data[first_slice]
+            data = self.data_graph[first_slice]
         else:
-            data = copy.deepcopy(self._my_data)
+            data = copy.deepcopy(self.data_graph)

         for other_slice in slice_gen:
             data = data[other_slice]
@@ -2819,9 +2830,9 @@ def transpose(self, new_order=None):
             raise ValueError('Incorrect number of dimensions.')

         if self.has_lazy_data():
-            self._my_data = self.lazy_data().transpose(new_order)
+            self.data_graph = self.lazy_data().transpose(new_order)
         else:
-            self._my_data = self.data.transpose(new_order)
+            self.data_graph = as_lazy_data(self.data.transpose(new_order))
+        # 'shape' is now a plain attribute, so keep it in step.
+        self.shape = self.data_graph.shape

         dim_mapping = {src: dest for dest, src in enumerate(new_order)}
diff --git a/lib/iris/fileformats/netcdf.py b/lib/iris/fileformats/netcdf.py
index 5b89c110a1..26656b0fc0 100644
--- a/lib/iris/fileformats/netcdf.py
+++ b/lib/iris/fileformats/netcdf.py
@@ -38,6 +38,7 @@
 import warnings

 import biggus
+import dask.array as da
 import netCDF4
 import numpy as np
 import numpy.ma as ma

@@ -56,7 +57,6 @@
 import iris.fileformats._pyke_rules
 import iris.io
 import iris.util
-import iris._lazy_data


 # Show Pyke inference engine statistics.
@@ -374,7 +374,8 @@ def _pyke_kb_engine():
 class NetCDFDataProxy(object):
     """A reference to the data payload of a single NetCDF file variable."""

-    __slots__ = ('shape', 'dtype', 'path', 'variable_name', 'fill_value')
+    __slots__ = ('shape', 'dtype', 'path', 'variable_name', 'fill_value',
+                 '_data_cache')

     def __init__(self, shape, dtype, path, variable_name, fill_value):
         self.shape = shape
@@ -382,19 +383,26 @@ def __init__(self, shape, dtype, path, variable_name, fill_value):
         self.path = path
         self.variable_name = variable_name
         self.fill_value = fill_value
+        self._data_cache = {}

     @property
     def ndim(self):
         return len(self.shape)

     def __getitem__(self, keys):
-        dataset = netCDF4.Dataset(self.path)
-        try:
-            variable = dataset.variables[self.variable_name]
-            # Get the NetCDF variable data and slice.
-            data = variable[keys]
-        finally:
-            dataset.close()
+        if str(keys) not in self._data_cache:
+            dataset = netCDF4.Dataset(self.path)
+            try:
+                variable = dataset.variables[self.variable_name]
+                # Get the NetCDF variable data and slice.
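+                # Cache the result under str(keys) - the key tuple
+                # itself may contain slice objects, which are not
+                # hashable.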
+                v = variable[keys]
+                if isinstance(v, np.ma.MaskedArray):
+                    # Encode masked points as NaNs, as elsewhere.
+                    self._data_cache[str(keys)] = v.filled(np.nan)
+                else:
+                    self._data_cache[str(keys)] = v
+            finally:
+                dataset.close()
+        data = self._data_cache[str(keys)]
         return data

     def __repr__(self):
@@ -501,12 +509,12 @@ def _load_cube(engine, cf, cf_var, filename):
         dummy_data = cf_var.add_offset + dummy_data

     # Create cube with deferred data, but no metadata
-    fill_value = getattr(cf_var.cf_data, '_FillValue',
-                         netCDF4.default_fillvals[cf_var.dtype.str[1:]])
+    # Note: only an explicit _FillValue is now recorded; there is no
+    # longer a default fill value.
+    fill_value = getattr(cf_var.cf_data, '_FillValue', None)
     proxy = NetCDFDataProxy(cf_var.shape, dummy_data.dtype,
                             filename, cf_var.cf_name, fill_value)
-    data = iris._lazy_data.as_lazy_data(proxy)
-    cube = iris.cube.Cube(data)
+    data = da.from_array(proxy, chunks=100)
+    cube = iris.cube.Cube(data, fill_value=fill_value)

     # Reset the pyke inference engine.
     engine.reset()
diff --git a/lib/iris/fileformats/pp.py b/lib/iris/fileformats/pp.py
index 5c928ca6f5..9f55150b65 100644
--- a/lib/iris/fileformats/pp.py
+++ b/lib/iris/fileformats/pp.py
@@ -43,7 +43,6 @@
 import iris.fileformats.rules
 import iris.fileformats.pp_rules
 import iris.coord_systems
-import iris._lazy_data

 try:
     import mo_pack
@@ -831,7 +830,7 @@ class PPDataProxy(object):
     """A reference to the data payload of a single PP field."""

     __slots__ = ('shape', 'src_dtype', 'path', 'offset', 'data_len',
-                 '_lbpack', 'boundary_packing', 'mdi', 'mask')
+                 '_lbpack', 'boundary_packing', 'mdi', 'mask', '_data_cache')

     def __init__(self, shape, src_dtype, path, offset, data_len,
                  lbpack, boundary_packing, mdi, mask):
@@ -844,6 +843,7 @@ def __init__(self, shape, src_dtype, path, offset, data_len,
         self.boundary_packing = boundary_packing
         self.mdi = mdi
         self.mask = mask
+        self._data_cache = None

     # lbpack
     def _lbpack_setter(self, value):
@@ -874,12 +874,18 @@ def __getitem__(self, keys):
-        with open(self.path, 'rb') as pp_file:
-            pp_file.seek(self.offset, os.SEEK_SET)
-            data_bytes = pp_file.read(self.data_len)
-            data = _data_bytes_to_shaped_array(data_bytes,
-                                               self.lbpack,
-                                               self.boundary_packing,
-                                               self.shape, self.src_dtype,
-                                               self.mdi, self.mask)
-        return data.__getitem__(keys)
+        # Only read and unpack from disk if the field is not already
+        # cached with the correct shape.
+        if (self._data_cache is None or
+                getattr(self._data_cache, 'shape', None) != self.shape):
+            with open(self.path, 'rb') as pp_file:
+                pp_file.seek(self.offset, os.SEEK_SET)
+                data_bytes = pp_file.read(self.data_len)
+            data = _data_bytes_to_shaped_array(data_bytes,
+                                               self.lbpack,
+                                               self.boundary_packing,
+                                               self.shape, self.src_dtype,
+                                               self.mdi, self.mask)
+            self._data_cache = data
+        return self._data_cache.__getitem__(keys)

     def __repr__(self):
         fmt = '<{self.__class__.__name__} shape={self.shape}' \
@@ -1035,9 +1041,13 @@ def _data_bytes_to_shaped_array(data_bytes, lbpack, boundary_packing,

     # Reform in row-column order
     data.shape = data_shape

+    if np.ma.is_masked(data):
+        data = data.filled(np.nan)
     # Mask the array?
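+    # (NaN now stands in for missing data in place of a numpy mask, in
+    # line with the dask-based lazy data handling in iris._lazy_data.)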
     if mdi in data:
-        data = ma.masked_values(data, mdi, copy=False)
+        data[data == mdi] = np.nan

     return data


diff --git a/lib/iris/fileformats/rules.py b/lib/iris/fileformats/rules.py
index 137aec545e..0e853f720b 100644
--- a/lib/iris/fileformats/rules.py
+++ b/lib/iris/fileformats/rules.py
@@ -909,7 +909,8 @@ def _make_cube(field, converter):
                           attributes=metadata.attributes,
                           cell_methods=metadata.cell_methods,
                           dim_coords_and_dims=metadata.dim_coords_and_dims,
-                          aux_coords_and_dims=metadata.aux_coords_and_dims)
+                          aux_coords_and_dims=metadata.aux_coords_and_dims,
+                          fill_value=field.bmdi)

     # Temporary code to deal with invalid standard names in the
     # translation table.
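
Reviewer note: a minimal sketch (not part of the patch) of the fill-value
round trip this change introduces, assuming array_masked_to_nans() - used by
as_lazy_data() above - replaces masked points with NaN:

    import numpy as np

    from iris._lazy_data import as_concrete_data, as_lazy_data

    # A masked array: the mask is encoded as NaNs in the dask array, so
    # the fill value must now be carried separately (e.g. on the cube).
    masked = np.ma.masked_array([1.0, 2.0, 3.0], mask=[False, True, False])
    lazy = as_lazy_data(masked)
    assert hasattr(lazy, 'compute')  # This is what is_lazy_data() checks.

    # Realising the data re-masks the NaNs and applies the fill value
    # passed back in, here as a cube would pass its own 'fill_value'.
    real = as_concrete_data(lazy, fill_value=-999.0)
    assert bool(np.ma.is_masked(real[1]))
    assert real.fill_value == -999.0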