diff --git a/src/frequenz/sdk/actor/power_distributing/_component_status/__init__.py b/src/frequenz/sdk/actor/power_distributing/_component_status/__init__.py index 8fb33415c..929cbc726 100644 --- a/src/frequenz/sdk/actor/power_distributing/_component_status/__init__.py +++ b/src/frequenz/sdk/actor/power_distributing/_component_status/__init__.py @@ -11,6 +11,7 @@ ComponentStatusTracker, SetPowerResult, ) +from ._ev_charger_status_tracker import EVChargerStatusTracker __all__ = [ "BatteryStatusTracker", @@ -18,5 +19,6 @@ "ComponentStatus", "ComponentStatusEnum", "ComponentStatusTracker", + "EVChargerStatusTracker", "SetPowerResult", ] diff --git a/src/frequenz/sdk/actor/power_distributing/_component_status/_ev_charger_status_tracker.py b/src/frequenz/sdk/actor/power_distributing/_component_status/_ev_charger_status_tracker.py new file mode 100644 index 000000000..1f887e8e3 --- /dev/null +++ b/src/frequenz/sdk/actor/power_distributing/_component_status/_ev_charger_status_tracker.py @@ -0,0 +1,204 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Background service that tracks the status of an EV charger.""" + + +import asyncio +import logging +from datetime import datetime, timedelta, timezone + +from frequenz.channels import Receiver, Sender, select, selected_from +from frequenz.channels.timer import SkipMissedAndDrift, Timer +from frequenz.client.microgrid import ( + EVChargerCableState, + EVChargerComponentState, + EVChargerData, +) +from typing_extensions import override + +from frequenz.sdk.microgrid import connection_manager + +from ..._background_service import BackgroundService +from ._blocking_status import BlockingStatus +from ._component_status import ( + ComponentStatus, + ComponentStatusEnum, + ComponentStatusTracker, + SetPowerResult, +) + +_logger = logging.getLogger(__name__) + + +class EVChargerStatusTracker(ComponentStatusTracker, BackgroundService): + """Status tracker for EV chargers. + + It reports an EV charger as `WORKING` when an EV is connected to it, + and power can be allocated to it, and `NOT_WORKING` otherwise. + + If it receives a power assignment failure from the PowerDistributor, + when the component is expected to be `WORKING`, it is marked as + `UNCERTAIN` for a specific interval, before being marked `WORKING` + again. + """ + + @override + def __init__( # pylint: disable=too-many-arguments + self, + component_id: int, + max_data_age: timedelta, + max_blocking_duration: timedelta, + status_sender: Sender[ComponentStatus], + set_power_result_receiver: Receiver[SetPowerResult], + ) -> None: + """Create an `EVChargerStatusTracker` instance. + + Args: + component_id: ID of the EV charger to monitor the status of. + max_data_age: max duration to wait for, before marking a component as + NOT_WORKING, unless new data arrives. + max_blocking_duration: duration for which the component status should be + UNCERTAIN if a request to the component failed unexpectedly. + status_sender: Channel sender to send status updates to. + set_power_result_receiver: Receiver to fetch PowerDistributor responses + from, to get the status of the most recent request made for an EV + Charger. + """ + self._component_id = component_id + self._max_data_age = max_data_age + self._status_sender = status_sender + self._set_power_result_receiver = set_power_result_receiver + + self._last_status = ComponentStatusEnum.NOT_WORKING + self._blocking_status = BlockingStatus( + min_duration=timedelta(seconds=1.0), + max_duration=max_blocking_duration, + ) + + BackgroundService.__init__(self, name=f"EVChargerStatusTracker({component_id})") + + @override + def start(self) -> None: + """Start the status tracker.""" + self._tasks.add(asyncio.create_task(self._run_forever())) + + def _is_working(self, ev_data: EVChargerData) -> bool: + """Return whether the given data indicates that the component is working.""" + return ev_data.cable_state in ( + EVChargerCableState.EV_PLUGGED, + EVChargerCableState.EV_LOCKED, + ) and ev_data.component_state in ( + EVChargerComponentState.READY, + EVChargerComponentState.CHARGING, + EVChargerComponentState.DISCHARGING, + ) + + def _is_stale(self, ev_data: EVChargerData) -> bool: + """Return whether the given data is stale.""" + now = datetime.now(tz=timezone.utc) + stale = now - ev_data.timestamp > self._max_data_age + return stale + + async def _run_forever(self) -> None: + """Run the status tracker forever.""" + while True: + try: + await self._run() + except Exception: # pylint: disable=broad-except + _logger.exception( + "Restarting after exception in EVChargerStatusTracker.run()" + ) + await asyncio.sleep(1.0) + + def _handle_ev_data(self, ev_data: EVChargerData) -> ComponentStatusEnum: + """Handle new EV charger data.""" + if self._is_stale(ev_data): + if self._last_status == ComponentStatusEnum.WORKING: + _logger.warning( + "EV charger %s data is stale. Last timestamp: %s", + self._component_id, + ev_data.timestamp, + ) + return ComponentStatusEnum.NOT_WORKING + + if self._is_working(ev_data): + if self._last_status == ComponentStatusEnum.NOT_WORKING: + _logger.warning( + "EV charger %s is in WORKING state.", + self._component_id, + ) + return ComponentStatusEnum.WORKING + + if self._last_status == ComponentStatusEnum.WORKING: + _logger.warning( + "EV charger %s is in NOT_WORKING state. " + "Cable state: %s, component state: %s", + self._component_id, + ev_data.cable_state, + ev_data.component_state, + ) + return ComponentStatusEnum.NOT_WORKING + + def _handle_set_power_result( + self, set_power_result: SetPowerResult + ) -> ComponentStatusEnum: + """Handle a new set power result.""" + if self._component_id in set_power_result.succeeded: + return ComponentStatusEnum.WORKING + + self._blocking_status.block() + if self._last_status == ComponentStatusEnum.WORKING: + _logger.warning( + "EV charger %s is in UNCERTAIN state. Set power result: %s", + self._component_id, + set_power_result, + ) + return ComponentStatusEnum.UNCERTAIN + + async def _run(self) -> None: + """Run the status tracker.""" + api_client = connection_manager.get().api_client + ev_data_rx = await api_client.ev_charger_data(self._component_id) + set_power_result_rx = self._set_power_result_receiver + missing_data_timer = Timer(self._max_data_age, SkipMissedAndDrift()) + + # Send initial status + await self._status_sender.send( + ComponentStatus(self._component_id, self._last_status) + ) + + async for selected in select( + ev_data_rx, set_power_result_rx, missing_data_timer + ): + new_status = ComponentStatusEnum.NOT_WORKING + if selected_from(selected, ev_data_rx): + missing_data_timer.reset() + new_status = self._handle_ev_data(selected.message) + elif selected_from(selected, set_power_result_rx): + new_status = self._handle_set_power_result(selected.message) + elif selected_from(selected, missing_data_timer): + _logger.warning( + "No EV charger %s data received for %s. Setting status to NOT_WORKING.", + self._component_id, + self._max_data_age, + ) + + # Send status update if status changed + if ( + self._blocking_status.is_blocked() + and new_status != ComponentStatusEnum.NOT_WORKING + ): + new_status = ComponentStatusEnum.UNCERTAIN + + if new_status != self._last_status: + _logger.info( + "EV charger %s status changed from %s to %s", + self._component_id, + self._last_status, + new_status, + ) + self._last_status = new_status + await self._status_sender.send( + ComponentStatus(self._component_id, new_status) + ) diff --git a/tests/actor/power_distributing/_component_status/test_battery_status.py b/tests/actor/power_distributing/_component_status/test_battery_status.py index 609b9bbba..792df2a57 100644 --- a/tests/actor/power_distributing/_component_status/test_battery_status.py +++ b/tests/actor/power_distributing/_component_status/test_battery_status.py @@ -1,5 +1,6 @@ # License: MIT # Copyright © 2023 Frequenz Energy-as-a-Service GmbH + """Tests for BatteryStatusTracker.""" # pylint: disable=too-many-lines @@ -38,6 +39,7 @@ from tests.timeseries.mock_microgrid import MockMicrogrid from ....utils.component_data_wrapper import BatteryDataWrapper, InverterDataWrapper +from ....utils.receive_timeout import Timeout, receive_timeout def battery_data( # pylint: disable=too-many-arguments @@ -123,26 +125,6 @@ class Message(Generic[T]): INVERTER_ID = 8 -class _Timeout: - """Sentinel for timeout.""" - - -async def recv_timeout(recv: Receiver[T], timeout: float = 0.1) -> T | type[_Timeout]: - """Receive message from receiver with timeout. - - Args: - recv: Receiver to receive message from. - timeout: Timeout in seconds. - - Returns: - Received message or _Timeout if timeout is reached. - """ - try: - return await asyncio.wait_for(recv.receive(), timeout=timeout) - except asyncio.TimeoutError: - return _Timeout - - # pylint: disable=protected-access, unused-argument class TestBatteryStatus: """Tests BatteryStatusTracker.""" @@ -952,7 +934,7 @@ async def test_critical_error( # --- battery warning error (keeps working) --- await self._send_healthy_inverter(mock_microgrid) await self._send_warning_error_battery(mock_microgrid) - assert await recv_timeout(status_receiver, timeout=0.1) is _Timeout + assert await receive_timeout(status_receiver, timeout=0.1) is Timeout await self._send_healthy_battery(mock_microgrid) await self._send_healthy_inverter(mock_microgrid) @@ -971,7 +953,7 @@ async def test_critical_error( # --- inverter warning error (keeps working) --- await self._send_healthy_battery(mock_microgrid) await self._send_warning_error_inverter(mock_microgrid) - assert await recv_timeout(status_receiver, timeout=0.1) is _Timeout + assert await receive_timeout(status_receiver, timeout=0.1) is Timeout await self._send_healthy_battery(mock_microgrid) await self._send_healthy_inverter(mock_microgrid) @@ -1023,11 +1005,11 @@ async def test_stale_data( # --- stale battery data --- await self._send_healthy_inverter(mock_microgrid) await self._send_healthy_battery(mock_microgrid, timestamp) - assert await recv_timeout(status_receiver) is _Timeout + assert await receive_timeout(status_receiver) is Timeout await self._send_healthy_inverter(mock_microgrid) await self._send_healthy_battery(mock_microgrid, timestamp) - assert await recv_timeout(status_receiver, 0.3) == ComponentStatus( + assert await receive_timeout(status_receiver, 0.3) == ComponentStatus( BATTERY_ID, ComponentStatusEnum.NOT_WORKING ) @@ -1039,11 +1021,11 @@ async def test_stale_data( # --- stale inverter data --- await self._send_healthy_battery(mock_microgrid) await self._send_healthy_inverter(mock_microgrid, timestamp) - assert await recv_timeout(status_receiver) is _Timeout + assert await receive_timeout(status_receiver) is Timeout await self._send_healthy_battery(mock_microgrid) await self._send_healthy_inverter(mock_microgrid, timestamp) - assert await recv_timeout(status_receiver, 0.3) == ComponentStatus( + assert await receive_timeout(status_receiver, 0.3) == ComponentStatus( BATTERY_ID, ComponentStatusEnum.NOT_WORKING ) diff --git a/tests/actor/power_distributing/_component_status/test_ev_charger_status.py b/tests/actor/power_distributing/_component_status/test_ev_charger_status.py new file mode 100644 index 000000000..925f70169 --- /dev/null +++ b/tests/actor/power_distributing/_component_status/test_ev_charger_status.py @@ -0,0 +1,165 @@ +# License: MIT +# Copyright © 2023 Frequenz Energy-as-a-Service GmbH + +"""Tests for EVChargerStatusTracker.""" + +import asyncio +from datetime import datetime, timedelta, timezone + +from frequenz.channels import Broadcast +from frequenz.client.microgrid import EVChargerCableState, EVChargerComponentState +from pytest_mock import MockerFixture + +from frequenz.sdk._internal._asyncio import cancel_and_await +from frequenz.sdk.actor.power_distributing._component_status import ( + ComponentStatus, + ComponentStatusEnum, + EVChargerStatusTracker, + SetPowerResult, +) + +from ....timeseries.mock_microgrid import MockMicrogrid +from ....utils.component_data_wrapper import EvChargerDataWrapper +from ....utils.receive_timeout import Timeout, receive_timeout + +_EV_CHARGER_ID = 6 + + +class TestEVChargerStatusTracker: + """Tests for EVChargerStatusTracker.""" + + async def test_status_changes(self, mocker: MockerFixture) -> None: + """Test that the status changes as expected.""" + mock_microgrid = MockMicrogrid(grid_meter=True, mocker=mocker) + mock_microgrid.add_ev_chargers(3) + + status_channel = Broadcast[ComponentStatus](name="battery_status") + set_power_result_channel = Broadcast[SetPowerResult](name="set_power_result") + set_power_result_sender = set_power_result_channel.new_sender() + + async with ( + mock_microgrid, + EVChargerStatusTracker( + component_id=_EV_CHARGER_ID, + max_data_age=timedelta(seconds=0.2), + max_blocking_duration=timedelta(seconds=1), + status_sender=status_channel.new_sender(), + set_power_result_receiver=set_power_result_channel.new_receiver(), + ), + ): + status_receiver = status_channel.new_receiver() + # The status is initially not working. + assert ( + await status_receiver.receive() + ).value == ComponentStatusEnum.NOT_WORKING + + # When an EV is plugged, it is working + await mock_microgrid.mock_client.send( + EvChargerDataWrapper( + _EV_CHARGER_ID, + datetime.now(tz=timezone.utc), + active_power=0.0, + component_state=EVChargerComponentState.READY, + cable_state=EVChargerCableState.EV_PLUGGED, + ) + ) + assert await receive_timeout(status_receiver) == ComponentStatus( + _EV_CHARGER_ID, ComponentStatusEnum.WORKING + ) + + # When an EV is locked, no change in status + await mock_microgrid.mock_client.send( + EvChargerDataWrapper( + _EV_CHARGER_ID, + datetime.now(tz=timezone.utc), + active_power=0.0, + component_state=EVChargerComponentState.READY, + cable_state=EVChargerCableState.EV_LOCKED, + ) + ) + assert await receive_timeout(status_receiver) is Timeout + + # When an EV is unplugged, it is not working + await mock_microgrid.mock_client.send( + EvChargerDataWrapper( + _EV_CHARGER_ID, + datetime.now(tz=timezone.utc), + active_power=0.0, + component_state=EVChargerComponentState.READY, + cable_state=EVChargerCableState.UNPLUGGED, + ) + ) + assert await receive_timeout(status_receiver) == ComponentStatus( + _EV_CHARGER_ID, ComponentStatusEnum.NOT_WORKING + ) + + # Get it back to working again + await mock_microgrid.mock_client.send( + EvChargerDataWrapper( + _EV_CHARGER_ID, + datetime.now(tz=timezone.utc), + active_power=0.0, + component_state=EVChargerComponentState.READY, + cable_state=EVChargerCableState.EV_LOCKED, + ) + ) + assert await receive_timeout(status_receiver) == ComponentStatus( + _EV_CHARGER_ID, ComponentStatusEnum.WORKING + ) + + # When there's no new data, it should become not working + assert await receive_timeout(status_receiver, 0.1) is Timeout + assert await receive_timeout(status_receiver, 0.2) == ComponentStatus( + _EV_CHARGER_ID, ComponentStatusEnum.NOT_WORKING + ) + + # Get it back to working again + await asyncio.sleep(0.1) + await mock_microgrid.mock_client.send( + EvChargerDataWrapper( + _EV_CHARGER_ID, + datetime.now(tz=timezone.utc), + active_power=0.0, + component_state=EVChargerComponentState.READY, + cable_state=EVChargerCableState.EV_LOCKED, + ) + ) + assert await receive_timeout(status_receiver) == ComponentStatus( + _EV_CHARGER_ID, ComponentStatusEnum.WORKING + ) + + async def keep_sending_healthy_message() -> None: + while True: + await mock_microgrid.mock_client.send( + EvChargerDataWrapper( + _EV_CHARGER_ID, + datetime.now(tz=timezone.utc), + active_power=0.0, + component_state=EVChargerComponentState.READY, + cable_state=EVChargerCableState.EV_LOCKED, + ) + ) + await asyncio.sleep(0.1) + + _keep_sending_healthy_message_task = asyncio.create_task( + keep_sending_healthy_message() + ) + + # when there's a PowerDistributor failure for the component, status should + # become uncertain. + await set_power_result_sender.send( + SetPowerResult( + succeeded=set(), + failed={_EV_CHARGER_ID}, + ) + ) + assert await receive_timeout(status_receiver) == ComponentStatus( + _EV_CHARGER_ID, ComponentStatusEnum.UNCERTAIN + ) + + # After the blocking duration, it should become working again. + assert await receive_timeout(status_receiver) is Timeout + assert await receive_timeout(status_receiver, 1.0) == ComponentStatus( + _EV_CHARGER_ID, ComponentStatusEnum.WORKING + ) + await cancel_and_await(_keep_sending_healthy_message_task) diff --git a/tests/timeseries/mock_resampler.py b/tests/timeseries/mock_resampler.py index 02ab81628..3007e747f 100644 --- a/tests/timeseries/mock_resampler.py +++ b/tests/timeseries/mock_resampler.py @@ -197,6 +197,8 @@ def power_3_phase_senders( self._request_handler_task = task def _handle_task_done(self, task: asyncio.Task[None]) -> None: + if task.cancelled(): + return if exc := task.exception(): raise SystemExit(f"Task {task.get_name()!r} failed: {exc}") from exc diff --git a/tests/utils/receive_timeout.py b/tests/utils/receive_timeout.py new file mode 100644 index 000000000..ce768312b --- /dev/null +++ b/tests/utils/receive_timeout.py @@ -0,0 +1,31 @@ +# License: MIT +# Copyright © 2023 Frequenz Energy-as-a-Service GmbH + +"""Utility for receiving messages with timeout.""" + +import asyncio +from typing import TypeVar + +from frequenz.channels import Receiver + +T = TypeVar("T") + + +class Timeout: + """Sentinel for timeout.""" + + +async def receive_timeout(recv: Receiver[T], timeout: float = 0.1) -> T | type[Timeout]: + """Receive message from receiver with timeout. + + Args: + recv: Receiver to receive message from. + timeout: Timeout in seconds. + + Returns: + Received message or Timeout if timeout is reached. + """ + try: + return await asyncio.wait_for(recv.receive(), timeout=timeout) + except asyncio.TimeoutError: + return Timeout