lib/iris/_lazy_data.py (45 changes: 11 additions & 34 deletions)
@@ -22,57 +22,41 @@
"""
from __future__ import (absolute_import, division, print_function)
from six.moves import (filter, input, map, range, zip) # noqa
import six

import dask.array as da
import numpy as np


# Whether to recognise biggus arrays as lazy, *as well as* dask.
# NOTE: in either case, this module will not *make* biggus arrays, only dask.
_SUPPORT_BIGGUS = True

if _SUPPORT_BIGGUS:
import biggus


def is_lazy_data(data):
"""
Return whether the argument is an Iris 'lazy' data array.

At present, this means simply a Dask array.
We determine this by checking for a "compute" property.
NOTE: ***for now only*** accept Biggus arrays also.

"""
result = hasattr(data, 'compute')
if not result and _SUPPORT_BIGGUS:
result = isinstance(data, biggus.Array)
return result


def as_concrete_data(data):
def as_concrete_data(data, fill_value=None):
"""
Return the actual content of the argument, as a numpy masked array.

If lazy, return the realised data, otherwise return the argument unchanged.

"""
if is_lazy_data(data):
if _SUPPORT_BIGGUS and isinstance(data, biggus.Array):
# Realise biggus array.
# treat all as masked, for standard cube.data behaviour.
data = data.masked_array()
else:
# Grab a fill value, in case this is just a converted masked array.
fill_value = getattr(data, 'fill_value', None)
# Realise dask array.
data = data.compute()
# Convert NaN arrays into masked arrays for Iris' consumption.
mask = np.isnan(data)
if np.all(~mask):
mask = None
data = np.ma.masked_array(data, mask=mask,
fill_value=fill_value)
# Realise dask array.
data = data.compute()
# Convert NaN arrays into masked arrays for Iris' consumption.
mask = np.isnan(data)

if np.all(~mask):
mask = None
data = np.ma.masked_array(data, mask=mask,
fill_value=fill_value)
return data


@@ -94,17 +78,10 @@ def as_lazy_data(data):

"""
if not is_lazy_data(data):
# record the original fill value.
fill_value = getattr(data, 'fill_value', None)
if isinstance(data, np.ma.MaskedArray):
# Use with NaNs replacing the mask.
data = array_masked_to_nans(data)
data = da.from_array(data, chunks=_MAX_CHUNK_SIZE)
# Attach any fill value to the dask object.
# Note: this is not passed on to dask arrays derived from this one.
data.fill_value = fill_value
elif not hasattr(data, 'fill_value'):
data.fill_value = None # make it look more like a biggus Array ?
return data


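Taken together, the `_lazy_data.py` changes replace the biggus code path with a single dask convention: masked points travel as NaNs inside the dask array, while the fill value travels separately and is reattached on realisation. A minimal sketch of that round trip (illustration only: the variable names and chunk size here are assumptions, not the module's API):

```python
import dask.array as da
import numpy as np

# A masked array with an explicit fill value, as a cube might hold.
masked = np.ma.masked_array([1.0, 2.0, 3.0], mask=[False, True, False],
                            fill_value=-999.0)

# Lazy side (cf. as_lazy_data): masked points become NaNs, and the
# fill value is carried separately rather than on the dask array.
fill_value = masked.fill_value
lazy = da.from_array(masked.filled(np.nan), chunks=2)

# Concrete side (cf. as_concrete_data): compute, rebuild the mask from
# the NaNs, then reattach the fill value.
real = lazy.compute()
mask = np.isnan(real)
if np.all(~mask):
    mask = None
result = np.ma.masked_array(real, mask=mask, fill_value=fill_value)
print(result)  # masked_array [1.0 -- 3.0] with fill_value=-999.0
```

Note one consequence of this design: it assumes a floating-point payload, since integer arrays cannot hold NaN.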
lib/iris/cube.py (101 changes: 56 additions & 45 deletions)
@@ -33,6 +33,7 @@
import zlib

import biggus
import dask.array as da
import numpy as np
import numpy.ma as ma

@@ -64,7 +65,8 @@ class CubeMetadata(collections.namedtuple('CubeMetadata',
'var_name',
'units',
'attributes',
'cell_methods'])):
'cell_methods',
'dtype', 'fill_value'])):
"""
Represents the phenomenon metadata for a single :class:`Cube`.

@@ -648,7 +650,7 @@ def __init__(self, data, standard_name=None, long_name=None,
var_name=None, units=None, attributes=None,
cell_methods=None, dim_coords_and_dims=None,
aux_coords_and_dims=None, aux_factories=None,
cell_measures_and_dims=None):
cell_measures_and_dims=None, dtype=None, fill_value=None):
"""
Creates a cube with data and optional metadata.

@@ -714,9 +716,15 @@ def __init__(self, data, standard_name=None, long_name=None,
if isinstance(data, six.string_types):
raise TypeError('Invalid data type: {!r}.'.format(data))

self.shape = data.shape
if dtype is not None and dtype != data.dtype:
raise ValueError('dtype must match data')
self.dtype = data.dtype
self.fill_value = fill_value

if not is_lazy_data(data):
data = np.asarray(data)
self._my_data = data
self.data_graph = as_lazy_data(data)

#: The "standard name" for the Cube's phenomenon.
self.standard_name = standard_name
@@ -786,7 +794,8 @@ def metadata(self):

"""
return CubeMetadata(self.standard_name, self.long_name, self.var_name,
self.units, self.attributes, self.cell_methods)
self.units, self.attributes, self.cell_methods,
self.dtype, self.fill_value)

@metadata.setter
def metadata(self, value):
@@ -1589,16 +1598,16 @@ def cell_methods(self):
def cell_methods(self, cell_methods):
self._cell_methods = tuple(cell_methods) if cell_methods else tuple()

@property
def shape(self):
"""The shape of the data of this cube."""
shape = self._my_data.shape
return shape
# @property
# def shape(self):
# """The shape of the data of this cube."""
# shape = self.data_graph.shape
# return shape

@property
def dtype(self):
"""The :class:`numpy.dtype` of the data of this cube."""
return self._my_data.dtype
# @property
# def dtype(self):
# """The :class:`numpy.dtype` of the data of this cube."""
# return self.data_graph.dtype

@property
def ndim(self):
@@ -1642,11 +1651,8 @@ def lazy_data(self, array=None):
if self.shape or array.shape != (1,):
raise ValueError('Require cube data with shape %r, got '
'%r.' % (self.shape, array.shape))
self._my_data = array
else:
array = self._my_data
array = as_lazy_data(array)
return array
self.data_graph = array
return self.data_graph

@property
def data(self):
@@ -1681,26 +1687,29 @@ def data(self):
(10, 20)

"""
data = self._my_data
if is_lazy_data(data):
try:
data = as_concrete_data(data)
except MemoryError:
msg = "Failed to create the cube's data as there was not" \
" enough memory available.\n" \
"The array shape would have been {0!r} and the data" \
" type {1}.\n" \
"Consider freeing up variables or indexing the cube" \
" before getting its data."
msg = msg.format(self.shape, data.dtype)
raise MemoryError(msg)
# Unmask the array only if it is filled.
if (isinstance(data, np.ma.masked_array) and
ma.count_masked(data) == 0):
data = data.data
# data may be a numeric type, so ensure an np.ndarray is returned
self._my_data = np.asanyarray(data)
return self._my_data
data = self.data_graph
chunks = self.data_graph.chunks
try:
data = as_concrete_data(data, fill_value=self.fill_value)
except MemoryError:
msg = "Failed to create the cube's data as there was not" \
" enough memory available.\n" \
"The array shape would have been {0!r} and the data" \
" type {1}.\n" \
"Consider freeing up variables or indexing the cube" \
" before getting its data."
msg = msg.format(self.shape, data.dtype)
raise MemoryError(msg)

# Unmask the array only if it is filled.
if (isinstance(data, np.ma.masked_array) and
ma.count_masked(data) == 0):
data = data.data
# data may be a numeric type, so ensure an np.ndarray is returned
data = np.asanyarray(data)
        # Re-create the dask data_graph and re-link the cube to it; any
        # masked points are represented as NaNs, per the lazy-data convention.
        self.data_graph = da.from_array(np.ma.filled(data, np.nan), chunks)
return data

@data.setter
def data(self, value):
@@ -1713,11 +1722,13 @@ def data(self, value):
if self.shape or data.shape != (1,):
raise ValueError('Require cube data with shape %r, got '
'%r.' % (self.shape, data.shape))

self._my_data = data
self.dtype = data.dtype
self.data_graph = as_lazy_data(data)

def has_lazy_data(self):
return is_lazy_data(self._my_data)
        # Note: this now always returns True, so a new pattern is needed.
return is_lazy_data(self.data_graph)


@property
def dim_coords(self):
@@ -2182,9 +2193,9 @@ def new_cell_measure_dims(cm_):
first_slice = None

if first_slice is not None:
data = self._my_data[first_slice]
data = self.data_graph[first_slice]
else:
data = copy.deepcopy(self._my_data)
data = copy.deepcopy(self.data_graph)

for other_slice in slice_gen:
data = data[other_slice]
@@ -2819,9 +2830,9 @@ def transpose(self, new_order=None):
raise ValueError('Incorrect number of dimensions.')

if self.has_lazy_data():
self._my_data = self.lazy_data().transpose(new_order)
self.data_graph = self.lazy_data().transpose(new_order)
else:
self._my_data = self.data.transpose(new_order)
self.data_graph = self.data.transpose(new_order)

dim_mapping = {src: dest for dest, src in enumerate(new_order)}

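The net effect in `cube.py` is that a cube now always carries a dask graph (`data_graph`), with `shape`, `dtype` and `fill_value` fixed at construction, and the `data` property realises the graph and then re-wraps the result. A toy sketch of that life cycle (`MiniCube` and its details are illustrative stand-ins, not the `Cube` API):

```python
import dask.array as da
import numpy as np


class MiniCube(object):
    """Toy stand-in for the reworked Cube data handling (illustration only)."""

    def __init__(self, data, fill_value=None):
        self.shape = data.shape
        self.dtype = data.dtype          # dtype is fixed at construction
        self.fill_value = fill_value     # the fill value lives on the cube
        if not isinstance(data, da.Array):
            data = da.from_array(np.asarray(data), chunks=data.shape)
        self.data_graph = data           # the cube always holds a dask graph

    @property
    def data(self):
        chunks = self.data_graph.chunks
        real = self.data_graph.compute()               # realise the graph
        self.data_graph = da.from_array(real, chunks)  # re-link a fresh graph
        return real


cube = MiniCube(np.arange(6.0).reshape(2, 3), fill_value=-999.0)
print(cube.data)        # realises the graph, then re-wraps the result
print(cube.data_graph)  # still a dask array afterwards
```

This is why `has_lazy_data()` now always returns True: realising the data immediately replaces the graph with a new one wrapping the concrete result.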
lib/iris/fileformats/netcdf.py (34 changes: 21 additions & 13 deletions)
@@ -38,6 +38,7 @@
import warnings

import biggus
import dask.array as da
import netCDF4
import numpy as np
import numpy.ma as ma
@@ -56,7 +57,6 @@
import iris.fileformats._pyke_rules
import iris.io
import iris.util
import iris._lazy_data


# Show Pyke inference engine statistics.
@@ -374,27 +374,35 @@ def _pyke_kb_engine():
class NetCDFDataProxy(object):
"""A reference to the data payload of a single NetCDF file variable."""

__slots__ = ('shape', 'dtype', 'path', 'variable_name', 'fill_value')
__slots__ = ('shape', 'dtype', 'path', 'variable_name', 'fill_value',
'_data_cache')

def __init__(self, shape, dtype, path, variable_name, fill_value):
self.shape = shape
self.dtype = dtype
self.path = path
self.variable_name = variable_name
self.fill_value = fill_value
self._data_cache = {}

@property
def ndim(self):
return len(self.shape)

def __getitem__(self, keys):
dataset = netCDF4.Dataset(self.path)
try:
variable = dataset.variables[self.variable_name]
# Get the NetCDF variable data and slice.
data = variable[keys]
finally:
dataset.close()
        # Cache on the string form of the keys: raw slice objects are not
        # hashable, so they cannot be used as dictionary keys directly.
        cache_key = str(keys)
        if cache_key not in self._data_cache:
            dataset = netCDF4.Dataset(self.path)
            try:
                variable = dataset.variables[self.variable_name]
                # Get the NetCDF variable data and slice.
                v = variable[keys]
                if isinstance(v, np.ma.MaskedArray):
                    # Represent masked points as NaNs for the dask array.
                    self._data_cache[cache_key] = v.filled(np.nan)
                else:
                    # v is already sliced; do not index it again.
                    self._data_cache[cache_key] = v
            finally:
                dataset.close()
        data = self._data_cache[cache_key]
return data

def __repr__(self):
@@ -501,12 +509,12 @@ def _load_cube(engine, cf, cf_var, filename):
dummy_data = cf_var.add_offset + dummy_data

# Create cube with deferred data, but no metadata
fill_value = getattr(cf_var.cf_data, '_FillValue',
netCDF4.default_fillvals[cf_var.dtype.str[1:]])
fill_value = getattr(cf_var.cf_data, '_FillValue', None)
# netCDF4.default_fillvals[cf_var.dtype.str[1:]])
proxy = NetCDFDataProxy(cf_var.shape, dummy_data.dtype,
filename, cf_var.cf_name, fill_value)
data = iris._lazy_data.as_lazy_data(proxy)
cube = iris.cube.Cube(data)
data = da.from_array(proxy, chunks=100)
cube = iris.cube.Cube(data, fill_value=fill_value)

# Reset the pyke inference engine.
engine.reset()
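The `netcdf.py` change hands the proxy straight to `da.from_array`, which only needs `shape`, `dtype`, `ndim` and `__getitem__` on the wrapped object. A minimal sketch of that contract (`ArrayProxy` is an assumed stand-in for `NetCDFDataProxy`, without the file access or caching):

```python
import dask.array as da
import numpy as np


class ArrayProxy(object):
    """Stand-in for NetCDFDataProxy: any object exposing shape, dtype,
    ndim and __getitem__ can back a dask array."""

    def __init__(self, array):
        self.shape = array.shape
        self.dtype = array.dtype
        self.ndim = array.ndim
        self._array = array

    def __getitem__(self, keys):
        # dask calls this with a tuple of slices, one call per chunk.
        return self._array[keys]


proxy = ArrayProxy(np.arange(12.0).reshape(3, 4))
lazy = da.from_array(proxy, chunks=100)  # chunks=100 caps each block edge
print(lazy[1:, :2].compute())            # indexing stays lazy until compute
```

With the real proxy, each chunk read opens the NetCDF file, slices the variable, and closes it again, so nothing is loaded until `compute` is called on the cube's graph.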