From 24a08eb3ee2dacdb0b39e86be626226358700a34 Mon Sep 17 00:00:00 2001 From: Daniel Zullo Date: Wed, 26 Apr 2023 22:25:11 +0200 Subject: [PATCH] Allow optionally forcing power requests A power request might need to be forced to implement safety mechanisms, even when some components might be seemingly failing (i.e. when there is not proper consumption information, the user wants to slowly discharge batteries to prevent potential peak breaches). Signed-off-by: Daniel Zullo --- RELEASE_NOTES.md | 2 + .../power_distributing/power_distributing.py | 130 ++++++++++-- .../sdk/actor/power_distributing/request.py | 3 + tests/actor/test_power_distributing.py | 194 ++++++++++++++++++ 4 files changed, 313 insertions(+), 16 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 4fee8cb94..4b14cb8ea 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -16,6 +16,8 @@ This release drops support for Python versions older than 3.11. * Now `frequenz.sdk.timeseries.Sample` uses a more sensible comparison. Before this release `Sample`s were compared only based on the `timestamp`. This was due to a limitation in Python versions earlier than 3.10. Now that the minimum supported version is 3.11 this hack is not needed anymore and `Sample`s are compared using both `timestamp` and `value` as most people probably expects. +* A power request can now be forced by setting the `force` attribute. This is especially helpful as a safety measure when components appear to be failing, such as when battery metrics are unavailable. Note that applications previously relying on automatic fallback to all batteries when none of them was working will now require the force attribute to be explicitly set in the request. + ## New Features diff --git a/src/frequenz/sdk/actor/power_distributing/power_distributing.py b/src/frequenz/sdk/actor/power_distributing/power_distributing.py index 2edd35f30..345669e2f 100644 --- a/src/frequenz/sdk/actor/power_distributing/power_distributing.py +++ b/src/frequenz/sdk/actor/power_distributing/power_distributing.py @@ -17,6 +17,7 @@ import logging from asyncio.tasks import ALL_COMPLETED from dataclasses import dataclass, replace +from datetime import datetime, timedelta, timezone from math import isnan from typing import ( # pylint: disable=unused-import Any, @@ -62,6 +63,34 @@ class _User: """The bidirectional channel to communicate with the user.""" +@dataclass +class _CacheEntry: + """Represents an entry in the cache with expiry time.""" + + inv_bat_pair: InvBatPair + """The inverter and adjacent battery data pair.""" + + def __init__( + self, inv_bat_pair: InvBatPair, duration: timedelta = timedelta(hours=2.5) + ) -> None: + """Initialize a CacheEntry instance. + + Args: + inv_bat_pair: the inverter and adjacent battery data pair to cache. + duration: the duration of the cache entry. + """ + self.inv_bat_pair = inv_bat_pair + self._expiry_time = datetime.now(tz=timezone.utc) + duration + + def is_expired(self) -> bool: + """Check whether the cache entry has expired. + + Returns: + whether the cache entry has expired. + """ + return datetime.now(tz=timezone.utc) >= self._expiry_time + + @actor class PowerDistributingActor: # pylint: disable=too-many-instance-attributes @@ -211,6 +240,10 @@ def __init__( max_data_age_sec=10.0, ) + self._cached_metrics: dict[int, _CacheEntry | None] = { + bat_id: None for bat_id, _ in self._bat_inv_map.items() + } + def _create_users_tasks(self) -> List[asyncio.Task[None]]: """For each user create a task to wait for request. @@ -224,7 +257,7 @@ def _create_users_tasks(self) -> List[asyncio.Task[None]]: ) return tasks - def _get_upper_bound(self, batteries: Set[int]) -> float: + def _get_upper_bound(self, batteries: Set[int], force_use_all: bool) -> float: """Get total upper bound of power to be set for given batteries. Note, output of that function doesn't guarantee that this bound will be @@ -232,17 +265,20 @@ def _get_upper_bound(self, batteries: Set[int]) -> float: Args: batteries: List of batteries + force_use_all: whether all batteries in the power request must be used. Returns: Upper bound for `set_power` operation. """ - pairs_data: List[InvBatPair] = self._get_components_data(batteries) + pairs_data: List[InvBatPair] = self._get_components_data( + batteries, force_use_all + ) return sum( min(battery.power_upper_bound, inverter.active_power_upper_bound) for battery, inverter in pairs_data ) - def _get_lower_bound(self, batteries: Set[int]) -> float: + def _get_lower_bound(self, batteries: Set[int], force_use_all: bool) -> float: """Get total lower bound of power to be set for given batteries. Note, output of that function doesn't guarantee that this bound will be @@ -250,11 +286,14 @@ def _get_lower_bound(self, batteries: Set[int]) -> float: Args: batteries: List of batteries + force_use_all: whether all batteries in the power request must be used. Returns: Lower bound for `set_power` operation. """ - pairs_data: List[InvBatPair] = self._get_components_data(batteries) + pairs_data: List[InvBatPair] = self._get_components_data( + batteries, force_use_all + ) return sum( max(battery.power_lower_bound, inverter.active_power_lower_bound) for battery, inverter in pairs_data @@ -282,21 +321,19 @@ async def run(self) -> None: try: pairs_data: List[InvBatPair] = self._get_components_data( - request.batteries + request.batteries, request.force ) except KeyError as err: await user.channel.send(Error(request=request, msg=str(err))) continue - if len(pairs_data) == 0: + if not pairs_data and not request.force: error_msg = f"No data for the given batteries {str(request.batteries)}" await user.channel.send(Error(request=request, msg=str(error_msg))) continue try: - distribution = self.distribution_algorithm.distribute_power( - request.power, pairs_data - ) + distribution = self._get_power_distribution(request, pairs_data) except ValueError as err: error_msg = f"Couldn't distribute power, error: {str(err)}" await user.channel.send(Error(request=request, msg=str(error_msg))) @@ -379,6 +416,52 @@ async def _set_distributed_power( return self._parse_result(tasks, distribution.distribution, timeout_sec) + def _get_power_distribution( + self, request: Request, inv_bat_pairs: List[InvBatPair] + ) -> DistributionResult: + """Get power distribution result for the batteries in the request. + + Args: + request: the power request to process. + inv_bat_pairs: the battery and adjacent inverter data pairs. + + Returns: + the power distribution result. + """ + + def distribute_power_equally( + power: float, batteries: set[int] + ) -> DistributionResult: + power_per_battery = power / len(batteries) + return DistributionResult( + distribution={ + self._bat_inv_map[battery_id]: power_per_battery + for battery_id in batteries + }, + remaining_power=0.0, + ) + + if request.force and not inv_bat_pairs: + return distribute_power_equally(request.power, request.batteries) + + available_bat_ids = {battery.component_id for battery, _ in inv_bat_pairs} + unavailable_bat_ids = request.batteries - available_bat_ids + + result = self.distribution_algorithm.distribute_power( + request.power, inv_bat_pairs + ) + + if request.force and unavailable_bat_ids: + additional_result = distribute_power_equally( + result.remaining_power, unavailable_bat_ids + ) + + for inv_id, power in additional_result.distribution.items(): + result.distribution[inv_id] = power + result.remaining_power = 0.0 + + return result + def _check_request(self, request: Request) -> Optional[Result]: """Check whether the given request if correct. @@ -388,7 +471,7 @@ def _check_request(self, request: Request) -> Optional[Result]: Returns: Result for the user if the request is wrong, None otherwise. """ - if len(request.batteries) == 0: + if not request.batteries: return Error(request=request, msg="Empty battery IDs in the request") for battery in request.batteries: @@ -401,11 +484,11 @@ def _check_request(self, request: Request) -> Optional[Result]: if not request.adjust_power: if request.power < 0: - bound = self._get_lower_bound(request.batteries) + bound = self._get_lower_bound(request.batteries, request.force) if request.power < bound: return OutOfBound(request=request, bound=bound) else: - bound = self._get_upper_bound(request.batteries) + bound = self._get_upper_bound(request.batteries, request.force) if request.power > bound: return OutOfBound(request=request, bound=bound) @@ -554,11 +637,14 @@ def _get_components_pairs( return bat_inv_map, inv_bat_map - def _get_components_data(self, batteries: Set[int]) -> List[InvBatPair]: + def _get_components_data( + self, batteries: Set[int], force_use_all: bool + ) -> List[InvBatPair]: """Get data for the given batteries and adjacent inverters. Args: batteries: Batteries that needs data. + force_use_all: whether all batteries in the power request must be used. Raises: KeyError: If any battery in the given list doesn't exists in microgrid. @@ -568,7 +654,9 @@ def _get_components_data(self, batteries: Set[int]) -> List[InvBatPair]: """ pairs_data: List[InvBatPair] = [] working_batteries = ( - self._all_battery_status.get_working_batteries(batteries) or batteries + batteries + if force_use_all + else self._all_battery_status.get_working_batteries(batteries) ) for battery_id in working_batteries: @@ -581,6 +669,12 @@ def _get_components_data(self, batteries: Set[int]) -> List[InvBatPair]: inverter_id: int = self._bat_inv_map[battery_id] data = self._get_battery_inverter_data(battery_id, inverter_id) + if not data and force_use_all: + cached_entry = self._cached_metrics[battery_id] + if cached_entry and not cached_entry.is_expired(): + data = cached_entry.inv_bat_pair + else: + data = None if data is None: _logger.warning( "Skipping battery %d because its message isn't correct.", @@ -648,7 +742,9 @@ def _get_battery_inverter_data( # If all values are ok then return them. if not any(map(isnan, replaceable_metrics)): - return InvBatPair(battery_data, inverter_data) + inv_bat_pair = InvBatPair(battery_data, inverter_data) + self._cached_metrics[battery_id] = _CacheEntry(inv_bat_pair) + return inv_bat_pair # Replace NaN with the corresponding value in the adjacent component. # If both metrics are None, return None to ignore this battery. @@ -670,10 +766,12 @@ def _get_battery_inverter_data( elif isnan(inv_bound): inverter_new_metrics[inv_attr] = bat_bound - return InvBatPair( + inv_bat_pair = InvBatPair( replace(battery_data, **battery_new_metrics), replace(inverter_data, **inverter_new_metrics), ) + self._cached_metrics[battery_id] = _CacheEntry(inv_bat_pair) + return inv_bat_pair async def _create_channels(self) -> None: """Create channels to get data of components in microgrid.""" diff --git a/src/frequenz/sdk/actor/power_distributing/request.py b/src/frequenz/sdk/actor/power_distributing/request.py index abf852628..a3edec52a 100644 --- a/src/frequenz/sdk/actor/power_distributing/request.py +++ b/src/frequenz/sdk/actor/power_distributing/request.py @@ -29,3 +29,6 @@ class Request: If `False` and the power is outside the batteries' bounds, the request will fail and be replied to with an `OutOfBound` result. """ + + force: bool = False + """Whether to force the power request regardless the status of components.""" diff --git a/tests/actor/test_power_distributing.py b/tests/actor/test_power_distributing.py index 81d72e546..8afb8c7da 100644 --- a/tests/actor/test_power_distributing.py +++ b/tests/actor/test_power_distributing.py @@ -771,6 +771,7 @@ async def test_use_all_batteries_none_is_working( power=1200.0, batteries={106, 206}, request_timeout_sec=SAFETY_TIMEOUT, + force=True, ) await channel.client_handle.send(request) @@ -790,3 +791,196 @@ async def test_use_all_batteries_none_is_working( assert result.request == request await distributor._stop_actor() + + async def test_force_request_a_battery_is_not_working( + self, mocker: MockerFixture + ) -> None: + """Test force request when a battery is not working.""" + await self.init_mock_microgrid(mocker) + + mocker.patch("asyncio.sleep", new_callable=AsyncMock) + + batteries = {106, 206} + + attrs = {"get_working_batteries.return_value": batteries - {106}} + mocker.patch( + "frequenz.sdk.actor.power_distributing.power_distributing.BatteryPoolStatus", + return_value=MagicMock(spec=BatteryPoolStatus, **attrs), + ) + + channel = Bidirectional[Request, Result]("user1", "power_distributor") + battery_status_channel = Broadcast[BatteryStatus]("battery_status") + distributor = PowerDistributingActor( + {"user1": channel.service_handle}, + battery_status_sender=battery_status_channel.new_sender(), + ) + + request = Request( + power=1200.0, + batteries=batteries, + request_timeout_sec=SAFETY_TIMEOUT, + force=True, + ) + + await channel.client_handle.send(request) + + done, pending = await asyncio.wait( + [asyncio.create_task(channel.client_handle.receive())], + timeout=SAFETY_TIMEOUT, + ) + + assert len(pending) == 0 + assert len(done) == 1 + result = done.pop().result() + assert isinstance(result, Success) + assert result.succeeded_batteries == {106, 206} + assert result.excess_power == approx(200.0) + assert result.succeeded_power == approx(1000.0) + assert result.request == request + + await distributor._stop_actor() + + async def test_force_request_battery_nan_value_non_cached( + self, mocker: MockerFixture + ) -> None: + """Test battery with NaN in SoC, capacity or power is used if request is forced.""" + mock_microgrid = await self.init_mock_microgrid(mocker) + + mocker.patch("asyncio.sleep", new_callable=AsyncMock) + + batteries = {106, 206} + + attrs = {"get_working_batteries.return_value": batteries} + mocker.patch( + "frequenz.sdk.actor.power_distributing.power_distributing.BatteryPoolStatus", + return_value=MagicMock(spec=BatteryPoolStatus, **attrs), + ) + + channel = Bidirectional[Request, Result]("user1", "power_distributor") + battery_status_channel = Broadcast[BatteryStatus]("battery_status") + distributor = PowerDistributingActor( + {"user1": channel.service_handle}, + battery_status_sender=battery_status_channel.new_sender(), + ) + + request = Request( + power=1200.0, + batteries=batteries, + request_timeout_sec=SAFETY_TIMEOUT, + force=True, + ) + + batteries_data = ( + battery_msg( + 106, + soc=Metric(float("NaN"), Bound(20, 80)), + capacity=Metric(float("NaN")), + power=Bound(-1000, 1000), + ), + battery_msg( + 206, + soc=Metric(40, Bound(20, 80)), + capacity=Metric(float("NaN")), + power=Bound(-1000, 1000), + ), + ) + + for battery in batteries_data: + await mock_microgrid.send(battery) + + await channel.client_handle.send(request) + + done, pending = await asyncio.wait( + [asyncio.create_task(channel.client_handle.receive())], + timeout=SAFETY_TIMEOUT, + ) + + assert len(pending) == 0 + assert len(done) == 1 + result: Result = done.pop().result() + assert isinstance(result, Success) + assert result.succeeded_batteries == batteries + assert result.succeeded_power == approx(1199.9999) + assert result.excess_power == approx(0.0) + assert result.request == request + + await distributor._stop_actor() + + async def test_force_request_batteries_nan_values_cached( + self, mocker: MockerFixture + ) -> None: + """Test battery with NaN in SoC, capacity or power is used if request is forced.""" + mock_microgrid = await self.init_mock_microgrid(mocker) + + mocker.patch("asyncio.sleep", new_callable=AsyncMock) + + batteries = {106, 206, 306} + + attrs = {"get_working_batteries.return_value": batteries} + mocker.patch( + "frequenz.sdk.actor.power_distributing.power_distributing.BatteryPoolStatus", + return_value=MagicMock(spec=BatteryPoolStatus, **attrs), + ) + + channel = Bidirectional[Request, Result]("user1", "power_distributor") + battery_status_channel = Broadcast[BatteryStatus]("battery_status") + distributor = PowerDistributingActor( + {"user1": channel.service_handle}, + battery_status_sender=battery_status_channel.new_sender(), + ) + + request = Request( + power=1200.0, + batteries=batteries, + request_timeout_sec=SAFETY_TIMEOUT, + force=True, + ) + + async def test_result() -> None: + done, pending = await asyncio.wait( + [asyncio.create_task(channel.client_handle.receive())], + timeout=SAFETY_TIMEOUT, + ) + assert len(pending) == 0 + assert len(done) == 1 + result: Result = done.pop().result() + assert isinstance(result, Success) + assert result.succeeded_batteries == batteries + assert result.succeeded_power == approx(1199.9999) + assert result.excess_power == approx(0.0) + assert result.request == request + + batteries_data = ( + battery_msg( + 106, + soc=Metric(float("NaN"), Bound(20, 80)), + capacity=Metric(98000), + power=Bound(-1000, 1000), + ), + battery_msg( + 206, + soc=Metric(40, Bound(20, 80)), + capacity=Metric(float("NaN")), + power=Bound(-1000, 1000), + ), + battery_msg( + 306, + soc=Metric(40, Bound(20, 80)), + capacity=Metric(float(98000)), + power=Bound(float("NaN"), float("NaN")), + ), + ) + + # This request is needed to set the battery metrics cache to have valid + # metrics so that the distribution algorithm can be used in the next + # request where the batteries report NaN in the metrics. + await channel.client_handle.send(request) + await test_result() + + for battery in batteries_data: + await mock_microgrid.send(battery) + + await channel.client_handle.send(request) + await test_result() + + await distributor._stop_actor()