diff --git a/lib/iris/__init__.py b/lib/iris/__init__.py
index 38465472ee..0e6670533f 100644
--- a/lib/iris/__init__.py
+++ b/lib/iris/__init__.py
@@ -89,12 +89,12 @@ def callback(cube, field, filename):
 
 """
 
+from collections.abc import Iterable
 import contextlib
 import glob
 import importlib
 import itertools
 import os.path
-import pathlib
 import threading
 
 import iris._constraints
@@ -256,7 +256,8 @@ def context(self, **kwargs):
 
 def _generate_cubes(uris, callback, constraints):
     """Returns a generator of cubes given the URIs and a callback."""
-    if isinstance(uris, (str, pathlib.PurePath)):
+    if isinstance(uris, str) or not isinstance(uris, Iterable):
+        # Make a string, or other single item, into an iterable.
         uris = [uris]
 
     # Group collections of uris by their iris handler
@@ -273,6 +274,10 @@ def _generate_cubes(uris, callback, constraints):
             urls = [":".join(x) for x in groups]
             for cube in iris.io.load_http(urls, callback):
                 yield cube
+        elif scheme == "data":
+            data_objects = [x[1] for x in groups]
+            for cube in iris.io.load_data_objects(data_objects, callback):
+                yield cube
         else:
             raise ValueError("Iris cannot handle the URI scheme: %s" % scheme)
 
diff --git a/lib/iris/fileformats/__init__.py b/lib/iris/fileformats/__init__.py
index 96a848deb0..ceafa5b97e 100644
--- a/lib/iris/fileformats/__init__.py
+++ b/lib/iris/fileformats/__init__.py
@@ -9,6 +9,7 @@
 """
 
 from iris.io.format_picker import (
+    DataSourceObjectProtocol,
     FileExtension,
     FormatAgent,
     FormatSpecification,
@@ -125,16 +126,32 @@ def _load_grib(*args, **kwargs):
     )
 
 
-_nc_dap = FormatSpecification(
-    "NetCDF OPeNDAP",
-    UriProtocol(),
-    lambda protocol: protocol in ["http", "https"],
-    netcdf.load_cubes,
-    priority=6,
-    constraint_aware_handler=True,
+FORMAT_AGENT.add_spec(
+    FormatSpecification(
+        "NetCDF OPeNDAP",
+        UriProtocol(),
+        lambda protocol: protocol in ["http", "https"],
+        netcdf.load_cubes,
+        priority=6,
+        constraint_aware_handler=True,
+    )
 )
-FORMAT_AGENT.add_spec(_nc_dap)
-del _nc_dap
+
+# NetCDF file presented as an open, readable netCDF4 dataset (or mimic).
+FORMAT_AGENT.add_spec(
+    FormatSpecification(
+        "NetCDF dataset",
+        DataSourceObjectProtocol(),
+        lambda object: all(
+            hasattr(object, x)
+            for x in ("variables", "dimensions", "groups", "ncattrs")
+        ),
+        netcdf.load_cubes,  # the same loader call : it must distinguish filepaths from datasets.
+        priority=4,
+        constraint_aware_handler=True,
+    )
+)
+
 
 #
 # UM Fieldsfiles.
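As an aside, the identification test registered above can be exercised in isolation with any netCDF4-style object. A minimal sketch, not part of the patch (the file path is a placeholder for an existing NetCDF file):

```python
import netCDF4 as nc

ds = nc.Dataset("example.nc")  # placeholder: any open, readable dataset
# The "NetCDF dataset" spec requires all four netCDF4-style attributes:
if all(hasattr(ds, x) for x in ("variables", "dimensions", "groups", "ncattrs")):
    print("FORMAT_AGENT will dispatch this object to netcdf.load_cubes")
```
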
diff --git a/lib/iris/fileformats/cf.py b/lib/iris/fileformats/cf.py
index a21e1d975f..569549df62 100644
--- a/lib/iris/fileformats/cf.py
+++ b/lib/iris/fileformats/cf.py
@@ -1043,17 +1043,25 @@ class CFReader:
     # TODO: remove once iris.experimental.ugrid.CFUGridReader is folded in.
     CFGroup = CFGroup
 
-    def __init__(self, filename, warn=False, monotonic=False):
-        self._dataset = None
-        self._filename = os.path.expanduser(filename)
+    def __init__(self, file_source, warn=False, monotonic=False):
+        # Ensure safe operation for destructor, should init fail.
+        self._own_file = False
+        if isinstance(file_source, str):
+            # Create from filepath : open it + own it (=close when we die).
+            self._filename = os.path.expanduser(file_source)
+            self._dataset = _thread_safe_nc.DatasetWrapper(
+                self._filename, mode="r"
+            )
+            self._own_file = True
+        else:
+            # We have been passed an open dataset.
+            # We use it but don't own it (don't close it).
+            self._dataset = file_source
+            self._filename = self._dataset.filepath()
 
         #: Collection of CF-netCDF variables associated with this netCDF file
         self.cf_group = self.CFGroup()
 
-        self._dataset = _thread_safe_nc.DatasetWrapper(
-            self._filename, mode="r"
-        )
-
         # Issue load optimisation warning.
         if warn and self._dataset.file_format in [
             "NETCDF3_CLASSIC",
@@ -1311,7 +1319,7 @@ def _reset(self):
 
     def _close(self):
         # Explicitly close dataset to prevent file remaining open.
-        if self._dataset is not None:
+        if self._own_file and self._dataset is not None:
             self._dataset.close()
             self._dataset = None
 
diff --git a/lib/iris/fileformats/netcdf/loader.py b/lib/iris/fileformats/netcdf/loader.py
index 8fcab61d17..2d75a786fe 100644
--- a/lib/iris/fileformats/netcdf/loader.py
+++ b/lib/iris/fileformats/netcdf/loader.py
@@ -13,6 +13,7 @@
 Also : `CF Conventions `_.
 
 """
+from collections.abc import Iterable
 import warnings
 
 import numpy as np
@@ -174,25 +175,35 @@ def _get_actual_dtype(cf_var):
 
 
 def _get_cf_var_data(cf_var, filename):
-    # Get lazy chunked data out of a cf variable.
-    dtype = _get_actual_dtype(cf_var)
-
-    # Create cube with deferred data, but no metadata
-    fill_value = getattr(
-        cf_var.cf_data,
-        "_FillValue",
-        _thread_safe_nc.default_fillvals[cf_var.dtype.str[1:]],
-    )
-    proxy = NetCDFDataProxy(
-        cf_var.shape, dtype, filename, cf_var.cf_name, fill_value
-    )
-    # Get the chunking specified for the variable : this is either a shape, or
-    # maybe the string "contiguous".
-    chunks = cf_var.cf_data.chunking()
-    # In the "contiguous" case, pass chunks=None to 'as_lazy_data'.
-    if chunks == "contiguous":
-        chunks = None
-    return as_lazy_data(proxy, chunks=chunks)
+    if hasattr(cf_var, "_in_memory_data"):
+        # The variable is not an actual netCDF4 file variable, but an emulating
+        # object with an attached data array (either numpy or dask), which can be
+        # returned immediately as-is. This is used as a hook to translate data to/from
+        # netcdf data container objects in other packages, such as xarray.
+        # See https://github.com/SciTools/iris/issues/4994 "Xarray bridge".
+        result = cf_var._in_memory_data
+    else:
+        # Get lazy chunked data out of a cf variable.
+        dtype = _get_actual_dtype(cf_var)
+
+        # Create cube with deferred data, but no metadata
+        fill_value = getattr(
+            cf_var.cf_data,
+            "_FillValue",
+            _thread_safe_nc.default_fillvals[cf_var.dtype.str[1:]],
+        )
+        proxy = NetCDFDataProxy(
+            cf_var.shape, dtype, filename, cf_var.cf_name, fill_value
+        )
+        # Get the chunking specified for the variable : this is either a shape, or
+        # maybe the string "contiguous".
+        chunks = cf_var.cf_data.chunking()
+        # In the "contiguous" case, pass chunks=None to 'as_lazy_data'.
+        if chunks == "contiguous":
+            chunks = None
+        result = as_lazy_data(proxy, chunks=chunks)
+
+    return result
 
 
 class _OrderedAddableList(list):
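To make the `_in_memory_data` hook above concrete: any variable-like object carrying that attribute bypasses the lazy `NetCDFDataProxy` path entirely. A sketch with a stand-in object (`SimpleNamespace` is purely illustrative, not how an external package would actually present a variable):

```python
from types import SimpleNamespace

import numpy as np

from iris.fileformats.netcdf.loader import _get_cf_var_data

# An "emulating" variable: no file access happens, the attached array
# (numpy or dask alike) is handed back unchanged.
fake_var = SimpleNamespace(_in_memory_data=np.arange(4.0))
assert _get_cf_var_data(fake_var, "unused-filename") is fake_var._in_memory_data
```
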
@@ -456,14 +467,15 @@ def inner(cf_datavar):
         return result
 
 
-def load_cubes(filenames, callback=None, constraints=None):
+def load_cubes(file_sources, callback=None, constraints=None):
     """
-    Loads cubes from a list of NetCDF filenames/OPeNDAP URLs.
+    Loads cubes from a list of NetCDF filenames, OPeNDAP URLs, or open datasets.
 
     Args:
 
-    * filenames (string/list):
-        One or more NetCDF filenames/OPeNDAP URLs to load from.
+    * file_sources (string/list):
+        One or more NetCDF filenames or OPeNDAP URLs to load from,
+        or open datasets.
 
     Kwargs:
 
@@ -491,18 +503,18 @@ def load_cubes(filenames, callback=None, constraints=None):
     # Create an actions engine.
     engine = _actions_engine()
 
-    if isinstance(filenames, str):
-        filenames = [filenames]
+    if isinstance(file_sources, str) or not isinstance(file_sources, Iterable):
+        file_sources = [file_sources]
 
-    for filename in filenames:
-        # Ingest the netCDF file.
+    for file_source in file_sources:
+        # Ingest the file. At present this may be a filepath or an open netCDF4.Dataset.
         meshes = {}
         if PARSE_UGRID_ON_LOAD:
             cf_reader_class = CFUGridReader
         else:
             cf_reader_class = iris.fileformats.cf.CFReader
 
-        with cf_reader_class(filename) as cf:
+        with cf_reader_class(file_source) as cf:
             if PARSE_UGRID_ON_LOAD:
                 meshes = _meshes_from_cf(cf)
 
@@ -536,7 +548,7 @@ def load_cubes(filenames, callback=None, constraints=None):
             if mesh is not None:
                 mesh_coords, mesh_dim = _build_mesh_coords(mesh, cf_var)
 
-            cube = _load_cube(engine, cf, cf_var, filename)
+            cube = _load_cube(engine, cf, cf_var, cf.filename)
 
             # Attach the mesh (if present) to the cube.
             for mesh_coord in mesh_coords:
@@ -550,7 +562,7 @@ def load_cubes(filenames, callback=None, constraints=None):
                 warnings.warn("{}".format(e))
 
             # Perform any user registered callback function.
-            cube = run_callback(callback, cube, cf_var, filename)
+            cube = run_callback(callback, cube, cf_var, file_source)
 
             # Callback mechanism may return None, which must not be yielded
             if cube is None:
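Note what the `Iterable` test above buys us: an open dataset, like a bare string, is a single source, so it gets wrapped into a one-element list. A sketch, assuming a NetCDF file exists at the placeholder path:

```python
from collections.abc import Iterable

import iris.fileformats.netcdf._thread_safe_nc as threadsafe_nc
from iris.fileformats.netcdf.loader import load_cubes

ds = threadsafe_nc.DatasetWrapper("some_file.nc")  # placeholder path
assert not isinstance(ds, Iterable)  # hence load_cubes treats it as [ds]
cubes = list(load_cubes(ds))  # lazy data refers back to cf.filename
```
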
diff --git a/lib/iris/fileformats/netcdf/saver.py b/lib/iris/fileformats/netcdf/saver.py
index f7f4864f9e..a146dff93c 100644
--- a/lib/iris/fileformats/netcdf/saver.py
+++ b/lib/iris/fileformats/netcdf/saver.py
@@ -462,7 +462,7 @@ def _setncattr(variable, name, attribute):
     NOTE: variable needs to be a _thread_safe_nc._ThreadSafeWrapper subclass.
 
     """
-    assert hasattr(variable, "THREAD_SAFE_FLAG")
+    # assert hasattr(variable, "THREAD_SAFE_FLAG")
     attribute = _bytes_if_ascii(attribute)
     return variable.setncattr(name, attribute)
 
@@ -507,6 +507,7 @@ def __init__(self, filename, netcdf_format):
 
         * filename (string):
             Name of the netCDF file to save the cube.
+            OR a writeable object supporting the netCDF4.Dataset API.
 
         * netcdf_format (string):
             Underlying netCDF file format, one of 'NETCDF4', 'NETCDF4_CLASSIC',
@@ -549,29 +550,36 @@ def __init__(self, filename, netcdf_format):
         #: A dictionary, mapping formula terms to owner cf variable name
         self._formula_terms_cache = {}
         #: NetCDF dataset
-        try:
-            self._dataset = _thread_safe_nc.DatasetWrapper(
-                filename, mode="w", format=netcdf_format
-            )
-        except RuntimeError:
-            dir_name = os.path.dirname(filename)
-            if not os.path.isdir(dir_name):
-                msg = "No such file or directory: {}".format(dir_name)
-                raise IOError(msg)
-            if not os.access(dir_name, os.R_OK | os.W_OK):
-                msg = "Permission denied: {}".format(filename)
-                raise IOError(msg)
-            else:
-                raise
+        self._dataset = None  # set now, purely so the attribute appears in the API docs
+        # Detect if we were passed a pre-opened dataset.
+        self._to_open_dataset = hasattr(filename, "createVariable")
+        if self._to_open_dataset:
+            self._dataset = filename
+        else:
+            try:
+                self._dataset = _thread_safe_nc.DatasetWrapper(
+                    filename, mode="w", format=netcdf_format
+                )
+            except RuntimeError:
+                dir_name = os.path.dirname(filename)
+                if not os.path.isdir(dir_name):
+                    msg = "No such file or directory: {}".format(dir_name)
+                    raise IOError(msg)
+                if not os.access(dir_name, os.R_OK | os.W_OK):
+                    msg = "Permission denied: {}".format(filename)
+                    raise IOError(msg)
+                else:
+                    raise
 
     def __enter__(self):
         return self
 
     def __exit__(self, type, value, traceback):
         """Flush any buffered data to the CF-netCDF file before closing."""
-        self._dataset.sync()
-        self._dataset.close()
+        if not self._to_open_dataset:
+            # Only close if the Saver created it.
+            self._dataset.close()
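The two construction modes of `Saver` above, side by side. A hedged sketch: the output paths are placeholders and the `write` calls are elided:

```python
import iris.fileformats.netcdf._thread_safe_nc as threadsafe_nc
from iris.fileformats.netcdf.saver import Saver

# Mode 1 : filepath in; the Saver opens the dataset, owns it, and
# closes it on exit.
with Saver("owned.nc", "NETCDF4") as saver:
    ...  # saver.write(cube, ...)

# Mode 2 : pre-opened dataset in; the caller keeps ownership, so exiting
# the Saver does not close it, and the caller's close() completes the save.
dataset = threadsafe_nc.DatasetWrapper("passed.nc", "w")
with Saver(dataset, "NETCDF4") as saver:
    ...  # saver.write(cube, ...)
dataset.close()
```
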
             data = data.squeeze(axis=0)
 
-        if is_lazy_data(data):
+        if hasattr(cf_var, "_in_memory_data"):
+            # The variable is not an actual netCDF4 file variable, but an emulating
+            # object with an attached data array (either numpy or dask), which should be
+            # copied immediately to the target. This is used as a hook to translate
+            # data to/from netcdf data container objects in other packages, such as
+            # xarray.
+            # See https://github.com/SciTools/iris/issues/4994 "Xarray bridge".
+            # N.B. also, in this case there is no need for fill-value checking as the
+            # data is not being translated to an in-file representation.
+            cf_var._in_memory_data = data
+        else:
+            if is_lazy_data(data):
 
-            def store(data, cf_var, fill_value):
-                # Store lazy data and check whether it is masked and contains
-                # the fill value
-                target = _FillValueMaskCheckAndStoreTarget(cf_var, fill_value)
-                da.store([data], [target])
-                return target.is_masked, target.contains_value
+                def store(data, cf_var, fill_value):
+                    # Store lazy data and check whether it is masked and contains
+                    # the fill value
+                    target = _FillValueMaskCheckAndStoreTarget(
+                        cf_var, fill_value
+                    )
+                    da.store([data], [target])
+                    return target.is_masked, target.contains_value
 
-        else:
+            else:
 
-            def store(data, cf_var, fill_value):
-                cf_var[:] = data
-                is_masked = np.ma.is_masked(data)
-                contains_value = fill_value is not None and fill_value in data
-                return is_masked, contains_value
+                def store(data, cf_var, fill_value):
+                    cf_var[:] = data
+                    is_masked = np.ma.is_masked(data)
+                    contains_value = (
+                        fill_value is not None and fill_value in data
+                    )
+                    return is_masked, contains_value
 
-        dtype = cf_var.dtype
+            dtype = cf_var.dtype
 
-        # fill_warn allows us to skip warning if packing attributes have been
-        # specified. It would require much more complex operations to work out
-        # what the values and fill_value _would_ be in such a case.
-        if fill_warn:
-            if fill_value is not None:
-                fill_value_to_check = fill_value
+            # fill_warn allows us to skip warning if packing attributes have been
+            # specified. It would require much more complex operations to work out
+            # what the values and fill_value _would_ be in such a case.
+            if fill_warn:
+                if fill_value is not None:
+                    fill_value_to_check = fill_value
+                else:
+                    fill_value_to_check = _thread_safe_nc.default_fillvals[
+                        dtype.str[1:]
+                    ]
             else:
-                fill_value_to_check = _thread_safe_nc.default_fillvals[
-                    dtype.str[1:]
-                ]
-        else:
-            fill_value_to_check = None
+                fill_value_to_check = None
 
-        # Store the data and check if it is masked and contains the fill value.
-        is_masked, contains_fill_value = store(
-            data, cf_var, fill_value_to_check
-        )
+            # Store the data and check if it is masked and contains the fill value.
+            is_masked, contains_fill_value = store(
+                data, cf_var, fill_value_to_check
+            )
 
-        if dtype.itemsize == 1 and fill_value is None:
-            if is_masked:
+            if dtype.itemsize == 1 and fill_value is None:
+                if is_masked:
+                    msg = (
+                        "CF var '{}' contains byte data with masked points, but "
+                        "no fill_value keyword was given. As saved, these "
+                        "points will read back as valid values. To save as "
+                        "masked byte data, `_FillValue` needs to be explicitly "
+                        "set. For Cube data this can be done via the 'fill_value' "
+                        "keyword during saving, otherwise use ncedit/equivalent."
+                    )
+                    warnings.warn(msg.format(cf_var.name))
+                elif contains_fill_value:
                 msg = (
-                    "CF var '{}' contains byte data with masked points, but "
-                    "no fill_value keyword was given. As saved, these "
-                    "points will read back as valid values. To save as "
-                    "masked byte data, `_FillValue` needs to be explicitly "
-                    "set. For Cube data this can be done via the 'fill_value' "
+                    "CF var '{}' contains unmasked data points equal to the "
+                    "fill-value, {}. As saved, these points will read back "
+                    "as missing data. To save these as normal values, "
+                    "`_FillValue` needs to be set to not equal any valid data "
+                    "points. For Cube data this can be done via the 'fill_value' "
                     "keyword during saving, otherwise use ncedit/equivalent."
                 )
-                warnings.warn(msg.format(cf_var.name))
-            elif contains_fill_value:
-                msg = (
-                    "CF var '{}' contains unmasked data points equal to the "
-                    "fill-value, {}. As saved, these points will read back "
-                    "as missing data. To save these as normal values, "
-                    "`_FillValue` needs to be set to not equal any valid data "
-                    "points. For Cube data this can be done via the 'fill_value' "
-                    "keyword during saving, otherwise use ncedit/equivalent."
-                )
-                warnings.warn(msg.format(cf_var.name, fill_value))
+                warnings.warn(msg.format(cf_var.name, fill_value))
 
 
 def save(
diff --git a/lib/iris/io/__init__.py b/lib/iris/io/__init__.py
index 7dd08c723c..d2b178dd6c 100644
--- a/lib/iris/io/__init__.py
+++ b/lib/iris/io/__init__.py
@@ -94,6 +94,8 @@ def decode_uri(uri, default="file"):
     In addition to well-formed URIs, it also supports bare file paths as strings
     or :class:`pathlib.PurePath`. Both Windows and UNIX style paths are
     accepted.
+    It also supports 'bare' data objects, i.e. anything which is not a string
+    or path : these are identified with a scheme of 'data' and returned unchanged.
 
     .. testsetup::
 
@@ -119,20 +121,31 @@ def decode_uri(uri, default="file"):
     >>> print(decode_uri('dataZoo/...'))
     ('file', 'dataZoo/...')
 
+    >>> print(decode_uri({}))
+    ('data', {})
+
     """
     if isinstance(uri, pathlib.PurePath):
         uri = str(uri)
-    # make sure scheme has at least 2 letters to avoid windows drives
-    # put - last in the brackets so it refers to the character, not a range
-    # reference on valid schemes: http://tools.ietf.org/html/std66#section-3.1
-    match = re.match(r"^([a-zA-Z][a-zA-Z0-9+.-]+):(.+)", uri)
-    if match:
-        scheme = match.group(1)
-        part = match.group(2)
+
+    if isinstance(uri, str):
+        # make sure scheme has at least 2 letters to avoid windows drives
+        # put - last in the brackets so it refers to the character, not a range
+        # reference on valid schemes: http://tools.ietf.org/html/std66#section-3.1
+        match = re.match(r"^([a-zA-Z][a-zA-Z0-9+.-]+):(.+)", uri)
+        if match:
+            scheme = match.group(1)
+            part = match.group(2)
+        else:
+            # Catch bare UNIX and Windows paths
+            scheme = default
+            part = uri
     else:
-        # Catch bare UNIX and Windows paths
-        scheme = default
+        # We can pass things other than strings, like open files.
+        # These are simply identified as 'data objects'.
+        scheme = "data"
         part = uri
+
     return scheme, part
 
@@ -255,6 +268,25 @@ def load_http(urls, callback):
             yield cube
 
 
+def load_data_objects(urls, callback):
+    """
+    Takes a list of data-source objects and a callback function, and returns a
+    generator of Cubes.
+    The 'objects' take the place of 'uris' in the load calls.
+    The handlers are expected to recognise the appropriate types of data-source
+    object : as usual, each object is passed to the format picker, which
+    selects a suitable handler for it.
+
+    .. note::
+
+        Typically, this function should not be called directly; instead, the
+        intended interface for loading is :func:`iris.load`.
+
+    """
+    # NOTE: this operation is currently *identical* to the http one.
+    yield from load_http(urls, callback)
+
+
 def _dot_save(cube, target):
     # A simple wrapper for `iris.fileformats.dot.save` which allows the
     # saver to be registered without triggering the import of
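The resulting three-way dispatch in `decode_uri` can be summarised in a few calls (runnable as-is; the dict stands in for any non-string data object):

```python
import pathlib

from iris.io import decode_uri

print(decode_uri("https://server/data.nc"))         # ('https', '//server/data.nc')
print(decode_uri(pathlib.PurePosixPath("a/b.nc")))  # ('file', 'a/b.nc')
print(decode_uri({"any": "object"}))                # ('data', {'any': 'object'})
```
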
diff --git a/lib/iris/io/format_picker.py b/lib/iris/io/format_picker.py
index a8e333c566..9def0ada98 100644
--- a/lib/iris/io/format_picker.py
+++ b/lib/iris/io/format_picker.py
@@ -331,3 +331,22 @@ def get_element(self, basename, file_handle):
         from iris.io import decode_uri
 
         return decode_uri(basename)[0]
+
+
+class DataSourceObjectProtocol(FileElement):
+    """
+    A :class:`FileElement` that simply returns the URI entry itself.
+
+    This enables an arbitrary non-string data object to be passed, subject to
+    subsequent checks on the object itself (specified in the handler).
+
+    """
+
+    def __init__(self):
+        super().__init__(requires_fh=False)
+
+    def get_element(self, basename, file_handle):
+        # In this context, there should *not* be a file opened by the handler.
+        # Just return 'basename', which in this case is not a name, or even a
+        # string, but a passed 'data object'.
+        return basename
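Putting `DataSourceObjectProtocol` together with the "NetCDF dataset" spec registered earlier, the format agent can resolve an open dataset with no file handle involved. A sketch, assuming a NetCDF file exists at the placeholder path:

```python
import iris.fileformats
import iris.fileformats.netcdf._thread_safe_nc as threadsafe_nc

ds = threadsafe_nc.DatasetWrapper("some_file.nc")  # placeholder path
# With file_handle=None, only elements with requires_fh=False are consulted.
spec = iris.fileformats.FORMAT_AGENT.get_spec(ds, None)
print(spec.name)  # "NetCDF dataset"
```
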
diff --git a/lib/iris/tests/integration/netcdf/test_general.py b/lib/iris/tests/integration/netcdf/test_general.py
index 63b977674d..cbb30d13e4 100644
--- a/lib/iris/tests/integration/netcdf/test_general.py
+++ b/lib/iris/tests/integration/netcdf/test_general.py
@@ -25,6 +25,7 @@
 from iris.cube import Cube, CubeList
 import iris.exceptions
 from iris.fileformats.netcdf import Saver, UnknownCellMethodWarning
+import iris.fileformats.netcdf._thread_safe_nc as threadsafe_nc
 from iris.tests.stock.netcdf import ncgen_from_cdl
 
 
@@ -356,5 +357,74 @@ def test_lat_not_loaded(self):
             _ = cube.coord("lat")
 
 
+@tests.skip_data
+class TestDatasetLoad(tests.IrisTest):
+    def test_basic_load(self):
+        # test loading from an open Dataset, in place of a filepath spec.
+        filepath = tests.get_data_path(
+            ["NetCDF", "global", "xyz_t", "GEMS_CO2_Apr2006.nc"]
+        )
+        phenom_id = "Carbon Dioxide"
+        expected = iris.load_cube(filepath, phenom_id)
+        ds = None
+        try:
+            ds = threadsafe_nc.DatasetWrapper(filepath)
+            result = iris.load_cube(ds, phenom_id)
+        finally:
+            if ds is not None:
+                ds.close()
+
+        self.assertEqual(expected, result)
+
+
+@tests.skip_data
+class TestDatasetSave(tests.IrisTest):
+    @classmethod
+    def setUpClass(cls):
+        # Create a temp directory for transient test files.
+        cls.temp_dir = tempfile.mkdtemp()
+
+    @classmethod
+    def tearDownClass(cls):
+        # Destroy the temp directory.
+        shutil.rmtree(cls.temp_dir)
+
+    def test_basic_save(self):
+        # test saving to a Dataset, in place of a filepath spec.
+
+        # load some test data (--> 2 cubes)
+        filepath = tests.get_data_path(
+            ["NetCDF", "global", "xyz_t", "GEMS_CO2_Apr2006.nc"]
+        )
+        testdata = iris.load(filepath)
+
+        # Give the cubes a definite order, since load order is not stable.
+        testdata = sorted(testdata, key=lambda cube: cube.name())
+
+        # Save to netcdf file in the usual way.
+        filepath_direct = f"{self.temp_dir}/tmp_direct.nc"
+        iris.save(testdata, filepath_direct)
+        # Check against test-specific CDL result file.
+        self.assertCDL(filepath_direct)
+
+        # Save indirectly via netcdf dataset.
+        filepath_indirect = f"{self.temp_dir}/tmp_indirect.nc"
+        nc_dataset = threadsafe_nc.DatasetWrapper(filepath_indirect, "w")
+        iris.save(testdata, nc_dataset, saver="nc")
+        # Do some very basic sanity checks on the Dataset object.
+        self.assertEqual(
+            ["time", "levelist", "latitude", "longitude"],
+            list(nc_dataset.dimensions),
+        )
+        self.assertEqual(
+            ["co2", "time", "levelist", "latitude", "longitude", "lnsp"],
+            list(nc_dataset.variables),
+        )
+        # Closing the dataset completes the save to file.
+        nc_dataset.close()
+        # Check the saved file against the same CDL as the 'normal' save.
+        self.assertCDL(filepath_indirect)
+
+
 if __name__ == "__main__":
     tests.main()
diff --git a/lib/iris/tests/results/file_load/known_loaders.txt b/lib/iris/tests/results/file_load/known_loaders.txt
index 9b0a074574..98ac3e4a07 100644
--- a/lib/iris/tests/results/file_load/known_loaders.txt
+++ b/lib/iris/tests/results/file_load/known_loaders.txt
@@ -4,6 +4,7 @@
     * NetCDF 64 bit offset format (priority 5)
     * NetCDF_v4 (priority 5)
     * UM Post Processing file (PP) (priority 5)
+    * NetCDF dataset (priority 4)
     * UM Fieldsfile (FF) post v5.2 (priority 4)
     * ABF (priority 3)
     * ABL (priority 3)
diff --git a/lib/iris/tests/results/integration/netcdf/general/TestDatasetSave/basic_save.cdl b/lib/iris/tests/results/integration/netcdf/general/TestDatasetSave/basic_save.cdl
new file mode 100644
index 0000000000..133c886d87
--- /dev/null
+++ b/lib/iris/tests/results/integration/netcdf/general/TestDatasetSave/basic_save.cdl
@@ -0,0 +1,34 @@
+dimensions:
+	latitude = 181 ;
+	levelist = 60 ;
+	longitude = 360 ;
+	time = 1 ;
+variables:
+	double co2(time, levelist, latitude, longitude) ;
+		co2:long_name = "Carbon Dioxide" ;
+		co2:units = "kg kg**-1" ;
+	int time(time) ;
+		time:axis = "T" ;
+		time:units = "hours since 1900-01-01 00:00:0.0" ;
+		time:standard_name = "time" ;
+		time:long_name = "time" ;
+		time:calendar = "standard" ;
+	int levelist(levelist) ;
+		levelist:long_name = "model_level_number" ;
+	float latitude(latitude) ;
+		latitude:axis = "Y" ;
+		latitude:units = "degrees_north" ;
+		latitude:standard_name = "latitude" ;
+		latitude:long_name = "latitude" ;
+	float longitude(longitude) ;
+		longitude:axis = "X" ;
+		longitude:units = "degrees_east" ;
+		longitude:standard_name = "longitude" ;
+		longitude:long_name = "longitude" ;
+	double lnsp(time, levelist, latitude, longitude) ;
+		lnsp:long_name = "Logarithm of surface pressure" ;
+
+// global attributes:
+		:history = "2009-08-25 13:46:31 GMT by mars2netcdf-0.92" ;
+		:Conventions = "CF-1.7" ;
+}
diff --git a/lib/iris/tests/unit/fileformats/nc_load_rules/helpers/test_build_ancil_var.py b/lib/iris/tests/unit/fileformats/nc_load_rules/helpers/test_build_ancil_var.py
index b057a41a3e..922015e885 100644
--- a/lib/iris/tests/unit/fileformats/nc_load_rules/helpers/test_build_ancil_var.py
+++ b/lib/iris/tests/unit/fileformats/nc_load_rules/helpers/test_build_ancil_var.py
@@ -15,6 +15,7 @@
 
 from iris.exceptions import CannotAddError
 from iris.fileformats._nc_load_rules.helpers import build_ancil_var
+import iris.fileformats.netcdf._thread_safe_nc as threadsafe_nc
 
 
 @pytest.fixture
@@ -31,6 +32,7 @@ def mock_engine():
 def mock_cf_av_var(monkeypatch):
     data = np.arange(6)
     output = mock.Mock(
+        spec=threadsafe_nc.VariableWrapper,
         dimensions=("foo",),
         scale_factor=1,
         add_offset=0,
diff --git a/lib/iris/tests/unit/fileformats/nc_load_rules/helpers/test_build_auxiliary_coordinate.py b/lib/iris/tests/unit/fileformats/nc_load_rules/helpers/test_build_auxiliary_coordinate.py
index 13622b72e2..1e3ad0181a 100644
--- a/lib/iris/tests/unit/fileformats/nc_load_rules/helpers/test_build_auxiliary_coordinate.py
+++ b/lib/iris/tests/unit/fileformats/nc_load_rules/helpers/test_build_auxiliary_coordinate.py
@@ -22,6 +22,7 @@
 
 from iris.exceptions import CannotAddError
 from iris.fileformats._nc_load_rules.helpers import build_auxiliary_coordinate
 from iris.fileformats.cf import CFVariable
+import iris.fileformats.netcdf._thread_safe_nc as threadsafe_nc
 
 
 class TestBoundsVertexDim(tests.IrisTest):
@@ -227,6 +228,7 @@ def setUp(self):
         points = np.arange(6)
 
         self.cf_coord_var = mock.Mock(
+            spec=threadsafe_nc.VariableWrapper,
             dimensions=("foo",),
             scale_factor=1,
             add_offset=0,
@@ -245,6 +247,7 @@ def setUp(self):
         bounds = np.arange(12).reshape(6, 2)
 
         self.cf_bounds_var = mock.Mock(
+            spec=threadsafe_nc.VariableWrapper,
             dimensions=("x", "nv"),
             scale_factor=1,
             add_offset=0,
diff --git a/lib/iris/tests/unit/fileformats/nc_load_rules/helpers/test_build_cell_measure.py b/lib/iris/tests/unit/fileformats/nc_load_rules/helpers/test_build_cell_measure.py
index efbb0649c9..85648f9810 100644
--- a/lib/iris/tests/unit/fileformats/nc_load_rules/helpers/test_build_cell_measure.py
+++ b/lib/iris/tests/unit/fileformats/nc_load_rules/helpers/test_build_cell_measure.py
@@ -15,6 +15,7 @@
 
 from iris.exceptions import CannotAddError
 from iris.fileformats._nc_load_rules.helpers import build_cell_measures
+import iris.fileformats.netcdf._thread_safe_nc as threadsafe_nc
 
 
 @pytest.fixture
@@ -31,6 +32,7 @@ def mock_engine():
 def mock_cf_cm_var(monkeypatch):
     data = np.arange(6)
     output = mock.Mock(
+        spec=threadsafe_nc.VariableWrapper,
         dimensions=("foo",),
         scale_factor=1,
         add_offset=0,
diff --git a/lib/iris/tests/unit/fileformats/netcdf/loader/test__get_cf_var_data.py b/lib/iris/tests/unit/fileformats/netcdf/loader/test__get_cf_var_data.py
index 054c8e2db1..3227bce6e2 100644
--- a/lib/iris/tests/unit/fileformats/netcdf/loader/test__get_cf_var_data.py
+++ b/lib/iris/tests/unit/fileformats/netcdf/loader/test__get_cf_var_data.py
@@ -25,7 +25,7 @@ def setUp(self):
         self.shape = (300000, 240, 200)
         self.expected_chunks = _optimum_chunksize(self.shape, self.shape)
 
-    def _make(self, chunksizes):
+    def _make(self, chunksizes, **extra_properties):
         cf_data = mock.Mock(_FillValue=None)
         cf_data.chunking = mock.MagicMock(return_value=chunksizes)
         cf_var = mock.MagicMock(
@@ -34,6 +34,7 @@ def _make(self, chunksizes):
             cf_data=cf_data,
             cf_name="DUMMY_VAR",
             shape=self.shape,
+            **extra_properties,
         )
         return cf_var
 
@@ -68,6 +69,14 @@ def test_cf_data_contiguous(self):
         lazy_data_chunks = [c[0] for c in lazy_data.chunks]
         self.assertArrayEqual(lazy_data_chunks, self.expected_chunks)
 
+    def test_cf_data_emulation(self):
+        # Check that a variable emulation object passes its real data directly.
+        emulated_data = mock.Mock()
+        # Make a cf_var with a special extra '_in_memory_data' property.
+        cf_var = self._make(chunksizes=None, _in_memory_data=emulated_data)
+        result = _get_cf_var_data(cf_var, self.filename)
+        self.assertIs(emulated_data, result)
+
 
 if __name__ == "__main__":
     tests.main()
diff --git a/lib/iris/tests/unit/fileformats/netcdf/saver/test__lazy_stream_data.py b/lib/iris/tests/unit/fileformats/netcdf/saver/test__lazy_stream_data.py
new file mode 100644
index 0000000000..f118d1e489
--- /dev/null
+++ b/lib/iris/tests/unit/fileformats/netcdf/saver/test__lazy_stream_data.py
@@ -0,0 +1,77 @@
+# Copyright Iris contributors
+#
+# This file is part of Iris and is released under the LGPL license.
+# See COPYING and COPYING.LESSER in the root of the repository for full
+# licensing details.
+"""
+Tests for :meth:`iris.fileformats.netcdf.saver.Saver._lazy_stream_data`.
+"""
+
+from unittest import mock
+
+import dask.array as da
+import numpy as np
+import pytest
+
+import iris.fileformats.netcdf._thread_safe_nc as threadsafe_nc
+from iris.fileformats.netcdf.saver import Saver
+
+
+class Test_streaming:
+    @pytest.fixture(autouse=True)
+    def setUp(self, tmp_path_factory):
+        self.saver = Saver(
+            filename=tmp_path_factory.mktemp("stream") / "tmp.nc",
+            netcdf_format="NETCDF4",
+        )
+        self.fill_value = 2.0
+        self.real_data = np.ma.masked_array([1.0, 2.0, 3.0], mask=[0, 1, 0])
+        self.lazy_data = da.from_array(self.real_data)
+        self.cf_var = mock.MagicMock(
+            spec=threadsafe_nc.VariableWrapper, shape=(3,), dtype=np.float32
+        )
+        self.emulated_data = mock.Mock(shape=(3,))
+        self.emulated_var = mock.Mock(
+            spec=threadsafe_nc.VariableWrapper,
+            shape=(3,),
+            dtype=np.float32,
+            _in_memory_data=self.emulated_data,
+        )
+
+    def test_real_data(self):
+        # When source is real data, it should be directly assigned to cf_var.
+        self.saver._lazy_stream_data(
+            data=self.real_data, cf_var=self.cf_var, fill_value=self.fill_value
+        )
+        assert self.cf_var.__setitem__.call_count == 1
+        call_args = self.cf_var.__setitem__.call_args[0]
+        assert call_args[0] == slice(None)
+        assert (
+            call_args[1] is self.real_data
+        )  # N.B. equality here is *not* good enough!
+
+    def test_lazy_data(self):
+        # When source is lazy data, it should be passed to da.store.
+        with mock.patch(
+            "iris.fileformats.netcdf.saver.da.store"
+        ) as mock_store:
+            self.saver._lazy_stream_data(
+                data=self.lazy_data,
+                cf_var=self.cf_var,
+                fill_value=self.fill_value,
+            )
+        assert mock_store.call_count == 1
+        (arg1,) = mock_store.call_args[0][0]
+        assert arg1 is self.lazy_data
+
+    def test_emulated_data(self):
+        # When the var is an "emulated" var, data should be directly assigned
+        # to its '_in_memory_data' property.
+        assert self.emulated_var._in_memory_data is self.emulated_data
+        mock_newdata = mock.Mock(shape=(3,))
+        self.saver._lazy_stream_data(
+            data=mock_newdata,
+            cf_var=self.emulated_var,
+            fill_value=self.fill_value,
+        )
+        assert self.emulated_var._in_memory_data is mock_newdata
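Taken together, the integration tests above reduce to this usage pattern. A minimal sketch, assuming a suitable NetCDF file exists at the placeholder input path:

```python
import iris
import iris.fileformats.netcdf._thread_safe_nc as threadsafe_nc

# Load from an open dataset instead of a filepath (as in TestDatasetLoad).
ds = threadsafe_nc.DatasetWrapper("input.nc")  # placeholder: an existing file
cubes = iris.load(ds)
ds.close()  # lazy data still works: the proxies refer back to ds.filepath()

# Save via a pre-opened, writeable dataset (as in TestDatasetSave); the
# caller owns it, so closing it is what completes the save to disk.
out = threadsafe_nc.DatasetWrapper("output.nc", "w")
iris.save(cubes, out, saver="nc")
out.close()
```
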