def as_lazy_data(
    data, chunks=None, asarray=False, dims_fixed=None, dask_chunking=False
):
    """
    Convert the input array `data` to a :class:`dask.array.Array`.

    Args:

    * data (array-like):
        An indexable object with 'shape' and 'dtype' properties.

    Kwargs:

    * chunks (list of int):
        If present, a source chunk shape, e.g. for a chunked netcdf variable.

    * asarray (bool):
        If True, then chunks will be converted to instances of `ndarray`.
        Defaults to False.

    * dims_fixed (list of bool):
        If set, a list of values equal in length to 'chunks' or data.ndim.
        'True' values indicate a dimension that can not be changed, i.e. that
        element of the result must equal the corresponding value in 'chunks'
        or data.shape.

    * dask_chunking (bool):
        If True, the Iris chunking optimisation is bypassed and dask's own
        default chunking is used instead.  Supplying a value for 'chunks'
        together with dask_chunking=True raises a ValueError.

    Returns:
        The input array converted to a :class:`dask.array.Array`.
    """
    if dask_chunking:
        # Dask decides the chunking itself: a user-supplied 'chunks' would
        # conflict, so reject the combination outright.
        if chunks is not None:
            raise ValueError(
                f"Dask chunking chosen, but chunks already assigned value {chunks}"
            )
        from_array_kwargs = {"asarray": asarray, "meta": np.ndarray}
    else:
        if chunks is None:
            # No existing chunks: start from a single chunk the shape of the
            # entire input array (it may be subdivided below if too big).
            chunks = list(data.shape)
        # Tune the chunk shape for dask performance -- but only when every
        # dimension is non-zero, so that the PPDataProxy of "raw"
        # landsea-masked fields (shape (0, 0)) passes through untouched.
        if 0 not in data.shape:
            chunks = _optimum_chunksize(
                chunks,
                shape=data.shape,
                dtype=data.dtype,
                dims_fixed=dims_fixed,
            )
        from_array_kwargs = {
            "chunks": chunks,
            "asarray": asarray,
            "meta": np.ndarray,
        }

    if isinstance(data, ma.core.MaskedConstant):
        # A masked constant cannot be wrapped directly; promote it to a
        # full masked array first.
        data = ma.masked_array(data.data, mask=data.mask)
    if not is_lazy_data(data):
        data = da.from_array(data, **from_array_kwargs)
    return data
- if chunks == "contiguous": - # Equivalent to chunks=None, but value required by chunking control - chunks = list(cf_var.shape) - - # Modify the chunking in the context of an active chunking control. - # N.B. settings specific to this named var override global ('*') ones. - dim_chunks = CHUNK_CONTROL.var_dim_chunksizes.get( - cf_var.cf_name - ) or CHUNK_CONTROL.var_dim_chunksizes.get("*") - if not dim_chunks: - dims_fixed = None + if CHUNK_CONTROL.mode is ChunkControl.Modes.AS_DASK: + result = as_lazy_data(proxy, chunks=None, dask_chunking=True) else: - # Modify the chunks argument, and pass in a list of 'fixed' dims, for - # any of our dims which are controlled. + chunks = cf_var.cf_data.chunking() + if ( + chunks is None + and CHUNK_CONTROL.mode is ChunkControl.Modes.FROM_FILE + ): + raise KeyError( + f"{cf_var.cf_name} does not contain pre-existing chunk specifications." + f"Instead, you might wish to use CHUNK_CONTROL.set(), or just use default" + f" behaviour outside of a context manager. " + ) + # In the "contiguous" case, pass chunks=None to 'as_lazy_data'. + if chunks == "contiguous": + # Equivalent to chunks=None, but value required by chunking control + chunks = list(cf_var.shape) + + # Modify the chunking in the context of an active chunking control. + # N.B. settings specific to this named var override global ('*') ones. 
+ dim_chunks = CHUNK_CONTROL.var_dim_chunksizes.get( + cf_var.cf_name + ) or CHUNK_CONTROL.var_dim_chunksizes.get("*") dims = cf_var.cf_data.dimensions - dims_fixed = np.zeros(len(dims), dtype=bool) - for i_dim, dim_name in enumerate(dims): - dim_chunksize = dim_chunks.get(dim_name) - if dim_chunksize: - chunks[i_dim] = dim_chunksize - dims_fixed[i_dim] = True - if dims_fixed is None: - dims_fixed = [dims_fixed] - result = as_lazy_data( - proxy, chunks=chunks, dims_fixed=tuple(dims_fixed) - ) + if CHUNK_CONTROL.mode is ChunkControl.Modes.FROM_FILE: + dims_fixed = np.ones(len(dims), dtype=bool) + elif not dim_chunks: + dims_fixed = None + else: + # Modify the chunks argument, and pass in a list of 'fixed' dims, for + # any of our dims which are controlled. + dims_fixed = np.zeros(len(dims), dtype=bool) + for i_dim, dim_name in enumerate(dims): + dim_chunksize = dim_chunks.get(dim_name) + if dim_chunksize: + if dim_chunksize == -1: + chunks[i_dim] = cf_var.shape[i_dim] + else: + chunks[i_dim] = dim_chunksize + dims_fixed[i_dim] = True + if dims_fixed is None: + dims_fixed = [dims_fixed] + result = as_lazy_data( + proxy, chunks=chunks, dims_fixed=tuple(dims_fixed) + ) return result @@ -651,6 +669,11 @@ def load_cubes(file_sources, callback=None, constraints=None): class ChunkControl(threading.local): + class Modes(Enum): + DEFAULT = auto() + FROM_FILE = auto() + AS_DASK = auto() + def __init__(self, var_dim_chunksizes=None): """ Provide user control of Dask chunking. @@ -673,6 +696,7 @@ def __init__(self, var_dim_chunksizes=None): """ self.var_dim_chunksizes = var_dim_chunksizes or {} + self.mode = self.Modes.DEFAULT @contextmanager def set( @@ -719,7 +743,8 @@ def set( ``dask.config.set({'array.chunk-size': '250MiB'})``. 
""" - old_settings = deepcopy(self.var_dim_chunksizes) + old_mode = self.mode + old_var_dim_chunksizes = deepcopy(self.var_dim_chunksizes) if var_names is None: var_names = ["*"] elif isinstance(var_names, str): @@ -749,7 +774,44 @@ def set( dim_chunks[dim_name] = chunksize yield finally: - self.var_dim_chunksizes = old_settings + self.var_dim_chunksizes = old_var_dim_chunksizes + self.mode = old_mode + + @contextmanager + def from_file(self) -> None: + """ + Ensures the chunks are loaded in from file variables, else will throw an error. + + Notes + ----- + This function acts as a contextmanager, for use in a 'with' block. + """ + old_mode = self.mode + old_var_dim_chunksizes = deepcopy(self.var_dim_chunksizes) + try: + self.mode = self.Modes.FROM_FILE + yield + finally: + self.mode = old_mode + self.var_dim_chunksizes = old_var_dim_chunksizes + + @contextmanager + def as_dask(self) -> None: + """ + Ensures the chunks are decided from dask. + + Notes + ----- + This function acts as a contextmanager, for use in a 'with' block. + """ + old_mode = self.mode + old_var_dim_chunksizes = deepcopy(self.var_dim_chunksizes) + try: + self.mode = self.Modes.AS_DASK + yield + finally: + self.mode = old_mode + self.var_dim_chunksizes = old_var_dim_chunksizes # Note: the CHUNK_CONTROL object controls chunk sizing in the