Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions docs/src/whatsnew/latest.rst
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ This document explains the changes made to Iris for this release
🚀 Performance Enhancements
===========================

#. N/A

#. `@rcomer`_ made :meth:`~iris.cube.Cube.aggregated_by` faster. (:pull:`4970`)

🔥 Deprecations
===============
Expand Down
135 changes: 51 additions & 84 deletions lib/iris/cube.py
Original file line number Diff line number Diff line change
Expand Up @@ -4178,98 +4178,65 @@ def aggregated_by(
data_shape = list(self.shape + aggregator.aggregate_shape(**kwargs))
data_shape[dimension_to_groupby] = len(groupby)

# Aggregate the group-by data.
# Choose appropriate data and functions for data aggregation.
if aggregator.lazy_func is not None and self.has_lazy_data():
front_slice = (slice(None, None),) * dimension_to_groupby
back_slice = (slice(None, None),) * (
len(data_shape) - dimension_to_groupby - 1
)
stack = da.stack
input_data = self.lazy_data()
agg_method = aggregator.lazy_aggregate
else:
input_data = self.data
# Note numpy.stack does not preserve masks.
stack = ma.stack if ma.isMaskedArray(input_data) else np.stack
agg_method = aggregator.aggregate

# Create data and weights slices.
front_slice = (slice(None),) * dimension_to_groupby
back_slice = (slice(None),) * (
len(data_shape) - dimension_to_groupby - 1
)

# Create cube and weights slices
groupby_subcubes = map(
lambda groupby_slice: self[
groupby_subarrs = map(
lambda groupby_slice: iris.util._slice_data_with_keys(
input_data, front_slice + (groupby_slice,) + back_slice
)[1],
groupby.group(),
)

if weights is not None:
groupby_subweights = map(
lambda groupby_slice: weights[
front_slice + (groupby_slice,) + back_slice
].lazy_data(),
],
groupby.group(),
)
if weights is not None:
groupby_subweights = map(
lambda groupby_slice: weights[
front_slice + (groupby_slice,) + back_slice
],
groupby.group(),
)
else:
groupby_subweights = (None for _ in range(len(groupby)))
else:
groupby_subweights = (None for _ in range(len(groupby)))

agg = iris.analysis.create_weighted_aggregator_fn(
aggregator.lazy_aggregate, axis=dimension_to_groupby, **kwargs
# Aggregate data slices.
agg = iris.analysis.create_weighted_aggregator_fn(
agg_method, axis=dimension_to_groupby, **kwargs
)
result = list(map(agg, groupby_subarrs, groupby_subweights))

# If weights are returned, "result" is a list of tuples (each tuple
# contains two elements; the first is the aggregated data, the
# second is the aggregated weights). Convert these to two lists
# (one for the aggregated data and one for the aggregated weights)
# before combining the different slices.
if return_weights:
result, weights_result = list(zip(*result))
aggregateby_weights = stack(
weights_result, axis=dimension_to_groupby
)
result = list(map(agg, groupby_subcubes, groupby_subweights))

# If weights are returned, "result" is a list of tuples (each tuple
# contains two elements; the first is the aggregated data, the
# second is the aggregated weights). Convert these to two lists
# (one for the aggregated data and one for the aggregated weights)
# before combining the different slices.
if return_weights:
result, weights_result = list(zip(*result))
aggregateby_weights = da.stack(
weights_result, axis=dimension_to_groupby
)
else:
aggregateby_weights = None
aggregateby_data = da.stack(result, axis=dimension_to_groupby)
else:
cube_slice = [slice(None, None)] * len(data_shape)
for i, groupby_slice in enumerate(groupby.group()):
# Slice the cube with the group-by slice to create a group-by
# sub-cube.
cube_slice[dimension_to_groupby] = groupby_slice
groupby_sub_cube = self[tuple(cube_slice)]

# Slice the weights
if weights is not None:
groupby_sub_weights = weights[tuple(cube_slice)]
kwargs["weights"] = groupby_sub_weights

# Perform the aggregation over the group-by sub-cube and
# repatriate the aggregated data into the aggregate-by cube
# data. If weights are also returned, handle them separately.
result = aggregator.aggregate(
groupby_sub_cube.data, axis=dimension_to_groupby, **kwargs
)
if return_weights:
weights_result = result[1]
result = result[0]
else:
weights_result = None

# Determine aggregation result data type for the aggregate-by
# cube data on first pass.
if i == 0:
if ma.isMaskedArray(self.data):
aggregateby_data = ma.zeros(
data_shape, dtype=result.dtype
)
else:
aggregateby_data = np.zeros(
data_shape, dtype=result.dtype
)
if weights_result is not None:
aggregateby_weights = np.zeros(
data_shape, dtype=weights_result.dtype
)
else:
aggregateby_weights = None
cube_slice[dimension_to_groupby] = i
aggregateby_data[tuple(cube_slice)] = result
if weights_result is not None:
aggregateby_weights[tuple(cube_slice)] = weights_result

# Restore original weights.
if weights is not None:
kwargs["weights"] = weights
aggregateby_weights = None

aggregateby_data = stack(result, axis=dimension_to_groupby)
# Ensure plain ndarray is output if plain ndarray was input.
if ma.isMaskedArray(aggregateby_data) and not ma.isMaskedArray(
input_data
):
aggregateby_data = ma.getdata(aggregateby_data)

# Add the aggregation meta data to the aggregate-by cube.
aggregator.update_metadata(
Expand Down
8 changes: 3 additions & 5 deletions lib/iris/tests/unit/cube/test_Cube__aggregated_by.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@ def setUp(self):

self.mock_agg = mock.Mock(spec=Aggregator)
self.mock_agg.cell_method = []
self.mock_agg.aggregate = mock.Mock(
return_value=mock.Mock(dtype="object")
)
self.mock_agg.aggregate = mock.Mock(return_value=np.arange(4))
self.mock_agg.aggregate_shape = mock.Mock(return_value=())
self.mock_agg.lazy_func = None
self.mock_agg.post_process = mock.Mock(side_effect=lambda x, y, z: x)
Expand All @@ -79,8 +77,8 @@ def setUp(self):

def mock_weighted_aggregate(*_, **kwargs):
if kwargs.get("returned", False):
return (mock.Mock(dtype="object"), mock.Mock(dtype="object"))
return mock.Mock(dtype="object")
return (np.arange(11), np.ones(11))
return np.arange(4)

self.mock_weighted_agg.aggregate = mock.Mock(
side_effect=mock_weighted_aggregate
Expand Down