-
Notifications
You must be signed in to change notification settings - Fork 300
Dask landsea masks bugfix #3255
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,4 @@ | ||
| # (C) British Crown Copyright 2010 - 2018, Met Office | ||
| # (C) British Crown Copyright 2010 - 2019, Met Office | ||
| # | ||
| # This file is part of Iris. | ||
| # | ||
|
|
@@ -37,6 +37,9 @@ | |
| import numpy.ma as ma | ||
| import cftime | ||
|
|
||
| import dask | ||
| import dask.array as da | ||
|
|
||
| from iris._deprecation import warn_deprecated | ||
| from iris._lazy_data import as_concrete_data, as_lazy_data, is_lazy_data | ||
| import iris.config | ||
|
|
@@ -602,10 +605,10 @@ class PPDataProxy(object): | |
| """A reference to the data payload of a single PP field.""" | ||
|
|
||
| __slots__ = ('shape', 'src_dtype', 'path', 'offset', 'data_len', | ||
| '_lbpack', 'boundary_packing', 'mdi', 'mask') | ||
| '_lbpack', 'boundary_packing', 'mdi') | ||
|
|
||
| def __init__(self, shape, src_dtype, path, offset, data_len, | ||
| lbpack, boundary_packing, mdi, mask): | ||
| lbpack, boundary_packing, mdi): | ||
| self.shape = shape | ||
| self.src_dtype = src_dtype | ||
| self.path = path | ||
|
|
@@ -614,7 +617,6 @@ def __init__(self, shape, src_dtype, path, offset, data_len, | |
| self.lbpack = lbpack | ||
| self.boundary_packing = boundary_packing | ||
| self.mdi = mdi | ||
| self.mask = mask | ||
|
|
||
| # lbpack | ||
| def _lbpack_setter(self, value): | ||
|
|
@@ -649,14 +651,14 @@ def __getitem__(self, keys): | |
| self.lbpack, | ||
| self.boundary_packing, | ||
| self.shape, self.src_dtype, | ||
| self.mdi, self.mask) | ||
| self.mdi) | ||
| data = data.__getitem__(keys) | ||
| return np.asanyarray(data, dtype=self.dtype) | ||
|
|
||
| def __repr__(self): | ||
| fmt = '<{self.__class__.__name__} shape={self.shape}' \ | ||
| ' src_dtype={self.dtype!r} path={self.path!r}' \ | ||
| ' offset={self.offset} mask={self.mask!r}>' | ||
| ' offset={self.offset}>' | ||
| return fmt.format(self=self) | ||
|
|
||
| def __getstate__(self): | ||
|
|
@@ -772,24 +774,29 @@ def _data_bytes_to_shaped_array(data_bytes, lbpack, boundary_packing, | |
|
|
||
| elif lbpack.n2 == 2: | ||
| if mask is None: | ||
| raise ValueError('No mask was found to unpack the data. ' | ||
| 'Could not load.') | ||
| land_mask = mask.data.astype(np.bool) | ||
| sea_mask = ~land_mask | ||
| new_data = np.ma.masked_all(land_mask.shape) | ||
| new_data.fill_value = mdi | ||
| if lbpack.n3 == 1: | ||
| # Land mask packed data. | ||
| # Sometimes the data comes in longer than it should be (i.e. it | ||
| # looks like the compressed data is compressed, but the trailing | ||
| # data hasn't been clipped off!). | ||
| new_data[land_mask] = data[:land_mask.sum()] | ||
| elif lbpack.n3 == 2: | ||
| # Sea mask packed data. | ||
| new_data[sea_mask] = data[:sea_mask.sum()] | ||
| # If we are given no mask to apply, then just return raw data, even | ||
| # though it does not have the correct shape. | ||
| # For dask-delayed loading, this means that mask, data and the | ||
| # combination can be properly handled within a dask graph. | ||
| # However, we still mask any MDI values in the array (below). | ||
| pass | ||
| else: | ||
| raise ValueError('Unsupported mask compression.') | ||
| data = new_data | ||
| land_mask = mask.data.astype(np.bool) | ||
| sea_mask = ~land_mask | ||
| new_data = np.ma.masked_all(land_mask.shape) | ||
| new_data.fill_value = mdi | ||
| if lbpack.n3 == 1: | ||
| # Land mask packed data. | ||
| # Sometimes the data comes in longer than it should be (i.e. it | ||
| # looks like the compressed data is compressed, but the | ||
| # trailing data hasn't been clipped off!). | ||
| new_data[land_mask] = data[:land_mask.sum()] | ||
| elif lbpack.n3 == 2: | ||
| # Sea mask packed data. | ||
| new_data[sea_mask] = data[:sea_mask.sum()] | ||
| else: | ||
| raise ValueError('Unsupported mask compression.') | ||
| data = new_data | ||
|
|
||
| else: | ||
| # Reform in row-column order | ||
|
|
@@ -1581,51 +1588,68 @@ def _interpret_fields(fields): | |
| numpy arrays (via the _create_field_data) function. | ||
|
|
||
| """ | ||
| land_mask = None | ||
| land_mask_field = None | ||
| landmask_compressed_fields = [] | ||
| for field in fields: | ||
| # Store the first reference to a land mask, and use this as the | ||
| # definitive mask for future fields in this generator. | ||
| if land_mask is None and field.lbuser[6] == 1 and \ | ||
| if land_mask_field is None and field.lbuser[6] == 1 and \ | ||
| (field.lbuser[3] // 1000) == 0 and \ | ||
| (field.lbuser[3] % 1000) == 30: | ||
| land_mask = field | ||
| land_mask_field = field | ||
|
|
||
| # Handle land compressed data payloads, | ||
| # when lbpack.n2 is 2. | ||
| # Handle land compressed data payloads, when lbpack.n2 is 2. | ||
| if (field.raw_lbpack // 10 % 10) == 2: | ||
| if land_mask is None: | ||
| # Field uses land-mask packing, so needs a related land-mask field. | ||
| if land_mask_field is None: | ||
| landmask_compressed_fields.append(field) | ||
| # Land-masked fields have their size+shape defined by the | ||
| # reference landmask field, so we can't yield them if they | ||
| # are encountered *before* the landmask. | ||
| # In that case, defer them, and process them all afterwards at | ||
| # the end of the file. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @pp-mo Nice clarification, thanks! |
||
| continue | ||
|
|
||
| # Land compressed fields don't have a lbrow and lbnpt. | ||
| field.lbrow, field.lbnpt = land_mask.lbrow, land_mask.lbnpt | ||
| # Land-mask compressed fields don't have an lbrow and lbnpt. | ||
| field.lbrow, field.lbnpt = \ | ||
| land_mask_field.lbrow, land_mask_field.lbnpt | ||
| _create_field_data(field, (field.lbrow, field.lbnpt), | ||
| land_mask_field=land_mask_field) | ||
| else: | ||
| # Field does not use land-mask packing. | ||
| _create_field_data(field, (field.lbrow, field.lbnpt)) | ||
|
|
||
| data_shape = (field.lbrow, field.lbnpt) | ||
| _create_field_data(field, data_shape, land_mask) | ||
| yield field | ||
|
|
||
| # At file end, return any land-masked fields that were deferred because | ||
| # they were encountered before the landmask reference field. | ||
| if landmask_compressed_fields: | ||
| if land_mask is None: | ||
| if land_mask_field is None: | ||
| warnings.warn('Landmask compressed fields existed without a ' | ||
| 'landmask to decompress with. The data will have ' | ||
| 'a shape of (0, 0) and will not read.') | ||
| mask_shape = (0, 0) | ||
| else: | ||
| mask_shape = (land_mask.lbrow, land_mask.lbnpt) | ||
| mask_shape = (land_mask_field.lbrow, land_mask_field.lbnpt) | ||
|
|
||
| for field in landmask_compressed_fields: | ||
| field.lbrow, field.lbnpt = mask_shape | ||
| _create_field_data(field, (field.lbrow, field.lbnpt), land_mask) | ||
| _create_field_data(field, mask_shape, | ||
| land_mask_field=land_mask_field) | ||
| yield field | ||
|
|
||
|
|
||
| def _create_field_data(field, data_shape, land_mask): | ||
| def _create_field_data(field, data_shape, land_mask_field=None): | ||
| """ | ||
| Modifies a field's ``_data`` attribute either by: | ||
| * converting DeferredArrayBytes into a lazy array, | ||
| * converting a 'deferred array bytes' tuple into a lazy array, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @pp-mo Yup 👍 |
||
| * converting LoadedArrayBytes into an actual numpy array. | ||
|
|
||
| If 'land_mask_field' is passed (not None), then it contains the associated | ||
| landmask, which is also a field : Its data array is used as a template for | ||
| `field`'s data, determining its size, shape and the locations of all the | ||
| valid (non-missing) datapoints. | ||
|
|
||
| """ | ||
| if isinstance(field.core_data(), LoadedArrayBytes): | ||
| loaded_bytes = field.core_data() | ||
|
|
@@ -1634,7 +1658,8 @@ def _create_field_data(field, data_shape, land_mask): | |
| field.boundary_packing, | ||
| data_shape, | ||
| loaded_bytes.dtype, | ||
| field.bmdi, land_mask) | ||
| field.bmdi, | ||
| land_mask_field) | ||
| else: | ||
| # Wrap the reference to the data payload within a data proxy | ||
| # in order to support deferred data loading. | ||
|
|
@@ -1643,9 +1668,64 @@ def _create_field_data(field, data_shape, land_mask): | |
| fname, position, n_bytes, | ||
| field.raw_lbpack, | ||
| field.boundary_packing, | ||
| field.bmdi, land_mask) | ||
| field.bmdi) | ||
| block_shape = data_shape if 0 not in data_shape else (1, 1) | ||
| field.data = as_lazy_data(proxy, chunks=block_shape) | ||
| if land_mask_field is None: | ||
| # For a "normal" (non-landsea-masked) field, the proxy can be | ||
| # wrapped directly as a deferred array. | ||
| field.data = as_lazy_data(proxy, chunks=block_shape) | ||
| else: | ||
| # This is a landsea-masked field, and its data must be handled in | ||
| # a different way : Because data shape/size is not known in | ||
| # advance, the data+mask calculation can't be represented | ||
| # as a dask-array operation. Instead, we make that calculation | ||
| # 'delayed', and then use 'from_delayed' to make the result back | ||
| # into a dask array -- because the final result shape *is* known. | ||
| @dask.delayed | ||
| def fetch_valid_values_array(): | ||
| # Return the data values array (shape+size unknown). | ||
| return proxy[:] | ||
|
|
||
| delayed_valid_values = fetch_valid_values_array() | ||
|
|
||
| # Get the mask data-array from the landsea-mask field. | ||
| # This is *either* a lazy or a real array, we don't actually care. | ||
| # If this is a deferred dependency, the delayed calc can see that. | ||
| mask_field_array = land_mask_field.core_data() | ||
|
|
||
| # Check whether this field uses a land or a sea mask. | ||
| if field.lbpack.n3 not in (1, 2): | ||
| raise ValueError('Unsupported mask compression : ' | ||
| 'lbpack.n3 = {}.'.format(field.lbpack.n3)) | ||
| if field.lbpack.n3 == 2: | ||
| # Sea-mask packing : points are inverse of the land-mask. | ||
| mask_field_array = ~mask_field_array | ||
|
|
||
| # Define the mask+data calculation as a deferred operation. | ||
| # NOTE: duplicates the operation in _data_bytes_to_shaped_array. | ||
| @dask.delayed | ||
| def calc_array(mask, values): | ||
| # Note: "mask" is True at *valid* points, not missing ones. | ||
| # First ensure the mask array is boolean (not int). | ||
| mask = mask.astype(bool) | ||
| result = ma.masked_all(mask.shape, dtype=dtype) | ||
| # Apply the fill-value from the proxy object. | ||
| # Note: 'values' is just 'proxy' in a dask wrapper. This arg | ||
| # must be a dask type so that 'delayed' can recognise it, but | ||
| # that provides no access to the underlying fill value. | ||
| result.fill_value = proxy.mdi | ||
| n_values = np.sum(mask) | ||
| if n_values > 0: | ||
| # Note: data field can have excess values, but not fewer. | ||
| result[mask] = values[:n_values] | ||
| return result | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @pp-mo Nice 👍 The only part that seems to be missing compare to the functionality of If so, then does the
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sometime back when I was writing this, I had concluded that the fill-value was not respected by dask anyway, so this wasn't necessary. But the suggestions feels appropriate + I think does no harm, so I will put it back in. |
||
|
|
||
| delayed_result = calc_array(mask_field_array, | ||
| delayed_valid_values) | ||
| lazy_result_array = da.from_delayed(delayed_result, | ||
| shape=block_shape, | ||
| dtype=dtype) | ||
| field.data = lazy_result_array | ||
|
|
||
|
|
||
| def _field_gen(filename, read_data_bytes, little_ended=False): | ||
|
|
@@ -1655,12 +1735,19 @@ def _field_gen(filename, read_data_bytes, little_ended=False): | |
|
|
||
| A field returned by the generator is only "half-formed" because its | ||
| `_data` attribute represents a simple one-dimensional stream of | ||
| bytes. (Encoded as an instance of either LoadedArrayBytes or | ||
| DeferredArrayBytes, depending on the value of `read_data_bytes`.) | ||
| bytes. (Encoded either as an instance of LoadedArrayBytes or as a | ||
| 'deferred array bytes' tuple, depending on the value of `read_data_bytes`.) | ||
| This is because fields encoded with a land/sea mask do not contain | ||
| sufficient information within the field to determine the final | ||
| two-dimensional shape of the data. | ||
|
|
||
| The callers of this routine are the 'load' routines (both PP and FF). | ||
| They both filter the resulting stream of fields with `_interpret_fields`, | ||
| which replaces each field.data with an actual array (real or lazy). | ||
| This is done separately so that `_interpret_fields` can detect land-mask | ||
| fields and use them to construct data arrays for any fields which use | ||
| land/sea-mask packing. | ||
|
|
||
| """ | ||
| dtype_endian_char = '<' if little_ended else '>' | ||
| with open(filename, 'rb') as pp_file: | ||
|
|
@@ -1738,7 +1825,10 @@ def _field_gen(filename, read_data_bytes, little_ended=False): | |
| pp_field.data = LoadedArrayBytes(pp_file.read(data_len), | ||
| dtype) | ||
| else: | ||
| # Provide enough context to read the data bytes later on. | ||
| # Provide enough context to read the data bytes later on, | ||
| # as a 'deferred array bytes' tuple. | ||
| # N.B. this used to be a namedtuple called DeferredArrayBytes, | ||
| # but it no longer is. Possibly for performance reasons? | ||
| pp_field.data = (filename, pp_file.tell(), data_len, dtype) | ||
| # Seek over the actual data payload. | ||
| pp_file_seek(data_len, os.SEEK_CUR) | ||
|
|
@@ -2155,8 +2245,6 @@ def save_fields(fields, target, append=False): | |
| of the file. | ||
| Only applicable when target is a filename, not a file handle. | ||
| Default is False. | ||
| * callback: | ||
| A modifier/filter function. | ||
|
|
||
| See also :func:`iris.io.save`. | ||
|
|
||
|
|
@@ -2185,11 +2273,9 @@ def save_fields(fields, target, append=False): | |
|
|
||
| if isinstance(target, six.string_types): | ||
| pp_file = open(target, "ab" if append else "wb") | ||
| filename = target | ||
| elif hasattr(target, "write"): | ||
| if hasattr(target, "mode") and "b" not in target.mode: | ||
| raise ValueError("Target not binary") | ||
| filename = target.name if hasattr(target, 'name') else None | ||
| pp_file = target | ||
| else: | ||
| raise ValueError("Can only save pp to filename or writable") | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pp-mo We're assuming that the
land_mask_fieldis constant, right? i.e. multiple occurrences are simply duplicatesIf that's the case, then we're good... and I'm really hoping that's the case here, otherwise it's going to get complicated quickly 😨
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No problem here, discussed to clarify. Always snaps the first land/sea mask.