diff --git a/docs/src/whatsnew/latest.rst b/docs/src/whatsnew/latest.rst index fd24a697cf..74d25ca390 100644 --- a/docs/src/whatsnew/latest.rst +++ b/docs/src/whatsnew/latest.rst @@ -51,8 +51,7 @@ This document explains the changes made to Iris for this release 🚀 Performance Enhancements =========================== -#. N/A - +#. `@rcomer`_ made :meth:`~iris.cube.Cube.aggregated_by` faster. (:pull:`4970`) 🔥 Deprecations =============== diff --git a/lib/iris/cube.py b/lib/iris/cube.py index 7c6fd55c10..4c52303b2f 100644 --- a/lib/iris/cube.py +++ b/lib/iris/cube.py @@ -4178,98 +4178,65 @@ def aggregated_by( data_shape = list(self.shape + aggregator.aggregate_shape(**kwargs)) data_shape[dimension_to_groupby] = len(groupby) - # Aggregate the group-by data. + # Choose appropriate data and functions for data aggregation. if aggregator.lazy_func is not None and self.has_lazy_data(): - front_slice = (slice(None, None),) * dimension_to_groupby - back_slice = (slice(None, None),) * ( - len(data_shape) - dimension_to_groupby - 1 - ) + stack = da.stack + input_data = self.lazy_data() + agg_method = aggregator.lazy_aggregate + else: + input_data = self.data + # Note numpy.stack does not preserve masks. + stack = ma.stack if ma.isMaskedArray(input_data) else np.stack + agg_method = aggregator.aggregate + + # Create data and weights slices. + front_slice = (slice(None),) * dimension_to_groupby + back_slice = (slice(None),) * ( + len(data_shape) - dimension_to_groupby - 1 + ) - # Create cube and weights slices - groupby_subcubes = map( - lambda groupby_slice: self[ + groupby_subarrs = map( + lambda groupby_slice: iris.util._slice_data_with_keys( + input_data, front_slice + (groupby_slice,) + back_slice + )[1], + groupby.group(), + ) + + if weights is not None: + groupby_subweights = map( + lambda groupby_slice: weights[ front_slice + (groupby_slice,) + back_slice - ].lazy_data(), + ], groupby.group(), ) - if weights is not None: - groupby_subweights = map( - lambda groupby_slice: weights[ - front_slice + (groupby_slice,) + back_slice - ], - groupby.group(), - ) - else: - groupby_subweights = (None for _ in range(len(groupby))) + else: + groupby_subweights = (None for _ in range(len(groupby))) - agg = iris.analysis.create_weighted_aggregator_fn( - aggregator.lazy_aggregate, axis=dimension_to_groupby, **kwargs + # Aggregate data slices. + agg = iris.analysis.create_weighted_aggregator_fn( + agg_method, axis=dimension_to_groupby, **kwargs + ) + result = list(map(agg, groupby_subarrs, groupby_subweights)) + + # If weights are returned, "result" is a list of tuples (each tuple + # contains two elements; the first is the aggregated data, the + # second is the aggregated weights). Convert these to two lists + # (one for the aggregated data and one for the aggregated weights) + # before combining the different slices. + if return_weights: + result, weights_result = list(zip(*result)) + aggregateby_weights = stack( + weights_result, axis=dimension_to_groupby ) - result = list(map(agg, groupby_subcubes, groupby_subweights)) - - # If weights are returned, "result" is a list of tuples (each tuple - # contains two elements; the first is the aggregated data, the - # second is the aggregated weights). Convert these to two lists - # (one for the aggregated data and one for the aggregated weights) - # before combining the different slices. - if return_weights: - result, weights_result = list(zip(*result)) - aggregateby_weights = da.stack( - weights_result, axis=dimension_to_groupby - ) - else: - aggregateby_weights = None - aggregateby_data = da.stack(result, axis=dimension_to_groupby) else: - cube_slice = [slice(None, None)] * len(data_shape) - for i, groupby_slice in enumerate(groupby.group()): - # Slice the cube with the group-by slice to create a group-by - # sub-cube. - cube_slice[dimension_to_groupby] = groupby_slice - groupby_sub_cube = self[tuple(cube_slice)] - - # Slice the weights - if weights is not None: - groupby_sub_weights = weights[tuple(cube_slice)] - kwargs["weights"] = groupby_sub_weights - - # Perform the aggregation over the group-by sub-cube and - # repatriate the aggregated data into the aggregate-by cube - # data. If weights are also returned, handle them separately. - result = aggregator.aggregate( - groupby_sub_cube.data, axis=dimension_to_groupby, **kwargs - ) - if return_weights: - weights_result = result[1] - result = result[0] - else: - weights_result = None - - # Determine aggregation result data type for the aggregate-by - # cube data on first pass. - if i == 0: - if ma.isMaskedArray(self.data): - aggregateby_data = ma.zeros( - data_shape, dtype=result.dtype - ) - else: - aggregateby_data = np.zeros( - data_shape, dtype=result.dtype - ) - if weights_result is not None: - aggregateby_weights = np.zeros( - data_shape, dtype=weights_result.dtype - ) - else: - aggregateby_weights = None - cube_slice[dimension_to_groupby] = i - aggregateby_data[tuple(cube_slice)] = result - if weights_result is not None: - aggregateby_weights[tuple(cube_slice)] = weights_result - - # Restore original weights. - if weights is not None: - kwargs["weights"] = weights + aggregateby_weights = None + + aggregateby_data = stack(result, axis=dimension_to_groupby) + # Ensure plain ndarray is output if plain ndarray was input. + if ma.isMaskedArray(aggregateby_data) and not ma.isMaskedArray( + input_data + ): + aggregateby_data = ma.getdata(aggregateby_data) # Add the aggregation meta data to the aggregate-by cube. aggregator.update_metadata( diff --git a/lib/iris/tests/unit/cube/test_Cube__aggregated_by.py b/lib/iris/tests/unit/cube/test_Cube__aggregated_by.py index 9e60631c33..c591e45f63 100644 --- a/lib/iris/tests/unit/cube/test_Cube__aggregated_by.py +++ b/lib/iris/tests/unit/cube/test_Cube__aggregated_by.py @@ -67,9 +67,7 @@ def setUp(self): self.mock_agg = mock.Mock(spec=Aggregator) self.mock_agg.cell_method = [] - self.mock_agg.aggregate = mock.Mock( - return_value=mock.Mock(dtype="object") - ) + self.mock_agg.aggregate = mock.Mock(return_value=np.arange(4)) self.mock_agg.aggregate_shape = mock.Mock(return_value=()) self.mock_agg.lazy_func = None self.mock_agg.post_process = mock.Mock(side_effect=lambda x, y, z: x) @@ -79,8 +77,8 @@ def setUp(self): def mock_weighted_aggregate(*_, **kwargs): if kwargs.get("returned", False): - return (mock.Mock(dtype="object"), mock.Mock(dtype="object")) - return mock.Mock(dtype="object") + return (np.arange(11), np.ones(11)) + return np.arange(4) self.mock_weighted_agg.aggregate = mock.Mock( side_effect=mock_weighted_aggregate