NetCDF save with dask support #2411
|
|
@@ -118,7 +118,7 @@ def multidim_lazy_stack(stack): | |
| Args: | ||
|
|
||
| * stack: | ||
| + An ndarray of dask arrays. | ||
| An ndarray of dask arrays. | ||
|
|
||
| Returns: | ||
| The input array converted to a lazy dask array. | ||
|
|
@@ -135,3 +135,64 @@ def multidim_lazy_stack(stack): | |
| result = da.stack([multidim_lazy_stack(subarray) | ||
| for subarray in stack]) | ||
| return result | ||
|
|
||
|
|
||
| def convert_nans_array(array, nans=None, result_dtype=None): | ||
| """ | ||
| Convert a :class:`~numpy.ndarray` that may contain one or more NaN values | ||
| to either a :class:`~numpy.ma.core.MaskedArray` or a | ||
| :class:`~numpy.ndarray` with the NaN values filled. | ||
|
|
||
| Args: | ||
|
|
||
| * array: | ||
| The :class:`~numpy.ndarray` to be converted. | ||
|
|
||
| Kwargs: | ||
|
|
||
| * nans: | ||
| If `nans` is None, then raise an exception if the `array` contains | ||
| any NaN values (default). | ||
|
||
| If `nans` is `numpy.ma.masked`, then convert the `array` to a | ||
| :class:`~numpy.ma.core.MaskedArray`. | ||
| Otherwise, use the specified `nans` value as the `array` fill value. | ||
|
|
||
| * result_dtype: | ||
| Cast the resultant array to this target :class:`~numpy.dtype`. | ||
|
Member
This will not happen with pass-through cases (anything already masked, or integer types).

Member (Author)
@pp-mo This is actually mentioned in the doc-string note. I think that is sufficient.
|
|
||
| Returns: | ||
| An :class:`numpy.ndarray`. | ||
|
|
||
| .. note:: | ||
| An input array that is either a :class:`~numpy.ma.core.MaskedArray` | ||
| or has an integral dtype will be returned unaltered. | ||
|
|
||
| """ | ||
| if not ma.isMaskedArray(array) and array.dtype.kind == 'f': | ||
| # First, calculate the mask. | ||
| mask = np.isnan(array) | ||
| # Now, cast the dtype, if required. | ||
| if result_dtype is not None: | ||
| result_dtype = np.dtype(result_dtype) | ||
| if array.dtype != result_dtype: | ||
| array = array.astype(result_dtype) | ||
| # Finally, mask or fill the data, as required or raise an exception | ||
| # if we detect there are NaNs present and we didn't expect any. | ||
| if np.any(mask): | ||
| if nans is None: | ||
| emsg = 'Array contains unexpected NaNs.' | ||
| raise ValueError(emsg) | ||
| elif nans is ma.masked: | ||
| # Mask the array with the default fill_value. | ||
| array = ma.masked_array(array, mask=mask) | ||
|
Member
So, this uses the default fill_value irrespective of what the fill value on the cube is.

Member (Author)
Exactly. We no longer care. The …
| else: | ||
| # Check the fill value is appropriate for the | ||
| # result array dtype. | ||
| try: | ||
| [fill_value] = np.asarray([nans], dtype=array.dtype) | ||
| except OverflowError: | ||
| emsg = 'Fill value of {!r} invalid for array result {!r}.' | ||
| raise ValueError(emsg.format(nans, array.dtype)) | ||
| # Fill the array. | ||
| array[mask] = fill_value | ||
| return array | ||
|
Member (Author)
@pp-mo I've combined my … Interested on your take with my raising an exception for the … Also, that this routine will perform …

Member
Again, mine is very very similar here.
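For reference, a minimal usage sketch of the convert_nans_array behaviour added in this hunk (assuming this PR branch, where the function is importable from iris._lazy_data):

```python
import numpy as np
import numpy.ma as ma

from iris._lazy_data import convert_nans_array

data = np.array([1.0, np.nan, 3.0])

# nans=ma.masked: NaN points become masked (with the dtype default fill_value).
masked = convert_nans_array(data.copy(), nans=ma.masked)
# -> masked_array(data=[1.0, --, 3.0], mask=[False, True, False])

# nans=<value>: NaN points are filled with that value, after the optional cast.
filled = convert_nans_array(data.copy(), nans=-999, result_dtype=np.int32)
# -> array([   1, -999,    3], dtype=int32)

# nans=None (the default): any NaN raises an exception.
convert_nans_array(data.copy())
# -> ValueError: Array contains unexpected NaNs.
```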
| @@ -1,4 +1,4 @@ | ||
| # (C) British Crown Copyright 2010 - 2016, Met Office | ||
| # (C) British Crown Copyright 2010 - 2017, Met Office | ||
| # | ||
| # This file is part of Iris. | ||
| # | ||
|
|
@@ -341,6 +341,8 @@ def interpolate(cube, sample_points, method=None): | |
| # This is **not** proper mask handling, because we cannot produce a | ||
| # masked result, but it ensures we use a "filled" version of the | ||
| # input in this case. | ||
| if cube.fill_value is not None: | ||
| source_data.fill_value = cube.fill_value | ||
|
Member (Author)
The way … The preceding inline code comment says it all really ...
| source_data = source_data.filled() | ||
| new_cube.data[:] = source_data | ||
| # NOTE: we assign to "new_cube.data[:]" and *not* just "new_cube.data", | ||
|
|
||
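Stripped of the cube and interpolation machinery, the filled-source handling changed above reduces to this sketch (cube_fill_value and source_data are hypothetical stand-ins for illustration):

```python
import numpy.ma as ma

# Hypothetical stand-ins for the cube's fill_value and the masked source data.
cube_fill_value = 0.0
source_data = ma.masked_array([1.0, 2.0, 3.0], mask=[False, True, False])

# Honour the cube's declared fill value (when set) before filling, so the
# interpolation sees the intended sentinel rather than the numpy default.
if cube_fill_value is not None:
    source_data.fill_value = cube_fill_value
source_data = source_data.filled()
# -> array([1., 0., 3.])
```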
|
|
@@ -32,8 +32,7 @@ | |
| import warnings | ||
| import zlib | ||
|
|
||
| import biggus | ||
| import iris._lazy_data | ||
| from iris._lazy_data import is_lazy_data | ||
| import dask.array as da | ||
| import netcdftime | ||
| import numpy as np | ||
|
|
@@ -1611,7 +1610,7 @@ def _sanitise_array(self, src, ndmin): | |
| @property | ||
| def points(self): | ||
| """Property containing the points values as a numpy array""" | ||
| if iris._lazy_data.is_lazy_data(self._points): | ||
| if is_lazy_data(self._points): | ||
| self._points = self._points.compute() | ||
| return self._points.view() | ||
|
|
||
|
|
@@ -1623,9 +1622,9 @@ def points(self, points): | |
| # of 1 and is either a numpy or lazy array. | ||
| # This will avoid Scalar coords with points of shape () rather | ||
| # than the desired (1,) | ||
| if iris._lazy_data.is_lazy_data(points): | ||
| if is_lazy_data(points): | ||
| if points.shape == (): | ||
| points = points * np.ones(1) | ||
| points = da.reshape(points, (1,)) | ||
| elif not isinstance(points, iris.aux_factory._LazyArray): | ||
| points = self._sanitise_array(points, 1) | ||
| # If points are already defined for this coordinate, | ||
|
|
@@ -1649,8 +1648,8 @@ def bounds(self): | |
| """ | ||
| if self._bounds is not None: | ||
| bounds = self._bounds | ||
| if isinstance(bounds, biggus.Array): | ||
| bounds = bounds.ndarray() | ||
| if is_lazy_data(bounds): | ||
| bounds = bounds.compute() | ||
|
Member (Author)
Is there a better way to do this now? I stumbled across this …

Member (Author)
…

Member
Anything with "compute" in it should be hit by #2422 to turn it into an "as_concrete_data" call.
| self._bounds = bounds | ||
| bounds = bounds.view() | ||
| else: | ||
|
|
@@ -1662,8 +1661,8 @@ def bounds(self): | |
| def bounds(self, bounds): | ||
| # Ensure the bounds are a compatible shape. | ||
| if bounds is not None: | ||
| if not isinstance(bounds, (iris.aux_factory._LazyArray, | ||
| biggus.Array)): | ||
| if not (isinstance(bounds, iris.aux_factory._LazyArray) or | ||
| is_lazy_data(bounds)): | ||
| bounds = self._sanitise_array(bounds, 2) | ||
| # NB. Use _points to avoid triggering any lazy array. | ||
| if self._points.shape != bounds.shape[:-1]: | ||
|
|
@@ -1742,10 +1741,9 @@ def measure(self): | |
| def data(self): | ||
| """Property containing the data values as a numpy array""" | ||
| data = self._data | ||
| if isinstance(data, biggus.Array): | ||
| data = data.ndarray() | ||
| self._data = data | ||
| return data.view() | ||
| if is_lazy_data(self._data): | ||
| self._data = self._data.compute() | ||
|
Member (Author)
Again, is this now the best way to do this ... again #2422 might address this in a better way.

Member
Likewise subject to #2422.
| return self._data.view() | ||
|
|
||
| @data.setter | ||
| def data(self, data): | ||
|
|
||
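The realise-on-access pattern discussed in these hunks boils down to the following sketch (assuming this PR branch; per #2422 the explicit compute() would later become an as_concrete_data call):

```python
import dask.array as da
import numpy as np

from iris._lazy_data import is_lazy_data

bounds = da.from_array(np.arange(8.0).reshape(4, 2), chunks=(2, 2))

# Realise the lazy payload on first access, cache the concrete result,
# then hand back a view -- mirroring the points/bounds/data properties above.
if is_lazy_data(bounds):
    bounds = bounds.compute()
result = bounds.view()
```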
|
|
@@ -42,7 +42,8 @@ | |
| import iris._concatenate | ||
| import iris._constraints | ||
| from iris._deprecation import warn_deprecated | ||
| from iris._lazy_data import array_masked_to_nans, as_lazy_data, is_lazy_data | ||
| from iris._lazy_data import (array_masked_to_nans, as_lazy_data, | ||
| convert_nans_array, is_lazy_data) | ||
| import iris._merge | ||
| import iris.analysis | ||
| from iris.analysis.cartography import wrap_lons | ||
|
|
@@ -1733,16 +1734,14 @@ def data(self): | |
| if self.has_lazy_data(): | ||
| try: | ||
| data = self._dask_array.compute() | ||
| mask = np.isnan(data) | ||
| if data.dtype != self.dtype: | ||
| data = data.astype(self.dtype) | ||
| self.dtype = None | ||
| if np.all(~mask): | ||
| self._numpy_array = data | ||
| else: | ||
| fv = self.fill_value | ||
| self._numpy_array = ma.masked_array(data, mask=mask, | ||
| fill_value=fv) | ||
| # Now convert the data payload from a NaN array to a | ||
| # masked array, and if appropriate cast to the specified | ||
| # cube result dtype. | ||
| self._numpy_array = convert_nans_array(data, | ||
| nans=ma.masked, | ||
| result_dtype=self.dtype) | ||
| self.dtype = None | ||
|
Member (Author)
@pp-mo I'm expecting you to have stomped over this with your PR, but hopefully they align in logic ...

Member
Dead right, mine is spookily similar!
|
|
||
| except MemoryError: | ||
| msg = "Failed to create the cube's data as there was not" \ | ||
| " enough memory available.\n" \ | ||
|
|
||
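As a sketch of the realisation behaviour introduced here (assuming this PR branch): NaNs act as the in-flight stand-in for masked points, and realising the data converts them back to a mask while applying the recorded cube dtype.

```python
import dask.array as da
import numpy as np
import numpy.ma as ma

from iris._lazy_data import convert_nans_array

# A lazy payload in which NaN stands in for a masked point.
lazy = da.from_array(np.array([1.0, np.nan, 3.0]), chunks=2)

# Realising the payload restores the mask and casts to the recorded dtype.
data = convert_nans_array(lazy.compute(), nans=ma.masked, result_dtype=np.float32)
assert ma.isMaskedArray(data)
assert data.dtype == np.float32
```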
|
|
@@ -37,7 +37,7 @@ | |
| import string | ||
| import warnings | ||
|
|
||
| import biggus | ||
| import dask.array as da | ||
| import netCDF4 | ||
| import numpy as np | ||
| import numpy.ma as ma | ||
|
|
@@ -56,7 +56,8 @@ | |
| import iris.fileformats._pyke_rules | ||
| import iris.io | ||
| import iris.util | ||
| from iris._lazy_data import array_masked_to_nans, as_lazy_data | ||
| from iris._lazy_data import (array_masked_to_nans, as_lazy_data, | ||
| convert_nans_array) | ||
|
|
||
| # Show Pyke inference engine statistics. | ||
| DEBUG = False | ||
|
|
@@ -1938,16 +1939,21 @@ def set_packing_ncattrs(cfvar): | |
| # Explicitly assign the fill_value, which will be the type default | ||
| # in the case of an unmasked array. | ||
| if packing is None: | ||
| fill_value = cube.lazy_data().fill_value | ||
| fill_value = cube.fill_value | ||
| dtype = cube.lazy_data().dtype.newbyteorder('=') | ||
|
|
||
| cf_var = self._dataset.createVariable( | ||
| cf_name, dtype, | ||
| dimension_names, fill_value=fill_value, | ||
| **kwargs) | ||
| set_packing_ncattrs(cf_var) | ||
| # stream the data | ||
| biggus.save([cube.lazy_data()], [cf_var], masked=True) | ||
|
|
||
| # Now stream the cube data payload straight to the netCDF | ||
| # data variable within the netCDF file, where any NaN values | ||
| # are replaced with the specified cube fill_value. | ||
| data = da.map_blocks(convert_nans_array, cube.lazy_data(), | ||
| nans=cube.fill_value, result_dtype=cube.dtype) | ||
| da.store([data], [cf_var]) | ||
|
Member (Author)
This is the core change to this PR. Dask streamed saving, using the new …
|
|
||
| if cube.standard_name: | ||
| _setncattr(cf_var, 'standard_name', cube.standard_name) | ||
|
|
@@ -2045,7 +2051,7 @@ def save(cube, filename, netcdf_format='NETCDF4', local_keys=None, | |
| * Keyword arguments specifying how to save the data are applied | ||
| to each cube. To use different settings for different cubes, use | ||
| the NetCDF Context manager (:class:`~Saver`) directly. | ||
| * The save process will stream the data payload to the file using biggus, | ||
| * The save process will stream the data payload to the file using dask, | ||
| enabling large data payloads to be saved and maintaining the 'lazy' | ||
| status of the cube's data payload, unless the netcdf_format is explicitly | ||
| specified to be 'NETCDF3' or 'NETCDF3_CLASSIC'. | ||
|
|
||
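Stripped of the cube machinery, the dask streamed save described above follows this pattern (a sketch only: the file name, variable and fill value are illustrative, and a lambda stands in for convert_nans_array):

```python
import dask.array as da
import netCDF4
import numpy as np

# A toy lazy payload, with a NaN standing in for a masked point.
lazy = da.from_array(np.array([[1.0, np.nan], [3.0, 4.0]]), chunks=(1, 2))

with netCDF4.Dataset('example.nc', 'w') as ds:
    ds.createDimension('y', 2)
    ds.createDimension('x', 2)
    cf_var = ds.createVariable('v', 'f8', ('y', 'x'), fill_value=-999.0)
    # Replace NaNs chunk-by-chunk, then stream the chunks straight into the
    # netCDF variable, without realising the whole payload in memory.
    filled = da.map_blocks(lambda block: np.where(np.isnan(block), -999.0, block),
                           lazy, dtype=lazy.dtype)
    da.store([filled], [cf_var])
```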
|
|
@@ -203,7 +203,7 @@ def test_tri_polar__nearest(self): | |
| # TODO: arguably, we should support masked data properly in the | ||
| # interpolation routine. In the legacy code, that is unfortunately | ||
| # just not the case. | ||
| test_cube.data.set_fill_value(0.0) | ||
| test_cube.fill_value = 0 | ||
|
Member (Author)
I'm just aping the same behaviour for this test, in terms of the trajectory approach to dealing with masked data, in order to yield the same expected result. Seems appropriate.
|
|
||
| # Test points on a regular global grid, with unrelated steps + offsets | ||
| # and an extended range of longitude values. | ||
|
|
||
| @@ -1,6 +1,6 @@ | ||
| <?xml version="1.0" ?> | ||
| <cubes xmlns="urn:x-iris:cubeml-0.2"> | ||
| <cube standard_name="eastward_wind" units="m s-1" var_name="wind1"> | ||
| <cube core-dtype="int32" dtype="int32" standard_name="eastward_wind" units="m s-1" var_name="wind1"> | ||
|
Member
At some point we need to take out these …
| <attributes> | ||
| <attribute name="test" value="weak-monotonic time coordinate"/> | ||
| </attributes> | ||
|
|
@@ -18,7 +18,7 @@ | |
| <cellMethods/> | ||
| <data checksum="0xd4e7a32f" dtype="int32" shape="(3, 3, 3)"/> | ||
| </cube> | ||
| <cube standard_name="eastward_wind" units="m s-1" var_name="wind2"> | ||
| <cube core-dtype="int32" dtype="int32" standard_name="eastward_wind" units="m s-1" var_name="wind2"> | ||
| <attributes> | ||
| <attribute name="test" value="masked monotonic time coordinate"/> | ||
| </attributes> | ||
|
|
@@ -36,7 +36,7 @@ | |
| <cellMethods/> | ||
| <data checksum="0xd4e7a32f" dtype="int32" shape="(3, 3, 3)"/> | ||
| </cube> | ||
| <cube standard_name="eastward_wind" units="m s-1" var_name="wind3"> | ||
| <cube core-dtype="int32" dtype="int32" standard_name="eastward_wind" units="m s-1" var_name="wind3"> | ||
| <attributes> | ||
| <attribute name="test" value="masked non-monotonic time coordinate"/> | ||
| </attributes> | ||
|
|
@@ -52,6 +52,6 @@ | |
| </coord> | ||
| </coords> | ||
| <cellMethods/> | ||
| <data checksum="0x0f7cfdf3" dtype="int32" fill_value="-2147483647" mask_checksum="0xc0aeb298" shape="(3, 3, 3)"/> | ||
| <data checksum="0x0f7cfdf3" dtype="int32" mask_checksum="0xc0aeb298" shape="(3, 3, 3)"/> | ||
| </cube> | ||
| </cubes> | ||
|
|
@@ -51,7 +51,6 @@ | |
|
|
||
| @tests.skip_data | ||
| class TestNetCDFLoad(tests.IrisTest): | ||
| @tests.skip_biggus | ||
| def test_monotonic(self): | ||
| cubes = iris.load(tests.get_data_path( | ||
| ('NetCDF', 'testing', 'test_monotonic_coordinate.nc'))) | ||
|
|
@@ -95,7 +94,7 @@ def test_load_global_xyzt_gems(self): | |
| # manager loading. | ||
| lnsp = cubes[1] | ||
| self.assertTrue(ma.isMaskedArray(lnsp.data)) | ||
| self.assertEqual(-32767.0, lnsp.data.fill_value) | ||
| self.assertEqual(-32767.0, lnsp.fill_value) | ||
|
Member (Author)
We now care about the …
|
|
||
| def test_load_global_xyzt_gems_iter(self): | ||
| # Test loading stepped single xyzt CF-netCDF file (multi-cube). | ||
|
|
@@ -107,7 +106,6 @@ def test_load_global_xyzt_gems_iter(self): | |
| self.assertCML(cube, ('netcdf', | ||
| 'netcdf_global_xyzt_gems_iter_%d.cml' % i)) | ||
|
|
||
| @tests.skip_biggus | ||
| def test_load_rotated_xy_land(self): | ||
| # Test loading single xy rotated pole CF-netCDF file. | ||
| cube = iris.load_cube(tests.get_data_path( | ||
|
|
@@ -350,7 +348,6 @@ def test_no_name_cube(self): | |
| self.assertCDL(filename, ('netcdf', 'netcdf_save_no_name.cdl')) | ||
|
|
||
|
|
||
| @tests.skip_biggus | ||
| class TestNetCDFSave(tests.IrisTest): | ||
| def setUp(self): | ||
| self.cubell = iris.cube.Cube(np.arange(4).reshape(2, 2), | ||
|
|
||
"nans is None" is never used, so why did you include it?

It's the default in the function formal parameter list.
Also, it's a very useful value to default to, because it's going to raise an exception if it detects that there are NaNs in your data. We're forcing people to care about this. So, for example, this is good behaviour to adopt, as it will force the issue in cases when streamed saving is being attempted with NaN data but the fill_value isn't set, i.e. None. See here in particular.

But it is here?

I meant when convert_nans_array is used, but I forgot that the fill_value of a cube could be set to None, as @bjlittle's comment points out.
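To make that concrete, a sketch (assuming this PR branch; the save path passes nans=cube.fill_value, so an unset fill_value is exactly the nans=None case):

```python
import dask.array as da
import numpy as np

from iris._lazy_data import convert_nans_array

lazy = da.from_array(np.array([1.0, np.nan]), chunks=1)

# With no fill_value to substitute, nans=None forces the save to fail loudly
# when the chunks are computed (e.g. inside da.store), rather than silently
# writing NaNs to the file.
checked = da.map_blocks(convert_nans_array, lazy, nans=None, dtype=lazy.dtype)
checked.compute()
# -> ValueError: Array contains unexpected NaNs.
```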