Skip to content
55 changes: 37 additions & 18 deletions lib/iris/_lazy_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,9 @@ def _optimum_chunksize(
)


def as_lazy_data(data, chunks=None, asarray=False, dims_fixed=None):
def as_lazy_data(
data, chunks=None, asarray=False, dims_fixed=None, dask_chunking=False
):
"""
Convert the input array `data` to a :class:`dask.array.Array`.

Expand All @@ -249,6 +251,11 @@ def as_lazy_data(data, chunks=None, asarray=False, dims_fixed=None):
'True' values indicate a dimension which can not be changed, i.e. the
result for that index must equal the value in 'chunks' or data.shape.

* dask_chunking (bool):
If True, Iris chunking optimisation will be bypassed, and dask's own default
chunking will be used instead. Passing a value for ``chunks`` while
``dask_chunking`` is True will raise a ValueError.

Returns:
The input array converted to a :class:`dask.array.Array`.

Expand All @@ -260,26 +267,38 @@ def as_lazy_data(data, chunks=None, asarray=False, dims_fixed=None):
but reduced by a factor if that exceeds the dask default chunksize.

"""
if chunks is None:
# No existing chunks : Make a chunk the shape of the entire input array
# (but we will subdivide it if too big).
chunks = list(data.shape)

# Adjust chunk size for better dask performance,
# NOTE: but only if no shape dimension is zero, so that we can handle the
# PPDataProxy of "raw" landsea-masked fields, which have a shape of (0, 0).
if all(elem > 0 for elem in data.shape):
# Expand or reduce the basic chunk shape to an optimum size.
chunks = _optimum_chunksize(
chunks, shape=data.shape, dtype=data.dtype, dims_fixed=dims_fixed
)

if dask_chunking:
if chunks is not None:
raise ValueError(
f"Dask chunking chosen, but chunks already assigned value {chunks}"
)
lazy_params = {"asarray": asarray, "meta": np.ndarray}
else:
if chunks is None:
# No existing chunks : Make a chunk the shape of the entire input array
# (but we will subdivide it if too big).
chunks = list(data.shape)

# Adjust chunk size for better dask performance,
# NOTE: but only if no shape dimension is zero, so that we can handle the
# PPDataProxy of "raw" landsea-masked fields, which have a shape of (0, 0).
if all(elem > 0 for elem in data.shape):
# Expand or reduce the basic chunk shape to an optimum size.
chunks = _optimum_chunksize(
chunks,
shape=data.shape,
dtype=data.dtype,
dims_fixed=dims_fixed,
)
lazy_params = {
"chunks": chunks,
"asarray": asarray,
"meta": np.ndarray,
}
if isinstance(data, ma.core.MaskedConstant):
data = ma.masked_array(data.data, mask=data.mask)
if not is_lazy_data(data):
data = da.from_array(
data, chunks=chunks, asarray=asarray, meta=np.ndarray
)
data = da.from_array(data, **lazy_params)
return data


Expand Down
118 changes: 90 additions & 28 deletions lib/iris/fileformats/netcdf/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from collections.abc import Iterable, Mapping
from contextlib import contextmanager
from copy import deepcopy
from enum import Enum, auto
import threading
from typing import Union
import warnings
Expand Down Expand Up @@ -236,34 +237,51 @@ def _get_cf_var_data(cf_var, filename):
)
# Get the chunking specified for the variable : this is either a shape, or
# maybe the string "contiguous".
chunks = cf_var.cf_data.chunking()
# In the "contiguous" case, pass chunks=None to 'as_lazy_data'.
if chunks == "contiguous":
# Equivalent to chunks=None, but value required by chunking control
chunks = list(cf_var.shape)

# Modify the chunking in the context of an active chunking control.
# N.B. settings specific to this named var override global ('*') ones.
dim_chunks = CHUNK_CONTROL.var_dim_chunksizes.get(
cf_var.cf_name
) or CHUNK_CONTROL.var_dim_chunksizes.get("*")
if not dim_chunks:
dims_fixed = None
if CHUNK_CONTROL.mode is ChunkControl.Modes.AS_DASK:
result = as_lazy_data(proxy, chunks=None, dask_chunking=True)
else:
# Modify the chunks argument, and pass in a list of 'fixed' dims, for
# any of our dims which are controlled.
chunks = cf_var.cf_data.chunking()
if (
chunks is None
and CHUNK_CONTROL.mode is ChunkControl.Modes.FROM_FILE
):
raise KeyError(
f"{cf_var.cf_name} does not contain pre-existing chunk specifications."
f"Instead, you might wish to use CHUNK_CONTROL.set(), or just use default"
f" behaviour outside of a context manager. "
)
# In the "contiguous" case, pass chunks=None to 'as_lazy_data'.
if chunks == "contiguous":
# Equivalent to chunks=None, but value required by chunking control
chunks = list(cf_var.shape)

# Modify the chunking in the context of an active chunking control.
# N.B. settings specific to this named var override global ('*') ones.
dim_chunks = CHUNK_CONTROL.var_dim_chunksizes.get(
cf_var.cf_name
) or CHUNK_CONTROL.var_dim_chunksizes.get("*")
dims = cf_var.cf_data.dimensions
dims_fixed = np.zeros(len(dims), dtype=bool)
for i_dim, dim_name in enumerate(dims):
dim_chunksize = dim_chunks.get(dim_name)
if dim_chunksize:
chunks[i_dim] = dim_chunksize
dims_fixed[i_dim] = True
if dims_fixed is None:
dims_fixed = [dims_fixed]
result = as_lazy_data(
proxy, chunks=chunks, dims_fixed=tuple(dims_fixed)
)
if CHUNK_CONTROL.mode is ChunkControl.Modes.FROM_FILE:
dims_fixed = np.ones(len(dims), dtype=bool)
elif not dim_chunks:
dims_fixed = None
else:
# Modify the chunks argument, and pass in a list of 'fixed' dims, for
# any of our dims which are controlled.
dims_fixed = np.zeros(len(dims), dtype=bool)
for i_dim, dim_name in enumerate(dims):
dim_chunksize = dim_chunks.get(dim_name)
if dim_chunksize:
if dim_chunksize == -1:
chunks[i_dim] = cf_var.shape[i_dim]
else:
chunks[i_dim] = dim_chunksize
dims_fixed[i_dim] = True
if dims_fixed is None:
dims_fixed = [dims_fixed]
result = as_lazy_data(
proxy, chunks=chunks, dims_fixed=tuple(dims_fixed)
)
return result


Expand Down Expand Up @@ -651,6 +669,11 @@ def load_cubes(file_sources, callback=None, constraints=None):


class ChunkControl(threading.local):
class Modes(Enum):
    """Chunking strategies selectable through the ChunkControl context managers."""

    # Normal behaviour: chunks are optimised by Iris (optionally constrained
    # by user-supplied per-dimension chunksizes via CHUNK_CONTROL.set()).
    DEFAULT = auto()
    # Chunks must come from the file variable's own chunking; loading raises
    # an error for variables with no pre-existing chunk specification.
    FROM_FILE = auto()
    # Bypass Iris chunking optimisation entirely and let dask choose its
    # default chunks.
    AS_DASK = auto()

def __init__(self, var_dim_chunksizes=None):
"""
Provide user control of Dask chunking.
Expand All @@ -673,6 +696,7 @@ def __init__(self, var_dim_chunksizes=None):

"""
self.var_dim_chunksizes = var_dim_chunksizes or {}
self.mode = self.Modes.DEFAULT

@contextmanager
def set(
Expand Down Expand Up @@ -719,7 +743,8 @@ def set(
``dask.config.set({'array.chunk-size': '250MiB'})``.

"""
old_settings = deepcopy(self.var_dim_chunksizes)
old_mode = self.mode
old_var_dim_chunksizes = deepcopy(self.var_dim_chunksizes)
if var_names is None:
var_names = ["*"]
elif isinstance(var_names, str):
Expand Down Expand Up @@ -749,7 +774,44 @@ def set(
dim_chunks[dim_name] = chunksize
yield
finally:
self.var_dim_chunksizes = old_settings
self.var_dim_chunksizes = old_var_dim_chunksizes
self.mode = old_mode

@contextmanager
def from_file(self) -> None:
    """
    Ensure chunks are loaded from the file variables' own chunk
    specifications; variables without one will raise an error on load.

    Notes
    -----
    This function acts as a contextmanager, for use in a 'with' block.
    """
    # Delegate state save/switch/restore to the shared helper, so the
    # logic is written once for all modal context managers.
    with self._temporary_mode(self.Modes.FROM_FILE):
        yield

@contextmanager
def as_dask(self) -> None:
    """
    Ensure chunks are decided by dask's default chunking, bypassing the
    Iris chunking optimisation.

    Notes
    -----
    This function acts as a contextmanager, for use in a 'with' block.
    """
    with self._temporary_mode(self.Modes.AS_DASK):
        yield

@contextmanager
def _temporary_mode(self, mode) -> None:
    """
    Temporarily set ``self.mode`` to *mode* for the duration of the
    'with' block, restoring both the mode and the variable/dimension
    chunksize settings afterwards (even on error).

    * mode (ChunkControl.Modes):
        The chunking mode to activate within the block.
    """
    old_mode = self.mode
    # Deep-copied so that any mutation inside the block cannot leak out.
    old_var_dim_chunksizes = deepcopy(self.var_dim_chunksizes)
    try:
        self.mode = mode
        yield
    finally:
        self.mode = old_mode
        self.var_dim_chunksizes = old_var_dim_chunksizes


# Note: the CHUNK_CONTROL object controls chunk sizing in the
Expand Down