Skip to content

Commit

Permalink
Make interface for BatteryPool metrics consistent with power methods (#…
Browse files Browse the repository at this point in the history
…381)

The `soc`, `capacity`, `power_bounds` methods of the `BatteryPool`
were directly returning receivers.  They've now been converted to
properties, from which `new_receiver()` has to be called, to get a
receiver.

New:

``` python
soc_recv = battery_pool.soc.new_receiver()
```

Old:

``` python
soc_recv = battery_pool.soc()
```
  • Loading branch information
shsms authored May 25, 2023
2 parents 16a677b + 7e77a18 commit 23ff30c
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 42 deletions.
8 changes: 8 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ This release drops support for Python versions older than 3.11.

* Now `frequenz.sdk.timeseries.Sample` uses a more sensible comparison. Before this release `Sample`s were compared only based on the `timestamp`. This was due to a limitation in Python versions earlier than 3.10. Now that the minimum supported version is 3.11 this hack is not needed anymore and `Sample`s are compared using both `timestamp` and `value` as most people probably expects.

* `BatteryPool` metric streaming interfaces have changed for `soc`, `capacity` and `power_bounds`:

```python
soc_rx = battery_pool.soc() # old

soc_rx = battery_pool.soc.new_receiver() # new
```

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
Expand Down
6 changes: 3 additions & 3 deletions examples/battery_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ async def main() -> None:

battery_pool = microgrid.battery_pool()
receivers: Dict[str, Receiver[Any]] = {
"soc": await battery_pool.soc(maxsize=1),
"capacity": await battery_pool.capacity(maxsize=1),
"power_bounds": await battery_pool.power_bounds(maxsize=1),
"soc": battery_pool.soc.new_receiver(maxsize=1),
"capacity": battery_pool.capacity.new_receiver(maxsize=1),
"power_bounds": battery_pool.power_bounds.new_receiver(maxsize=1),
}

merged_channel = MergeNamed[Any](**receivers)
Expand Down
14 changes: 9 additions & 5 deletions src/frequenz/sdk/timeseries/battery_pool/_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from frequenz.channels import Broadcast, Receiver

from ..._internal._asyncio import cancel_and_await
from ..._internal._constants import WAIT_FOR_COMPONENT_DATA_SEC
from ..._internal._constants import RECEIVER_MAX_SIZE, WAIT_FOR_COMPONENT_DATA_SEC
from ._component_metric_fetcher import (
ComponentMetricFetcher,
LatestBatteryMetricsFetcher,
Expand All @@ -25,7 +25,7 @@
_logger = logging.getLogger(__name__)


class AggregateMethod(Generic[T], ABC):
class MetricAggregator(Generic[T], ABC):
"""Interface to control how the component data should be aggregated and send."""

@abstractmethod
Expand All @@ -37,7 +37,9 @@ def update_working_batteries(self, new_working_batteries: set[int]) -> None:
"""

@abstractmethod
def new_receiver(self, maxsize: int | None) -> Receiver[T | None]:
def new_receiver(
self, maxsize: int | None = RECEIVER_MAX_SIZE
) -> Receiver[T | None]:
"""Return new receiver for the aggregated metric results.
Args:
Expand All @@ -61,7 +63,7 @@ def name(cls) -> str:
"""


class SendOnUpdate(AggregateMethod[T]):
class SendOnUpdate(MetricAggregator[T]):
"""Wait for the change of the components metrics and send updated result.
This method will cache the component metrics. When any metric change it will
Expand Down Expand Up @@ -110,7 +112,9 @@ def name(cls) -> str:
"""
return "SendOnUpdate"

def new_receiver(self, maxsize: int | None) -> Receiver[T | None]:
def new_receiver(
self, maxsize: int | None = RECEIVER_MAX_SIZE
) -> Receiver[T | None]:
"""Return new receiver for the aggregated metric results.
Args:
Expand Down
58 changes: 27 additions & 31 deletions src/frequenz/sdk/timeseries/battery_pool/battery_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from frequenz.channels import Receiver, Sender

from ..._internal._asyncio import cancel_and_await
from ..._internal._constants import RECEIVER_MAX_SIZE
from ...actor import ChannelRegistry, ComponentMetricRequest
from ...actor.power_distributing._battery_pool_status import BatteryStatus
from ...microgrid import connection_manager
Expand All @@ -25,7 +24,7 @@
FormulaGeneratorConfig,
FormulaType,
)
from ._methods import AggregateMethod, SendOnUpdate
from ._methods import MetricAggregator, SendOnUpdate
from ._metric_calculator import CapacityCalculator, PowerBoundsCalculator, SoCCalculator
from ._result_types import CapacityMetrics, PowerMetrics, SoCMetrics

Expand Down Expand Up @@ -85,7 +84,7 @@ def __init__( # pylint: disable=too-many-arguments
)

self._min_update_interval = min_update_interval
self._active_methods: dict[str, AggregateMethod[Any]] = {}
self._active_methods: dict[str, MetricAggregator[Any]] = {}

self._namespace: str = f"battery-pool-{self._batteries}-{uuid.uuid4()}"
self._formula_pool: FormulaEnginePool = FormulaEnginePool(
Expand Down Expand Up @@ -184,19 +183,19 @@ def consumption_power(self) -> FormulaEngine:
assert isinstance(engine, FormulaEngine)
return engine

async def soc(
self, maxsize: int | None = RECEIVER_MAX_SIZE
) -> Receiver[SoCMetrics | None]:
@property
def soc(self) -> MetricAggregator[SoCMetrics]:
"""Get receiver to receive new soc metrics when they change.
Soc formulas are described in the receiver return type.
None will be send if there is no component to calculate metric.
Soc formulas are described in the receiver return type. None will be send if
there is no component to calculate metric.
Args:
maxsize: Maxsize of the receiver channel.
A receiver from the MetricAggregator can be obtained by calling the
`new_receiver` method.
Returns:
Receiver for this metric.
A MetricAggregator that will calculate and stream the aggregate soc of
all batteries in the pool.
"""
method_name = SendOnUpdate.name() + "_" + SoCCalculator.name()

Expand All @@ -208,22 +207,21 @@ async def soc(
min_update_interval=self._min_update_interval,
)

running_method = self._active_methods[method_name]
return running_method.new_receiver(maxsize)
return self._active_methods[method_name]

async def capacity(
self, maxsize: int | None = RECEIVER_MAX_SIZE
) -> Receiver[CapacityMetrics | None]:
@property
def capacity(self) -> MetricAggregator[CapacityMetrics]:
"""Get receiver to receive new capacity metrics when they change.
Capacity formulas are described in the receiver return type.
None will be send if there is no component to calculate metrics.
Capacity formulas are described in the receiver return type. None will be send
if there is no component to calculate metrics.
Args:
maxsize: Maxsize of the receiver channel.
A receiver from the MetricAggregator can be obtained by calling the
`new_receiver` method.
Returns:
Receiver for this metric.
A MetricAggregator that will calculate and stream the capacity of all
batteries in the pool.
"""
method_name = SendOnUpdate.name() + "_" + CapacityCalculator.name()

Expand All @@ -235,22 +233,21 @@ async def capacity(
min_update_interval=self._min_update_interval,
)

running_method = self._active_methods[method_name]
return running_method.new_receiver(maxsize)
return self._active_methods[method_name]

async def power_bounds(
self, maxsize: int | None = RECEIVER_MAX_SIZE
) -> Receiver[PowerMetrics | None]:
@property
def power_bounds(self) -> MetricAggregator[PowerMetrics]:
"""Get receiver to receive new power bounds when they change.
Power bounds formulas are described in the receiver return type.
None will be send if there is no component to calculate metrics.
Args:
maxsize: Maxsize of the receivers channel.
A receiver from the MetricAggregator can be obtained by calling the
`new_receiver` method.
Returns:
Receiver for this metric.
A MetricAggregator that will calculate and stream the power bounds
of all batteries in the pool.
"""
method_name = SendOnUpdate.name() + "_" + PowerBoundsCalculator.name()

Expand All @@ -262,8 +259,7 @@ async def power_bounds(
min_update_interval=self._min_update_interval,
)

running_method = self._active_methods[method_name]
return running_method.new_receiver(maxsize)
return self._active_methods[method_name]

async def stop(self) -> None:
"""Stop all pending async tasks."""
Expand Down
6 changes: 3 additions & 3 deletions tests/timeseries/_battery_pool/test_battery_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ async def run_capacity_test(setup_args: SetupArgs) -> None:
sampling_rate=0.05,
)

capacity_receiver = await battery_pool.capacity(maxsize=50)
capacity_receiver = battery_pool.capacity.new_receiver(maxsize=50)

# First metrics delivers slower because of the startup delay in the pool.
msg = await asyncio.wait_for(
Expand Down Expand Up @@ -633,7 +633,7 @@ async def run_soc_test(setup_args: SetupArgs) -> None:
sampling_rate=0.05,
)

receiver = await battery_pool.soc(maxsize=50)
receiver = battery_pool.soc.new_receiver(maxsize=50)

# First metrics delivers slower because of the startup delay in the pool.
msg = await asyncio.wait_for(
Expand Down Expand Up @@ -775,7 +775,7 @@ async def run_power_bounds_test( # pylint: disable=too-many-locals
sampling_rate=0.1,
)

receiver = await battery_pool.power_bounds(maxsize=50)
receiver = battery_pool.power_bounds.new_receiver(maxsize=50)

# First metrics delivers slower because of the startup delay in the pool.
msg = await asyncio.wait_for(
Expand Down

0 comments on commit 23ff30c

Please sign in to comment.