From 9797c16a6f30f1ccc94856813a308ef53ea215af Mon Sep 17 00:00:00 2001 From: Daniel Zullo Date: Fri, 5 May 2023 14:49:08 +0200 Subject: [PATCH 1/7] Fix wrong documentation in tests Signed-off-by: Daniel Zullo --- tests/actor/test_power_distributing.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/actor/test_power_distributing.py b/tests/actor/test_power_distributing.py index 4f5bade57..e8a4db50e 100644 --- a/tests/actor/test_power_distributing.py +++ b/tests/actor/test_power_distributing.py @@ -246,7 +246,7 @@ async def test_battery_soc_nan(self, mocker: MockerFixture) -> None: async def test_battery_capacity_nan(self, mocker: MockerFixture) -> None: # pylint: disable=too-many-locals - """Test if power distribution works with single user works.""" + """Test battery with capacity set to NaN is not used.""" mock_microgrid = await self.init_mock_microgrid(mocker) await mock_microgrid.send( @@ -299,7 +299,7 @@ async def test_battery_capacity_nan(self, mocker: MockerFixture) -> None: async def test_battery_power_bounds_nan(self, mocker: MockerFixture) -> None: # pylint: disable=too-many-locals - """Check if missign""" + """Test battery with power bounds set to NaN is not used.""" mock_microgrid = await self.init_mock_microgrid(mocker) # Battery 206 should work even if his inverter sends NaN From 366462c5c7abd0dcfcea0831bc6eab4459ce98f4 Mon Sep 17 00:00:00 2001 From: Daniel Zullo Date: Wed, 17 May 2023 23:46:41 +0200 Subject: [PATCH 2/7] Use future annotations in power distribution tests Future annotations is already imported and it should be used instead. Signed-off-by: Daniel Zullo --- tests/actor/test_power_distributing.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/actor/test_power_distributing.py b/tests/actor/test_power_distributing.py index e8a4db50e..63b63e911 100644 --- a/tests/actor/test_power_distributing.py +++ b/tests/actor/test_power_distributing.py @@ -2,10 +2,13 @@ # Copyright © 2023 Frequenz Energy-as-a-Service GmbH """Tests power distributor""" + +from __future__ import annotations + import asyncio import re from dataclasses import dataclass -from typing import Set, Tuple, TypeVar +from typing import TypeVar from unittest.mock import AsyncMock, MagicMock from frequenz.channels import Bidirectional, Broadcast, Receiver, Sender @@ -48,7 +51,7 @@ class TestPowerDistributingActor: # pylint: disable=protected-access """Test tool to distribute power""" - def component_graph(self) -> Tuple[Set[Component], Set[Connection]]: + def component_graph(self) -> tuple[set[Component], set[Connection]]: """Create graph components Returns: From 6116e958ffadc77ac628da975201f8ea4281c5dd Mon Sep 17 00:00:00 2001 From: Daniel Zullo Date: Fri, 5 May 2023 15:26:38 +0200 Subject: [PATCH 3/7] Test all batteries are used if none of them works A test was not added when the power distributing actor was updated in the past to use all the batteries for cases where all of them are reported as not working batteries. Signed-off-by: Daniel Zullo --- tests/actor/test_power_distributing.py | 45 ++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/tests/actor/test_power_distributing.py b/tests/actor/test_power_distributing.py index 63b63e911..c14bd6296 100644 --- a/tests/actor/test_power_distributing.py +++ b/tests/actor/test_power_distributing.py @@ -745,3 +745,48 @@ async def test_not_all_batteries_are_working(self, mocker: MockerFixture) -> Non assert result.excess_power == approx(700.0) assert result.succeeded_power == approx(500.0) assert result.request == request + + async def test_use_all_batteries_none_is_working( + self, mocker: MockerFixture + ) -> None: + """Test all batteries are used if none of them works.""" + await self.init_mock_microgrid(mocker) + + mocker.patch("asyncio.sleep", new_callable=AsyncMock) + + attrs: dict[str, set[int]] = {"get_working_batteries.return_value": set()} + mocker.patch( + "frequenz.sdk.actor.power_distributing.power_distributing.BatteryPoolStatus", + return_value=MagicMock(spec=BatteryPoolStatus, **attrs), + ) + + channel = Bidirectional[Request, Result]("user1", "power_distributor") + battery_status_channel = Broadcast[BatteryStatus]("battery_status") + distributor = PowerDistributingActor( + users_channels={"user1": channel.service_handle}, + battery_status_sender=battery_status_channel.new_sender(), + ) + + request = Request( + power=1200.0, + batteries={106, 206}, + request_timeout_sec=SAFETY_TIMEOUT, + ) + + await channel.client_handle.send(request) + + done, pending = await asyncio.wait( + [asyncio.create_task(channel.client_handle.receive())], + timeout=SAFETY_TIMEOUT, + ) + + assert len(pending) == 0 + assert len(done) == 1 + result = done.pop().result() + assert isinstance(result, Success) + assert result.succeeded_batteries == {106, 206} + assert result.excess_power == approx(200.0) + assert result.succeeded_power == approx(1000.0) + assert result.request == request + + await distributor._stop_actor() From a1bf628b4ffe28b77c455821b8795cd3bddad3ee Mon Sep 17 00:00:00 2001 From: Daniel Zullo Date: Wed, 17 May 2023 23:41:18 +0200 Subject: [PATCH 4/7] Amend existent power distributing test To make it consistent with recently added power distributing tests. Signed-off-by: Daniel Zullo --- tests/actor/test_power_distributing.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/tests/actor/test_power_distributing.py b/tests/actor/test_power_distributing.py index c14bd6296..81d72e546 100644 --- a/tests/actor/test_power_distributing.py +++ b/tests/actor/test_power_distributing.py @@ -704,48 +704,48 @@ async def test_power_distributor_one_user_adjust_power_success( assert result.request == request async def test_not_all_batteries_are_working(self, mocker: MockerFixture) -> None: - # pylint: disable=too-many-locals """Test if power distribution works if not all batteries are working.""" await self.init_mock_microgrid(mocker) - channel = Bidirectional[Request, Result]("user1", "power_distributor") + mocker.patch("asyncio.sleep", new_callable=AsyncMock) - request = Request( - power=1200.0, batteries={106, 206}, request_timeout_sec=SAFETY_TIMEOUT - ) + batteries = {106, 206} - attrs = {"get_working_batteries.return_value": request.batteries - {106}} + attrs = {"get_working_batteries.return_value": batteries - {106}} mocker.patch( "frequenz.sdk.actor.power_distributing.power_distributing.BatteryPoolStatus", return_value=MagicMock(spec=BatteryPoolStatus, **attrs), ) - mocker.patch("asyncio.sleep", new_callable=AsyncMock) - + channel = Bidirectional[Request, Result]("user1", "power_distributor") battery_status_channel = Broadcast[BatteryStatus]("battery_status") distributor = PowerDistributingActor( users_channels={"user1": channel.service_handle}, battery_status_sender=battery_status_channel.new_sender(), ) - client_handle = channel.client_handle - await client_handle.send(request) + request = Request( + power=1200.0, batteries=batteries, request_timeout_sec=SAFETY_TIMEOUT + ) + + await channel.client_handle.send(request) done, pending = await asyncio.wait( - [asyncio.create_task(client_handle.receive())], + [asyncio.create_task(channel.client_handle.receive())], timeout=SAFETY_TIMEOUT, ) - await distributor._stop_actor() assert len(pending) == 0 assert len(done) == 1 - result = done.pop().result() assert isinstance(result, Success) + assert result.succeeded_batteries == {206} assert result.excess_power == approx(700.0) assert result.succeeded_power == approx(500.0) assert result.request == request + await distributor._stop_actor() + async def test_use_all_batteries_none_is_working( self, mocker: MockerFixture ) -> None: From 7ee1bf3b7fac0485b470692139a0391af46fa48c Mon Sep 17 00:00:00 2001 From: Daniel Zullo Date: Wed, 26 Apr 2023 19:53:40 +0200 Subject: [PATCH 5/7] Simplify working batteries check The _get_working_batteries method had unnecessary complexity and was redundant as it was returning the original set of batteries if there was any working battery. Therefore, this commit removes the _get_working_batteries method and simplifies the check for working batteries. Signed-off-by: Daniel Zullo --- .../power_distributing/power_distributing.py | 22 +++---------------- 1 file changed, 3 insertions(+), 19 deletions(-) diff --git a/src/frequenz/sdk/actor/power_distributing/power_distributing.py b/src/frequenz/sdk/actor/power_distributing/power_distributing.py index 5066a06e2..a7b8b9b1e 100644 --- a/src/frequenz/sdk/actor/power_distributing/power_distributing.py +++ b/src/frequenz/sdk/actor/power_distributing/power_distributing.py @@ -551,24 +551,6 @@ def _get_components_pairs( return bat_inv_map, inv_bat_map - def _get_working_batteries(self, batteries: Set[int]) -> Set[int]: - """Get subset with working batteries. - - If none of the given batteries are working, then treat all of them - as working. - - Args: - batteries: requested batteries - - Returns: - Subset with working batteries or input set if none of the given batteries - are working. - """ - working_batteries = self._all_battery_status.get_working_batteries(batteries) - if len(working_batteries) == 0: - return batteries - return working_batteries - def _get_components_data(self, batteries: Set[int]) -> List[InvBatPair]: """Get data for the given batteries and adjacent inverters. @@ -582,7 +564,9 @@ def _get_components_data(self, batteries: Set[int]) -> List[InvBatPair]: Pairs of battery and adjacent inverter data. """ pairs_data: List[InvBatPair] = [] - working_batteries = self._get_working_batteries(batteries) + working_batteries = ( + self._all_battery_status.get_working_batteries(batteries) or batteries + ) for battery_id in working_batteries: if battery_id not in self._battery_receivers: From 054038133f93bcb549ae1865f9aae353659291c3 Mon Sep 17 00:00:00 2001 From: Daniel Zullo Date: Wed, 17 May 2023 23:56:56 +0200 Subject: [PATCH 6/7] Add battery request check for empty set Requests with empty batteries set need to be rejected. Signed-off-by: Daniel Zullo --- .../sdk/actor/power_distributing/power_distributing.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/frequenz/sdk/actor/power_distributing/power_distributing.py b/src/frequenz/sdk/actor/power_distributing/power_distributing.py index a7b8b9b1e..2edd35f30 100644 --- a/src/frequenz/sdk/actor/power_distributing/power_distributing.py +++ b/src/frequenz/sdk/actor/power_distributing/power_distributing.py @@ -388,6 +388,9 @@ def _check_request(self, request: Request) -> Optional[Result]: Returns: Result for the user if the request is wrong, None otherwise. """ + if len(request.batteries) == 0: + return Error(request=request, msg="Empty battery IDs in the request") + for battery in request.batteries: if battery not in self._battery_receivers: msg = ( From 7fa7b70b3d26266d0c2f336300b18b9c2ae878fb Mon Sep 17 00:00:00 2001 From: Daniel Zullo Date: Wed, 26 Apr 2023 22:25:11 +0200 Subject: [PATCH 7/7] Allow optionally forcing power requests A power request might need to be forced to implement safety mechanisms, even when some components might be seemingly failing (i.e. when there is not proper consumption information, the user wants to slowly discharge batteries to prevent potential peak breaches). Signed-off-by: Daniel Zullo --- RELEASE_NOTES.md | 2 + .../power_distributing/power_distributing.py | 142 ++++++++++--- .../sdk/actor/power_distributing/request.py | 11 + .../sdk/power/_distribution_algorithm.py | 21 ++ tests/actor/test_power_distributing.py | 194 ++++++++++++++++++ 5 files changed, 345 insertions(+), 25 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index e40e73b2e..c13f83dd0 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -24,6 +24,8 @@ This release drops support for Python versions older than 3.11. soc_rx = battery_pool.soc.new_receiver() # new ``` +* A power request can now be forced by setting the `include_broken` attribute. This is especially helpful as a safety measure when components appear to be failing, such as when battery metrics are unavailable. Note that applications previously relying on automatic fallback to all batteries when none of them was working will now require the `include_broken` attribute to be explicitly set in the request. + ## New Features diff --git a/src/frequenz/sdk/actor/power_distributing/power_distributing.py b/src/frequenz/sdk/actor/power_distributing/power_distributing.py index 2edd35f30..33c993da1 100644 --- a/src/frequenz/sdk/actor/power_distributing/power_distributing.py +++ b/src/frequenz/sdk/actor/power_distributing/power_distributing.py @@ -15,18 +15,12 @@ import asyncio import logging +import time from asyncio.tasks import ALL_COMPLETED from dataclasses import dataclass, replace +from datetime import timedelta from math import isnan -from typing import ( # pylint: disable=unused-import - Any, - Dict, - Iterable, - List, - Optional, - Set, - Tuple, -) +from typing import Any, Dict, Iterable, List, Optional, Self, Set, Tuple import grpc from frequenz.channels import Bidirectional, Peekable, Receiver, Sender @@ -62,6 +56,40 @@ class _User: """The bidirectional channel to communicate with the user.""" +@dataclass +class _CacheEntry: + """Represents an entry in the cache with expiry time.""" + + inv_bat_pair: InvBatPair + """The inverter and adjacent battery data pair.""" + + expiry_time: int + """The expiration time (taken from the monotonic clock) of the cache entry.""" + + @classmethod + def from_ttl( + cls, inv_bat_pair: InvBatPair, ttl: timedelta = timedelta(hours=2.5) + ) -> Self: + """Initialize a CacheEntry instance from a TTL (Time-To-Live). + + Args: + inv_bat_pair: the inverter and adjacent battery data pair to cache. + ttl: the time a cache entry is kept alive. + + Returns: + this class instance. + """ + return cls(inv_bat_pair, time.monotonic_ns() + int(ttl.total_seconds() * 1e9)) + + def has_expired(self) -> bool: + """Check whether the cache entry has expired. + + Returns: + whether the cache entry has expired. + """ + return time.monotonic_ns() >= self.expiry_time + + @actor class PowerDistributingActor: # pylint: disable=too-many-instance-attributes @@ -211,6 +239,10 @@ def __init__( max_data_age_sec=10.0, ) + self._cached_metrics: dict[int, _CacheEntry | None] = { + bat_id: None for bat_id, _ in self._bat_inv_map.items() + } + def _create_users_tasks(self) -> List[asyncio.Task[None]]: """For each user create a task to wait for request. @@ -224,7 +256,7 @@ def _create_users_tasks(self) -> List[asyncio.Task[None]]: ) return tasks - def _get_upper_bound(self, batteries: Set[int]) -> float: + def _get_upper_bound(self, batteries: Set[int], include_broken: bool) -> float: """Get total upper bound of power to be set for given batteries. Note, output of that function doesn't guarantee that this bound will be @@ -232,17 +264,21 @@ def _get_upper_bound(self, batteries: Set[int]) -> float: Args: batteries: List of batteries + include_broken: whether all batteries in the batteries set in the + request must be used regardless the status. Returns: Upper bound for `set_power` operation. """ - pairs_data: List[InvBatPair] = self._get_components_data(batteries) + pairs_data: List[InvBatPair] = self._get_components_data( + batteries, include_broken + ) return sum( min(battery.power_upper_bound, inverter.active_power_upper_bound) for battery, inverter in pairs_data ) - def _get_lower_bound(self, batteries: Set[int]) -> float: + def _get_lower_bound(self, batteries: Set[int], include_broken: bool) -> float: """Get total lower bound of power to be set for given batteries. Note, output of that function doesn't guarantee that this bound will be @@ -250,11 +286,15 @@ def _get_lower_bound(self, batteries: Set[int]) -> float: Args: batteries: List of batteries + include_broken: whether all batteries in the batteries set in the + request must be used regardless the status. Returns: Lower bound for `set_power` operation. """ - pairs_data: List[InvBatPair] = self._get_components_data(batteries) + pairs_data: List[InvBatPair] = self._get_components_data( + batteries, include_broken + ) return sum( max(battery.power_lower_bound, inverter.active_power_lower_bound) for battery, inverter in pairs_data @@ -282,21 +322,19 @@ async def run(self) -> None: try: pairs_data: List[InvBatPair] = self._get_components_data( - request.batteries + request.batteries, request.include_broken ) except KeyError as err: await user.channel.send(Error(request=request, msg=str(err))) continue - if len(pairs_data) == 0: + if not pairs_data and not request.include_broken: error_msg = f"No data for the given batteries {str(request.batteries)}" await user.channel.send(Error(request=request, msg=str(error_msg))) continue try: - distribution = self.distribution_algorithm.distribute_power( - request.power, pairs_data - ) + distribution = self._get_power_distribution(request, pairs_data) except ValueError as err: error_msg = f"Couldn't distribute power, error: {str(err)}" await user.channel.send(Error(request=request, msg=str(error_msg))) @@ -379,6 +417,44 @@ async def _set_distributed_power( return self._parse_result(tasks, distribution.distribution, timeout_sec) + def _get_power_distribution( + self, request: Request, inv_bat_pairs: List[InvBatPair] + ) -> DistributionResult: + """Get power distribution result for the batteries in the request. + + Args: + request: the power request to process. + inv_bat_pairs: the battery and adjacent inverter data pairs. + + Returns: + the power distribution result. + """ + available_bat_ids = {battery.component_id for battery, _ in inv_bat_pairs} + unavailable_bat_ids = request.batteries - available_bat_ids + unavailable_inv_ids = { + self._bat_inv_map[battery_id] for battery_id in unavailable_bat_ids + } + + if request.include_broken and not available_bat_ids: + return self.distribution_algorithm.distribute_power_equally( + request.power, unavailable_inv_ids + ) + + result = self.distribution_algorithm.distribute_power( + request.power, inv_bat_pairs + ) + + if request.include_broken and unavailable_inv_ids: + additional_result = self.distribution_algorithm.distribute_power_equally( + result.remaining_power, unavailable_inv_ids + ) + + for inv_id, power in additional_result.distribution.items(): + result.distribution[inv_id] = power + result.remaining_power = 0.0 + + return result + def _check_request(self, request: Request) -> Optional[Result]: """Check whether the given request if correct. @@ -388,7 +464,7 @@ def _check_request(self, request: Request) -> Optional[Result]: Returns: Result for the user if the request is wrong, None otherwise. """ - if len(request.batteries) == 0: + if not request.batteries: return Error(request=request, msg="Empty battery IDs in the request") for battery in request.batteries: @@ -401,11 +477,11 @@ def _check_request(self, request: Request) -> Optional[Result]: if not request.adjust_power: if request.power < 0: - bound = self._get_lower_bound(request.batteries) + bound = self._get_lower_bound(request.batteries, request.include_broken) if request.power < bound: return OutOfBound(request=request, bound=bound) else: - bound = self._get_upper_bound(request.batteries) + bound = self._get_upper_bound(request.batteries, request.include_broken) if request.power > bound: return OutOfBound(request=request, bound=bound) @@ -554,11 +630,15 @@ def _get_components_pairs( return bat_inv_map, inv_bat_map - def _get_components_data(self, batteries: Set[int]) -> List[InvBatPair]: + def _get_components_data( + self, batteries: Set[int], include_broken: bool + ) -> List[InvBatPair]: """Get data for the given batteries and adjacent inverters. Args: batteries: Batteries that needs data. + include_broken: whether all batteries in the batteries set in the + request must be used regardless the status. Raises: KeyError: If any battery in the given list doesn't exists in microgrid. @@ -568,7 +648,9 @@ def _get_components_data(self, batteries: Set[int]) -> List[InvBatPair]: """ pairs_data: List[InvBatPair] = [] working_batteries = ( - self._all_battery_status.get_working_batteries(batteries) or batteries + batteries + if include_broken + else self._all_battery_status.get_working_batteries(batteries) ) for battery_id in working_batteries: @@ -581,6 +663,12 @@ def _get_components_data(self, batteries: Set[int]) -> List[InvBatPair]: inverter_id: int = self._bat_inv_map[battery_id] data = self._get_battery_inverter_data(battery_id, inverter_id) + if not data and include_broken: + cached_entry = self._cached_metrics[battery_id] + if cached_entry and not cached_entry.has_expired(): + data = cached_entry.inv_bat_pair + else: + data = None if data is None: _logger.warning( "Skipping battery %d because its message isn't correct.", @@ -648,7 +736,9 @@ def _get_battery_inverter_data( # If all values are ok then return them. if not any(map(isnan, replaceable_metrics)): - return InvBatPair(battery_data, inverter_data) + inv_bat_pair = InvBatPair(battery_data, inverter_data) + self._cached_metrics[battery_id] = _CacheEntry.from_ttl(inv_bat_pair) + return inv_bat_pair # Replace NaN with the corresponding value in the adjacent component. # If both metrics are None, return None to ignore this battery. @@ -670,10 +760,12 @@ def _get_battery_inverter_data( elif isnan(inv_bound): inverter_new_metrics[inv_attr] = bat_bound - return InvBatPair( + inv_bat_pair = InvBatPair( replace(battery_data, **battery_new_metrics), replace(inverter_data, **inverter_new_metrics), ) + self._cached_metrics[battery_id] = _CacheEntry.from_ttl(inv_bat_pair) + return inv_bat_pair async def _create_channels(self) -> None: """Create channels to get data of components in microgrid.""" diff --git a/src/frequenz/sdk/actor/power_distributing/request.py b/src/frequenz/sdk/actor/power_distributing/request.py index abf852628..d2da69032 100644 --- a/src/frequenz/sdk/actor/power_distributing/request.py +++ b/src/frequenz/sdk/actor/power_distributing/request.py @@ -29,3 +29,14 @@ class Request: If `False` and the power is outside the batteries' bounds, the request will fail and be replied to with an `OutOfBound` result. """ + + include_broken: bool = False + """Whether to use all batteries included in the batteries set regardless the status. + + if `True`, the remaining power after distributing between working batteries + will be distributed equally between broken batteries. Also if all batteries + in the batteries set are broken then the power is distributed equally between + broken batteries. + + if `False`, the power will be only distributed between the working batteries. + """ diff --git a/src/frequenz/sdk/power/_distribution_algorithm.py b/src/frequenz/sdk/power/_distribution_algorithm.py index c50df54fb..d90feea0e 100644 --- a/src/frequenz/sdk/power/_distribution_algorithm.py +++ b/src/frequenz/sdk/power/_distribution_algorithm.py @@ -423,6 +423,27 @@ def _greedy_distribute_remaining_power( return DistributionResult(new_distribution, remaining_power) + def distribute_power_equally( + self, power: float, inverters: set[int] + ) -> DistributionResult: + """Distribute the power equally between the inverters in the set. + + This function is mainly useful to set the power for components that are + broken or have no metrics available. + + Args: + power: the power to distribute. + inverters: the inverters to set the power to. + + Returns: + the power distribution result. + """ + power_per_inverter = power / len(inverters) + return DistributionResult( + distribution={id: power_per_inverter for id in inverters}, + remaining_power=0.0, + ) + def distribute_power( self, power: float, components: List[InvBatPair] ) -> DistributionResult: diff --git a/tests/actor/test_power_distributing.py b/tests/actor/test_power_distributing.py index 81d72e546..63d52e4ec 100644 --- a/tests/actor/test_power_distributing.py +++ b/tests/actor/test_power_distributing.py @@ -771,6 +771,7 @@ async def test_use_all_batteries_none_is_working( power=1200.0, batteries={106, 206}, request_timeout_sec=SAFETY_TIMEOUT, + include_broken=True, ) await channel.client_handle.send(request) @@ -790,3 +791,196 @@ async def test_use_all_batteries_none_is_working( assert result.request == request await distributor._stop_actor() + + async def test_force_request_a_battery_is_not_working( + self, mocker: MockerFixture + ) -> None: + """Test force request when a battery is not working.""" + await self.init_mock_microgrid(mocker) + + mocker.patch("asyncio.sleep", new_callable=AsyncMock) + + batteries = {106, 206} + + attrs = {"get_working_batteries.return_value": batteries - {106}} + mocker.patch( + "frequenz.sdk.actor.power_distributing.power_distributing.BatteryPoolStatus", + return_value=MagicMock(spec=BatteryPoolStatus, **attrs), + ) + + channel = Bidirectional[Request, Result]("user1", "power_distributor") + battery_status_channel = Broadcast[BatteryStatus]("battery_status") + distributor = PowerDistributingActor( + {"user1": channel.service_handle}, + battery_status_sender=battery_status_channel.new_sender(), + ) + + request = Request( + power=1200.0, + batteries=batteries, + request_timeout_sec=SAFETY_TIMEOUT, + include_broken=True, + ) + + await channel.client_handle.send(request) + + done, pending = await asyncio.wait( + [asyncio.create_task(channel.client_handle.receive())], + timeout=SAFETY_TIMEOUT, + ) + + assert len(pending) == 0 + assert len(done) == 1 + result = done.pop().result() + assert isinstance(result, Success) + assert result.succeeded_batteries == {106, 206} + assert result.excess_power == approx(200.0) + assert result.succeeded_power == approx(1000.0) + assert result.request == request + + await distributor._stop_actor() + + async def test_force_request_battery_nan_value_non_cached( + self, mocker: MockerFixture + ) -> None: + """Test battery with NaN in SoC, capacity or power is used if request is forced.""" + mock_microgrid = await self.init_mock_microgrid(mocker) + + mocker.patch("asyncio.sleep", new_callable=AsyncMock) + + batteries = {106, 206} + + attrs = {"get_working_batteries.return_value": batteries} + mocker.patch( + "frequenz.sdk.actor.power_distributing.power_distributing.BatteryPoolStatus", + return_value=MagicMock(spec=BatteryPoolStatus, **attrs), + ) + + channel = Bidirectional[Request, Result]("user1", "power_distributor") + battery_status_channel = Broadcast[BatteryStatus]("battery_status") + distributor = PowerDistributingActor( + {"user1": channel.service_handle}, + battery_status_sender=battery_status_channel.new_sender(), + ) + + request = Request( + power=1200.0, + batteries=batteries, + request_timeout_sec=SAFETY_TIMEOUT, + include_broken=True, + ) + + batteries_data = ( + battery_msg( + 106, + soc=Metric(float("NaN"), Bound(20, 80)), + capacity=Metric(float("NaN")), + power=Bound(-1000, 1000), + ), + battery_msg( + 206, + soc=Metric(40, Bound(20, 80)), + capacity=Metric(float("NaN")), + power=Bound(-1000, 1000), + ), + ) + + for battery in batteries_data: + await mock_microgrid.send(battery) + + await channel.client_handle.send(request) + + done, pending = await asyncio.wait( + [asyncio.create_task(channel.client_handle.receive())], + timeout=SAFETY_TIMEOUT, + ) + + assert len(pending) == 0 + assert len(done) == 1 + result: Result = done.pop().result() + assert isinstance(result, Success) + assert result.succeeded_batteries == batteries + assert result.succeeded_power == approx(1199.9999) + assert result.excess_power == approx(0.0) + assert result.request == request + + await distributor._stop_actor() + + async def test_force_request_batteries_nan_values_cached( + self, mocker: MockerFixture + ) -> None: + """Test battery with NaN in SoC, capacity or power is used if request is forced.""" + mock_microgrid = await self.init_mock_microgrid(mocker) + + mocker.patch("asyncio.sleep", new_callable=AsyncMock) + + batteries = {106, 206, 306} + + attrs = {"get_working_batteries.return_value": batteries} + mocker.patch( + "frequenz.sdk.actor.power_distributing.power_distributing.BatteryPoolStatus", + return_value=MagicMock(spec=BatteryPoolStatus, **attrs), + ) + + channel = Bidirectional[Request, Result]("user1", "power_distributor") + battery_status_channel = Broadcast[BatteryStatus]("battery_status") + distributor = PowerDistributingActor( + {"user1": channel.service_handle}, + battery_status_sender=battery_status_channel.new_sender(), + ) + + request = Request( + power=1200.0, + batteries=batteries, + request_timeout_sec=SAFETY_TIMEOUT, + include_broken=True, + ) + + async def test_result() -> None: + done, pending = await asyncio.wait( + [asyncio.create_task(channel.client_handle.receive())], + timeout=SAFETY_TIMEOUT, + ) + assert len(pending) == 0 + assert len(done) == 1 + result: Result = done.pop().result() + assert isinstance(result, Success) + assert result.succeeded_batteries == batteries + assert result.succeeded_power == approx(1199.9999) + assert result.excess_power == approx(0.0) + assert result.request == request + + batteries_data = ( + battery_msg( + 106, + soc=Metric(float("NaN"), Bound(20, 80)), + capacity=Metric(98000), + power=Bound(-1000, 1000), + ), + battery_msg( + 206, + soc=Metric(40, Bound(20, 80)), + capacity=Metric(float("NaN")), + power=Bound(-1000, 1000), + ), + battery_msg( + 306, + soc=Metric(40, Bound(20, 80)), + capacity=Metric(float(98000)), + power=Bound(float("NaN"), float("NaN")), + ), + ) + + # This request is needed to set the battery metrics cache to have valid + # metrics so that the distribution algorithm can be used in the next + # request where the batteries report NaN in the metrics. + await channel.client_handle.send(request) + await test_result() + + for battery in batteries_data: + await mock_microgrid.send(battery) + + await channel.client_handle.send(request) + await test_result() + + await distributor._stop_actor()