diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index ecaeaff7a..1579f0254 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -16,6 +16,8 @@ - Support for per-component interaction in `EVChargerPool` has been removed. +- New `propose_power` and `power_status` methods have been added to the `EVChargerPool` similar to the `BatteryPool`. These method interface with the `PowerManager` and `PowerDistributor`, which currently uses a first-come-first-serve algorithm to distribute power to EVs. + ## New Features diff --git a/benchmarks/power_distribution/power_distributor.py b/benchmarks/power_distribution/power_distributor.py index 2360a938e..6b188cb44 100644 --- a/benchmarks/power_distribution/power_distributor.py +++ b/benchmarks/power_distribution/power_distributor.py @@ -119,6 +119,7 @@ async def run_test( # pylint: disable=too-many-locals requests_receiver=power_request_channel.new_receiver(), results_sender=power_result_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), + wait_for_data_sec=2.0, ): tasks: list[Coroutine[Any, Any, list[Result]]] = [] tasks.append(send_requests(batteries, num_requests)) diff --git a/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py b/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py index 38ba40ced..a5e9570e6 100644 --- a/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py +++ b/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py @@ -133,17 +133,22 @@ def _add_bounds_tracker(self, component_ids: frozenset[int]) -> None: microgrid, ) - if self._component_category is not ComponentCategory.BATTERY: + bounds_receiver: Receiver[SystemBounds] + # pylint: disable=protected-access + if self._component_category is ComponentCategory.BATTERY: + battery_pool = microgrid.battery_pool(component_ids) + bounds_receiver = battery_pool._system_power_bounds.new_receiver() + elif self._component_category is ComponentCategory.EV_CHARGER: + ev_charger_pool = microgrid.ev_charger_pool(component_ids) + bounds_receiver = ev_charger_pool._system_power_bounds.new_receiver() + # pylint: enable=protected-access + else: err = ( "PowerManagingActor: Unsupported component category: " f"{self._component_category}" ) _logger.error(err) raise NotImplementedError(err) - battery_pool = microgrid.battery_pool(component_ids) - # pylint: disable=protected-access - bounds_receiver = battery_pool._system_power_bounds.new_receiver() - # pylint: enable=protected-access self._system_bounds[component_ids] = SystemBounds( timestamp=datetime.now(tz=timezone.utc), diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/__init__.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/__init__.py index d9cb07266..0ec7d84be 100644 --- a/src/frequenz/sdk/actor/power_distributing/_component_managers/__init__.py +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/__init__.py @@ -5,8 +5,10 @@ from ._battery_manager import BatteryManager from ._component_manager import ComponentManager +from ._ev_charger_manager import EVChargerManager __all__ = [ "BatteryManager", "ComponentManager", + "EVChargerManager", ] diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py index 41d621d98..8b5f1cba0 100644 --- a/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py @@ -127,8 +127,18 @@ class BatteryManager(ComponentManager): def __init__( self, component_pool_status_sender: Sender[ComponentPoolStatus], + results_sender: Sender[Result], ): - """Initialize the battery data manager.""" + """Initialize this instance. + + Args: + component_pool_status_sender: Channel sender to send the status of the + battery pool to. This status is used by the battery pool metric + streams, to dynamically adjust the values based on the health of the + individual batteries. + results_sender: Channel sender to send the power distribution results to. + """ + self._results_sender = results_sender self._batteries = connection_manager.get().component_graph.components( component_categories={ComponentCategory.BATTERY} ) @@ -181,20 +191,18 @@ async def stop(self) -> None: await self._component_pool_status_tracker.stop() @override - async def distribute_power(self, request: Request) -> Result: + async def distribute_power(self, request: Request) -> None: """Distribute the requested power to the components. Args: request: Request to get the distribution for. - - Returns: - Result of the distribution. """ distribution_result = await self._get_distribution(request) if not isinstance(distribution_result, DistributionResult): - return distribution_result - result = await self._distribute_power(request, distribution_result) - return result + result = distribution_result + else: + result = await self._distribute_power(request, distribution_result) + await self._results_sender.send(result) async def _get_distribution(self, request: Request) -> DistributionResult | Result: """Get the distribution of the batteries. diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/_component_manager.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/_component_manager.py index 372583c0a..f9eba111e 100644 --- a/src/frequenz/sdk/actor/power_distributing/_component_managers/_component_manager.py +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/_component_manager.py @@ -20,12 +20,14 @@ class ComponentManager(abc.ABC): def __init__( self, component_pool_status_sender: Sender[ComponentPoolStatus], + results_sender: Sender[Result], ): """Initialize the component data manager. Args: component_pool_status_sender: Channel for sending information about which components are expected to be working. + results_sender: Channel for sending the results of power distribution. """ @abc.abstractmethod @@ -37,7 +39,7 @@ async def start(self) -> None: """Start the component data manager.""" @abc.abstractmethod - async def distribute_power(self, request: Request) -> Result: + async def distribute_power(self, request: Request) -> None: """Distribute the requested power to the components. Args: diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/__init__.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/__init__.py new file mode 100644 index 000000000..e0e4beca7 --- /dev/null +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/__init__.py @@ -0,0 +1,10 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Manage ev chargers for the power distributor.""" + +from ._ev_charger_manager import EVChargerManager + +__all__ = [ + "EVChargerManager", +] diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_config.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_config.py new file mode 100644 index 000000000..7b66a9f70 --- /dev/null +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_config.py @@ -0,0 +1,27 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Configuration for the power distributor's EV charger manager.""" + +from collections import abc +from dataclasses import dataclass, field +from datetime import timedelta + +from .....timeseries import Current + + +@dataclass(frozen=True) +class EVDistributionConfig: + """Configuration for the power distributor's EV charger manager.""" + + component_ids: abc.Set[int] + """The component ids of the EV chargers.""" + + min_current: Current = field(default_factory=lambda: Current.from_amperes(6.0)) + """The minimum current that can be allocated to an EV charger.""" + + initial_current: Current = field(default_factory=lambda: Current.from_amperes(10.0)) + """The initial current that can be allocated to an EV charger.""" + + increase_power_interval: timedelta = timedelta(seconds=60) + """The interval at which the power can be increased for an EV charger.""" diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py new file mode 100644 index 000000000..d7e95637e --- /dev/null +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py @@ -0,0 +1,480 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Manage EV chargers for the power distributor.""" + +import asyncio +import collections.abc +import logging +from datetime import datetime, timedelta, timezone + +import grpc +from frequenz.channels import Broadcast, Sender, merge, select, selected_from +from frequenz.channels.timer import SkipMissedAndDrift, Timer +from frequenz.client.microgrid import ApiClient, ComponentCategory, EVChargerData +from typing_extensions import override + +from frequenz.sdk import microgrid + +from ....._internal._channels import LatestValueCache +from ....._internal._math import is_close_to_zero +from .....timeseries import Power, Sample3Phase, Voltage +from ..._component_pool_status_tracker import ComponentPoolStatusTracker + +# from .._component_pool_status_tracker import ComponentPoolStatusTracker +from ..._component_status import ComponentPoolStatus, EVChargerStatusTracker +from ...request import Request +from ...result import PartialFailure, Result, Success +from .._component_manager import ComponentManager +from ._config import EVDistributionConfig +from ._states import EvcState, EvcStates + +_logger = logging.getLogger(__name__) + +_DEFAULT_API_REQUEST_TIMEOUT = timedelta(seconds=5.0) +_TGT_POWER_RESEND_INTERVAL = timedelta(seconds=5.0) + + +class EVChargerManager(ComponentManager): + """Manage ev chargers for the power distributor.""" + + @override + def __init__( + self, + component_pool_status_sender: Sender[ComponentPoolStatus], + results_sender: Sender[Result], + ): + """Initialize the ev charger data manager. + + Args: + component_pool_status_sender: Channel for sending information about which + components are expected to be working. + results_sender: Channel for sending results of power distribution. + """ + self._results_sender = results_sender + self._ev_charger_ids = self._get_ev_charger_ids() + self._evc_states = EvcStates() + self._voltage_cache: LatestValueCache[Sample3Phase[Voltage]] = LatestValueCache( + microgrid.voltage().new_receiver() + ) + self._config = EVDistributionConfig(component_ids=self._ev_charger_ids) + self._component_pool_status_tracker = ComponentPoolStatusTracker( + component_ids=self._ev_charger_ids, + component_status_sender=component_pool_status_sender, + max_data_age=timedelta(seconds=10.0), + max_blocking_duration=timedelta(seconds=30.0), + component_status_tracker_type=EVChargerStatusTracker, + ) + self._target_power = Power.zero() + self._target_power_channel = Broadcast[Request](name="target_power") + self._target_power_tx = self._target_power_channel.new_sender() + self._task: asyncio.Task[None] | None = None + self._latest_request: Request = Request(Power.zero(), set()) + + @override + def component_ids(self) -> collections.abc.Set[int]: + """Return the set of ev charger ids.""" + return self._ev_charger_ids + + @override + async def start(self) -> None: + """Start the ev charger data manager.""" + self._task = asyncio.create_task(self._run_forever()) + + @override + async def distribute_power(self, request: Request) -> None: + """Distribute the requested power to the ev chargers. + + Args: + request: Request to get the distribution for. + """ + await self._target_power_tx.send(request) + + @override + async def stop(self) -> None: + """Stop the ev charger manager.""" + await self._component_pool_status_tracker.stop() + + def _get_ev_charger_ids(self) -> collections.abc.Set[int]: + """Return the IDs of all EV chargers present in the component graph.""" + return { + evc.component_id + for evc in microgrid.connection_manager.get().component_graph.components( + component_categories={ComponentCategory.EV_CHARGER} + ) + } + + def _allocate_new_ev(self, component_id: int) -> dict[int, Power]: + """Allocate power to a newly connected EV charger. + + Args: + component_id: ID of the EV charger to allocate power to. + + Returns: + A dictionary containing updated power allocations for the EV chargers. + """ + available_power = ( + self._target_power - self._evc_states.get_total_allocated_power() + ) + voltage = self._voltage_cache.get().min() + if voltage is None: + _logger.warning( + "Voltage data is not available. Cannot allocate power to EV charger %s", + component_id, + ) + return {} + initial_power = voltage * self._config.initial_current * 3.0 + if available_power > initial_power: + return {component_id: initial_power} + + min_power = voltage * self._config.min_current * 3.0 + if available_power > min_power: + return {component_id: min_power} + + return {} + + def _act_on_new_data(self, ev_data: EVChargerData) -> dict[int, Power]: + """Act on new data from an EV charger. + + Args: + ev_data: New data from the EV charger. + + Returns: + A dictionary containing updated power allocations for the EV chargers. + """ + component_id = ev_data.component_id + ev_connected = ev_data.is_ev_connected() + ev_previously_connected = self._evc_states.get( + component_id + ).last_data.is_ev_connected() + + # if EV is just connected, try to set config.initial_current, throttle other + # EVs if necessary + ev_newly_connected = ev_connected and not ev_previously_connected + if ev_newly_connected: + _logger.info("New EV connected to EV charger %s", component_id) + return self._allocate_new_ev(component_id) + + # if EV is disconnected, set limit to 0.0. redistribution to other EVs will + # happen separately, when possible. + if not ev_connected: + if ev_previously_connected: + _logger.info("EV disconnected from EV charger %s", component_id) + if self._evc_states.get(component_id).last_allocation > Power.zero(): + return {component_id: Power.zero()} + + # else if last throttling was less than 'increase_power_interval', do nothing. + now = datetime.now(tz=timezone.utc) + last_throttling_time = self._evc_states.get(component_id).last_reallocation_time + if last_throttling_time is not None: + dur = now - last_throttling_time + if dur < self._config.increase_power_interval: + return {} + + # if ev's target power was previously set to zero, treat it like it is newly + # connected + evc = self._evc_states.get(component_id) + if is_close_to_zero(evc.last_allocation.as_watts()): + return self._allocate_new_ev(component_id) + + # if the ev charger is already allocated the max power, do nothing + allottable_power = Power.from_watts( + evc.last_data.active_power_inclusion_upper_bound + - evc.last_allocation.as_watts() + ) + available_power = ( + self._target_power - self._evc_states.get_total_allocated_power() + ) + allottable_power = min(allottable_power, available_power) + + if ( + is_close_to_zero(allottable_power.as_watts()) + or allottable_power < Power.zero() + ): + return {} + + target_power = min( + evc.last_allocation + allottable_power, + Power.from_watts(evc.last_data.active_power_inclusion_upper_bound), + ) + _logger.debug( + "Increasing power to EV charger %s from %s to %s", + component_id, + evc.last_allocation, + target_power, + ) + return {component_id: target_power} + + async def _run_forever(self) -> None: + """Run the EV charger manager forever.""" + while True: + try: + await self._run() + except Exception: # pylint: disable=broad-except + _logger.exception("Recovering from an error in EV charger manager.") + await asyncio.sleep(1.0) + + async def _run(self) -> None: # pylint: disable=too-many-locals + """Run the main event loop of the EV charger manager.""" + api = microgrid.connection_manager.get().api_client + ev_charger_data_rx = merge( + *[await api.ev_charger_data(evc_id) for evc_id in self._ev_charger_ids] + ) + target_power_rx = self._target_power_channel.new_receiver() + api_request_timeout = _DEFAULT_API_REQUEST_TIMEOUT + resend_timer = Timer(_TGT_POWER_RESEND_INTERVAL, SkipMissedAndDrift()) + latest_target_powers: dict[int, Power] = {} + async for selected in select(ev_charger_data_rx, target_power_rx, resend_timer): + resending = False + target_power_changes = {} + now = datetime.now(tz=timezone.utc) + + if selected_from(selected, ev_charger_data_rx): + evc_data = selected.message + # If a new ev charger is added, add it to the state tracker, with + # now as the last reallocation time and last charging time. + # + # This means it won't be assigned any power until the reallocation + # duration has passed. + if evc_data.component_id not in self._evc_states: + self._evc_states.add_evc( + EvcState( + component_id=evc_data.component_id, + last_data=evc_data, + power=Power.zero(), + last_allocation=Power.zero(), + last_reallocation_time=now, + last_charging_time=now, + ) + ) + target_power_changes = {evc_data.component_id: Power.zero()} + + # See if the ev charger has room for more power, and if the last + # allocation was not in the last reallocation duration. + else: + target_power_changes = self._act_on_new_data(evc_data) + self._evc_states.get(evc_data.component_id).update_state(evc_data) + + elif selected_from(selected, target_power_rx): + self._latest_request = selected.message + api_request_timeout = selected.message.request_timeout + self._target_power = selected.message.power + _logger.debug("New target power: %s", self._target_power) + used_power = self._evc_states.get_ev_total_used_power() + allocated_power = self._evc_states.get_total_allocated_power() + if self._target_power < used_power: + diff_power = used_power - self._target_power + target_power_changes = self._throttle_ev_chargers(diff_power) + _logger.warning( + "Throttling EV chargers by %s-%s=%s: %s", + used_power, + self._target_power, + diff_power, + target_power_changes, + ) + elif self._target_power < allocated_power: + diff_power = allocated_power - self._target_power + target_power_changes = self._deallocate_unused_power(diff_power) + + elif selected_from(selected, resend_timer): + target_power_changes = latest_target_powers + resending = True + + if target_power_changes: + _logger.debug("Setting power to EV chargers: %s", target_power_changes) + else: + continue + if not resending: + for component_id, power in target_power_changes.items(): + self._evc_states.get(component_id).update_last_allocation( + power, now + ) + + latest_target_powers.update(target_power_changes) + result = await self._set_api_power( + api, target_power_changes, api_request_timeout + ) + await self._results_sender.send(result) + + async def _set_api_power( + self, + api: ApiClient, + target_power_changes: dict[int, Power], + api_request_timeout: timedelta, + ) -> Result: + """Send the EV charger power changes to the microgrid API. + + Args: + api: The microgrid API client to use for setting the power. + target_power_changes: A dictionary containing the new power allocations for + the EV chargers. + api_request_timeout: The timeout for the API request. + + Returns: + Power distribution result, corresponding to the result of the API + request. + """ + tasks: dict[int, asyncio.Task[None]] = {} + for component_id, power in target_power_changes.items(): + tasks[component_id] = asyncio.create_task( + api.set_power(component_id, power.as_watts()) + ) + _, pending = await asyncio.wait( + tasks.values(), + timeout=api_request_timeout.total_seconds(), + return_when=asyncio.ALL_COMPLETED, + ) + for task in pending: + task.cancel() + await asyncio.gather(*pending, return_exceptions=True) + + failed_components: set[int] = set() + succeeded_components: set[int] = set() + failed_power = Power.zero() + for component_id, task in tasks.items(): + exc = task.exception() + if exc is not None: + failed_components.add(component_id) + failed_power += target_power_changes[component_id] + else: + succeeded_components.add(component_id) + + match task.exception(): + case asyncio.CancelledError: + _logger.warning( + "Timeout while setting power to EV charger %s", component_id + ) + case grpc.aio.AioRpcError as err: + _logger.warning( + "Error while setting power to EV charger %s: %s", + component_id, + err, + ) + if failed_components: + return PartialFailure( + failed_components=failed_components, + succeeded_components=succeeded_components, + failed_power=failed_power, + succeeded_power=self._target_power - failed_power, + excess_power=Power.zero(), + request=self._latest_request, + ) + return Success( + succeeded_components=succeeded_components, + succeeded_power=self._target_power, + excess_power=Power.zero(), + request=self._latest_request, + ) + + def _deallocate_unused_power(self, to_deallocate: Power) -> dict[int, Power]: + """Reduce the power allocated to the EV chargers to meet the target power. + + This prioritizes reducing power to EV chargers that aren't consuming the + allocated power. + + Args: + to_deallocate: The amount of power to reduce the total allocated power by. + + Returns: + A list of new (reduced) charging current limits for a subset of ev + chargers, required to bring down the consumption by the given value. + """ + voltage = self._voltage_cache.get().min() + if voltage is None: + _logger.warning( + "Voltage data is unavailable. Can't deallocate power from EV chargers", + ) + return {} + min_power = voltage * self._config.min_current * 3.0 + + evc_list = list(self._evc_states.values()) + evc_list.sort(key=lambda st: st.last_allocation - st.power, reverse=True) + + deallocated_power = Power.zero() + target_power_changes = {} + + for evc in evc_list: + if deallocated_power >= to_deallocate: + break + ev_to_deallocate = evc.last_allocation - evc.power + if ev_to_deallocate <= Power.zero(): + continue + if ev_to_deallocate >= to_deallocate - deallocated_power: + ev_to_deallocate = to_deallocate - deallocated_power + tgt_power = evc.last_allocation - ev_to_deallocate + if tgt_power < min_power: + tgt_power = Power.zero() + deallocated_power += evc.last_allocation - tgt_power + target_power_changes[evc.component_id] = tgt_power + return target_power_changes + + def _throttle_ev_chargers( # pylint: disable=too-many-locals + self, + throttle_by: Power, + ) -> dict[int, Power]: + """Reduce EV charging power to meet the target power. + + This targets EV chargers that are currently consuming the most. + + Level 1 throttling is done by reducing the power to the minimum current required + to charge the EV. When the consumption is still above the target power, level 2 + throttling is done by reducing the power to 0. + + Args: + throttle_by: The amount of power to reduce the total EV charging power by. + + Returns: + A list of new (reduced) charging current limits for a subset of ev + chargers, required to bring down the consumption by the given value. + """ + min_power = Power.zero() + voltage = self._voltage_cache.get().min() + if voltage is None: + _logger.warning( + "Voltage data is not available. Cannot perform level 1 throttling.", + ) + else: + min_power = voltage * self._config.min_current * 3.0 + + evc_list = list(self._evc_states.values()) + evc_list.sort(key=lambda st: (st.power, st.last_allocation), reverse=True) + + level1_throttling_count = 0 + level1_amps_achieved = Power.zero() + + level2_throttling_count = 0 + level2_amps_achieved = Power.zero() + + for evc in evc_list: + evc_power = evc.power + evc_level1_power = Power.zero() + if evc_power > min_power: + evc_level1_power = evc_power - min_power + + if evc_power == Power.zero(): + evc_power = evc.last_allocation + + if evc_power == Power.zero(): + break + + if level1_amps_achieved < throttle_by: + level1_amps_achieved += evc_level1_power + level1_throttling_count += 1 + else: + break + if level2_amps_achieved < throttle_by: + level2_amps_achieved += evc_power + level2_throttling_count += 1 + + if level1_amps_achieved >= throttle_by: + throttled_powers = { + evc.component_id: min_power + for evc in evc_list[:level1_throttling_count] + } + else: + throttled_powers = { + evc.component_id: Power.zero() + for evc in evc_list[:level2_throttling_count] + } + _logger.debug("Throttling: %s", throttled_powers) + return throttled_powers diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_states.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_states.py new file mode 100644 index 000000000..c698bbaa2 --- /dev/null +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_states.py @@ -0,0 +1,132 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Power distribution state tracking for ev chargers.""" + + +from dataclasses import dataclass +from datetime import datetime +from typing import Iterable + +from frequenz.client.microgrid import EVChargerData + +from .....timeseries import Power + + +@dataclass +class EvcState: + """A class for tracking state of an ev charger.""" + + component_id: int + """The component id of the ev charger.""" + + last_data: EVChargerData + """The last data received from the EV charger.""" + + power: Power + """The power currently used by the EV charger.""" + + last_allocation: Power + """The last allocation made for the EV.""" + + last_reallocation_time: datetime + """The last time the ev charger was allocated power. + + Used to make sure we don't allocate power to the ev charger too often. + """ + + last_charging_time: datetime + """The last time the ev charger was charging. + + Used to de-allocate power from the ev charger if it has not been charging + for a while. + """ + + def update_last_allocation(self, allocation: Power, alloc_time: datetime) -> None: + """Update the last allocation and related timestamps. + + Args: + allocation: The most allocation allocation made for the EV. + alloc_time: The time at which the allocation was made. + """ + self.last_allocation = allocation + self.last_reallocation_time = alloc_time + self.last_charging_time = alloc_time + + def update_state( + self, + latest_ev_data: EVChargerData, + ) -> None: + """Update EvcState from component data. + + Args: + latest_ev_data: latest ev data from component data stream. + """ + self.power = Power.from_watts(latest_ev_data.active_power) + self.last_data = latest_ev_data + + if self.power > Power.zero(): + self.last_charging_time = latest_ev_data.timestamp + + +class EvcStates: + """Tracks states of all ev chargers.""" + + _states: dict[int, EvcState] + + def __init__(self) -> None: + """Initialize this instance.""" + self._states = {} + + def get_ev_total_used_power(self) -> Power: + """Return the total power consumed by all EV Chargers.""" + total_used = Power.zero() + for evc in self._states.values(): + total_used += evc.power + return total_used + + def get_total_allocated_power(self) -> Power: + """Return the total power allocated to all EV Chargers.""" + total_allocated = Power.zero() + for evc in self._states.values(): + total_allocated += evc.last_allocation + return total_allocated + + def get(self, component_id: int) -> EvcState: + """Return a reference to the EvcState object with the given component_id. + + Args: + component_id: identifies the object to retrieve. + + Returns: + The EvcState object with the given component_id. + """ + return self._states[component_id] + + def add_evc(self, state: EvcState) -> None: + """Add the given EvcState object to the list. + + Args: + state: The EvcState object to add to the list. + """ + self._states[state.component_id] = state + + def values(self) -> Iterable[EvcState]: + """Return an iterator over all EvcState objects. + + Returns: + An iterator over all EvcState objects. + """ + return self._states.values() + + def __contains__(self, component_id: int) -> bool: + """Check if the given component_id has an associated EvcState object. + + Args: + component_id: The component id to test. + + Returns: + Boolean indicating whether the given component_id is a known + EvCharger. + """ + return component_id in self._states diff --git a/src/frequenz/sdk/actor/power_distributing/_component_status/__init__.py b/src/frequenz/sdk/actor/power_distributing/_component_status/__init__.py index 8fb33415c..929cbc726 100644 --- a/src/frequenz/sdk/actor/power_distributing/_component_status/__init__.py +++ b/src/frequenz/sdk/actor/power_distributing/_component_status/__init__.py @@ -11,6 +11,7 @@ ComponentStatusTracker, SetPowerResult, ) +from ._ev_charger_status_tracker import EVChargerStatusTracker __all__ = [ "BatteryStatusTracker", @@ -18,5 +19,6 @@ "ComponentStatus", "ComponentStatusEnum", "ComponentStatusTracker", + "EVChargerStatusTracker", "SetPowerResult", ] diff --git a/src/frequenz/sdk/actor/power_distributing/_component_status/_ev_charger_status_tracker.py b/src/frequenz/sdk/actor/power_distributing/_component_status/_ev_charger_status_tracker.py new file mode 100644 index 000000000..a53575d83 --- /dev/null +++ b/src/frequenz/sdk/actor/power_distributing/_component_status/_ev_charger_status_tracker.py @@ -0,0 +1,204 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Background service that tracks the status of an EV charger.""" + + +import asyncio +import logging +from datetime import datetime, timedelta, timezone + +from frequenz.channels import Receiver, Sender, select, selected_from +from frequenz.channels.timer import SkipMissedAndDrift, Timer +from frequenz.client.microgrid import ( + EVChargerCableState, + EVChargerComponentState, + EVChargerData, +) +from typing_extensions import override + +from frequenz.sdk.microgrid import connection_manager + +from ..._background_service import BackgroundService +from ._blocking_status import BlockingStatus +from ._component_status import ( + ComponentStatus, + ComponentStatusEnum, + ComponentStatusTracker, + SetPowerResult, +) + +_logger = logging.getLogger(__name__) + + +class EVChargerStatusTracker(ComponentStatusTracker, BackgroundService): + """Status tracker for EV chargers. + + It reports an EV charger as `WORKING` when an EV is connected to it, + and power can be allocated to it, and `NOT_WORKING` otherwise. + + If it receives a power assignment failure from the PowerDistributor, + when the component is expected to be `WORKING`, it is marked as + `UNCERTAIN` for a specific interval, before being marked `WORKING` + again. + """ + + @override + def __init__( # pylint: disable=too-many-arguments + self, + component_id: int, + max_data_age: timedelta, + max_blocking_duration: timedelta, + status_sender: Sender[ComponentStatus], + set_power_result_receiver: Receiver[SetPowerResult], + ) -> None: + """Initialize this instance. + + Args: + component_id: ID of the EV charger to monitor the status of. + max_data_age: max duration to wait for, before marking a component as + NOT_WORKING, unless new data arrives. + max_blocking_duration: duration for which the component status should be + UNCERTAIN if a request to the component failed unexpectedly. + status_sender: Channel sender to send status updates to. + set_power_result_receiver: Receiver to fetch PowerDistributor responses + from, to get the status of the most recent request made for an EV + Charger. + """ + self._component_id = component_id + self._max_data_age = max_data_age + self._status_sender = status_sender + self._set_power_result_receiver = set_power_result_receiver + + self._last_status = ComponentStatusEnum.NOT_WORKING + self._blocking_status = BlockingStatus( + min_duration=timedelta(seconds=1.0), + max_duration=max_blocking_duration, + ) + + BackgroundService.__init__(self, name=f"EVChargerStatusTracker({component_id})") + + @override + def start(self) -> None: + """Start the status tracker.""" + self._tasks.add(asyncio.create_task(self._run_forever())) + + def _is_working(self, ev_data: EVChargerData) -> bool: + """Return whether the given data indicates that the component is working.""" + return ev_data.cable_state in ( + EVChargerCableState.EV_PLUGGED, + EVChargerCableState.EV_LOCKED, + ) and ev_data.component_state in ( + EVChargerComponentState.READY, + EVChargerComponentState.CHARGING, + EVChargerComponentState.DISCHARGING, + ) + + def _is_stale(self, ev_data: EVChargerData) -> bool: + """Return whether the given data is stale.""" + now = datetime.now(tz=timezone.utc) + stale = now - ev_data.timestamp > self._max_data_age + return stale + + async def _run_forever(self) -> None: + """Run the status tracker forever.""" + while True: + try: + await self._run() + except Exception: # pylint: disable=broad-except + _logger.exception( + "Restarting after exception in EVChargerStatusTracker.run()" + ) + await asyncio.sleep(1.0) + + def _handle_ev_data(self, ev_data: EVChargerData) -> ComponentStatusEnum: + """Handle new EV charger data.""" + if self._is_stale(ev_data): + if self._last_status == ComponentStatusEnum.WORKING: + _logger.warning( + "EV charger %s data is stale. Last timestamp: %s", + self._component_id, + ev_data.timestamp, + ) + return ComponentStatusEnum.NOT_WORKING + + if self._is_working(ev_data): + if self._last_status == ComponentStatusEnum.NOT_WORKING: + _logger.warning( + "EV charger %s is in WORKING state.", + self._component_id, + ) + return ComponentStatusEnum.WORKING + + if self._last_status == ComponentStatusEnum.WORKING: + _logger.warning( + "EV charger %s is in NOT_WORKING state. " + "Cable state: %s, component state: %s", + self._component_id, + ev_data.cable_state, + ev_data.component_state, + ) + return ComponentStatusEnum.NOT_WORKING + + def _handle_set_power_result( + self, set_power_result: SetPowerResult + ) -> ComponentStatusEnum: + """Handle a new set power result.""" + if self._component_id in set_power_result.succeeded: + return ComponentStatusEnum.WORKING + + self._blocking_status.block() + if self._last_status == ComponentStatusEnum.WORKING: + _logger.warning( + "EV charger %s is in UNCERTAIN state. Set power result: %s", + self._component_id, + set_power_result, + ) + return ComponentStatusEnum.UNCERTAIN + + async def _run(self) -> None: + """Run the status tracker.""" + api_client = connection_manager.get().api_client + ev_data_rx = await api_client.ev_charger_data(self._component_id) + set_power_result_rx = self._set_power_result_receiver + missing_data_timer = Timer(self._max_data_age, SkipMissedAndDrift()) + + # Send initial status + await self._status_sender.send( + ComponentStatus(self._component_id, self._last_status) + ) + + async for selected in select( + ev_data_rx, set_power_result_rx, missing_data_timer + ): + new_status = ComponentStatusEnum.NOT_WORKING + if selected_from(selected, ev_data_rx): + missing_data_timer.reset() + new_status = self._handle_ev_data(selected.message) + elif selected_from(selected, set_power_result_rx): + new_status = self._handle_set_power_result(selected.message) + elif selected_from(selected, missing_data_timer): + _logger.warning( + "No EV charger %s data received for %s. Setting status to NOT_WORKING.", + self._component_id, + self._max_data_age, + ) + + # Send status update if status changed + if ( + self._blocking_status.is_blocked() + and new_status != ComponentStatusEnum.NOT_WORKING + ): + new_status = ComponentStatusEnum.UNCERTAIN + + if new_status != self._last_status: + _logger.info( + "EV charger %s status changed from %s to %s", + self._component_id, + self._last_status, + new_status, + ) + self._last_status = new_status + await self._status_sender.send( + ComponentStatus(self._component_id, new_status) + ) diff --git a/src/frequenz/sdk/actor/power_distributing/power_distributing.py b/src/frequenz/sdk/actor/power_distributing/power_distributing.py index 4ac523535..6af02716c 100644 --- a/src/frequenz/sdk/actor/power_distributing/power_distributing.py +++ b/src/frequenz/sdk/actor/power_distributing/power_distributing.py @@ -18,7 +18,7 @@ from frequenz.client.microgrid import ComponentCategory from ...actor._actor import Actor -from ._component_managers import BatteryManager, ComponentManager +from ._component_managers import BatteryManager, ComponentManager, EVChargerManager from ._component_status import ComponentPoolStatus from .request import Request from .result import Result @@ -56,7 +56,7 @@ def __init__( # pylint: disable=too-many-arguments requests_receiver: Receiver[Request], results_sender: Sender[Result], component_pool_status_sender: Sender[ComponentPoolStatus], - wait_for_data_sec: float = 2, + wait_for_data_sec: float, *, name: str | None = None, ) -> None: @@ -86,7 +86,13 @@ def __init__( # pylint: disable=too-many-arguments self._component_manager: ComponentManager if component_category == ComponentCategory.BATTERY: - self._component_manager = BatteryManager(component_pool_status_sender) + self._component_manager = BatteryManager( + component_pool_status_sender, results_sender + ) + elif component_category == ComponentCategory.EV_CHARGER: + self._component_manager = EVChargerManager( + component_pool_status_sender, results_sender + ) else: raise ValueError( f"PowerDistributor doesn't support controlling: {component_category}" @@ -107,8 +113,7 @@ async def _run(self) -> None: # pylint: disable=too-many-locals await asyncio.sleep(self._wait_for_data_sec) async for request in self._requests_receiver: - result = await self._component_manager.distribute_power(request) - await self._result_sender.send(result) + await self._component_manager.distribute_power(request) async def stop(self, msg: str | None = None) -> None: """Stop this actor. diff --git a/src/frequenz/sdk/microgrid/_data_pipeline.py b/src/frequenz/sdk/microgrid/_data_pipeline.py index 35e34c8c8..ab46fb64e 100644 --- a/src/frequenz/sdk/microgrid/_data_pipeline.py +++ b/src/frequenz/sdk/microgrid/_data_pipeline.py @@ -38,6 +38,9 @@ ) from ..timeseries.consumer import Consumer from ..timeseries.ev_charger_pool import EVChargerPool + from ..timeseries.ev_charger_pool._ev_charger_pool_reference_store import ( + EVChargerPoolReferenceStore, + ) from ..timeseries.logical_meter import LogicalMeter from ..timeseries.producer import Producer @@ -92,12 +95,15 @@ def __init__( self._battery_power_wrapper = PowerWrapper( ComponentCategory.BATTERY, self._channel_registry ) + self._ev_power_wrapper = PowerWrapper( + ComponentCategory.EV_CHARGER, self._channel_registry + ) self._logical_meter: LogicalMeter | None = None self._consumer: Consumer | None = None self._producer: Producer | None = None self._grid: Grid | None = None - self._ev_charger_pools: dict[frozenset[int], EVChargerPool] = {} + self._ev_charger_pools: dict[frozenset[int], EVChargerPoolReferenceStore] = {} self._battery_pools: dict[frozenset[int], BatteryPoolReferenceStore] = {} self._frequency_instance: GridFrequency | None = None self._voltage_instance: VoltageStreamer | None = None @@ -158,6 +164,8 @@ def producer(self) -> Producer: def ev_charger_pool( self, ev_charger_ids: abc.Set[int] | None = None, + name: str | None = None, + priority: int = -sys.maxsize - 1, ) -> EVChargerPool: """Return the corresponding EVChargerPool instance for the given ids. @@ -167,11 +175,20 @@ def ev_charger_pool( Args: ev_charger_ids: Optional set of IDs of EV Chargers to be managed by the EVChargerPool. + name: An optional name used to identify this instance of the pool or a + corresponding actor in the logs. + priority: The priority of the actor making the call. Returns: An EVChargerPool instance. """ from ..timeseries.ev_charger_pool import EVChargerPool + from ..timeseries.ev_charger_pool._ev_charger_pool_reference_store import ( + EVChargerPoolReferenceStore, + ) + + if not self._ev_power_wrapper.started: + self._ev_power_wrapper.start() # We use frozenset to make a hashable key from the input set. key: frozenset[int] = frozenset() @@ -179,12 +196,21 @@ def ev_charger_pool( key = frozenset(ev_charger_ids) if key not in self._ev_charger_pools: - self._ev_charger_pools[key] = EVChargerPool( + self._ev_charger_pools[key] = EVChargerPoolReferenceStore( channel_registry=self._channel_registry, resampler_subscription_sender=self._resampling_request_sender(), + status_receiver=self._ev_power_wrapper.status_channel.new_receiver( + limit=1 + ), + power_manager_requests_sender=( + self._ev_power_wrapper.proposal_channel.new_sender() + ), + power_manager_bounds_subs_sender=( + self._ev_power_wrapper.bounds_subscription_channel.new_sender() + ), component_ids=ev_charger_ids, ) - return self._ev_charger_pools[key] + return EVChargerPool(self._ev_charger_pools[key], name, priority) def grid(self) -> Grid: """Return the grid measuring point.""" @@ -353,21 +379,32 @@ def producer() -> Producer: return _get().producer() -def ev_charger_pool(ev_charger_ids: abc.Set[int] | None = None) -> EVChargerPool: - """Return the corresponding EVChargerPool instance for the given ids. +def ev_charger_pool( + ev_charger_ids: abc.Set[int] | None = None, + name: str | None = None, + priority: int = -sys.maxsize - 1, +) -> EVChargerPool: + """Return a new `EVChargerPool` instance for the given parameters. + + The priority value is used to resolve conflicts when multiple actors are trying to + propose different power values for the same set of EV chargers. - If an EVChargerPool instance for the given ids doesn't exist, a new one is - created and returned. + !!! note + When specifying priority, bigger values indicate higher priority. The default + priority is the lowest possible value. Args: ev_charger_ids: Optional set of IDs of EV Chargers to be managed by the EVChargerPool. If not specified, all EV Chargers available in the component graph are used. + name: An optional name used to identify this instance of the pool or a + corresponding actor in the logs. + priority: The priority of the actor making the call. Returns: - An EVChargerPool instance. + An `EVChargerPool` instance. """ - return _get().ev_charger_pool(ev_charger_ids) + return _get().ev_charger_pool(ev_charger_ids, name, priority) def battery_pool( diff --git a/src/frequenz/sdk/microgrid/_power_wrapper.py b/src/frequenz/sdk/microgrid/_power_wrapper.py index 999e6bbf9..d70fd351a 100644 --- a/src/frequenz/sdk/microgrid/_power_wrapper.py +++ b/src/frequenz/sdk/microgrid/_power_wrapper.py @@ -30,6 +30,8 @@ _logger = logging.getLogger(__name__) +_POWER_DISTRIBUTING_ACTOR_WAIT_FOR_DATA_SEC = 2.0 + class PowerWrapper: """Wrapper around the power managing and power distributing actors.""" @@ -66,6 +68,7 @@ def __init__( self._power_distributing_actor: PowerDistributingActor | None = None self._power_managing_actor: _power_managing.PowerManagingActor | None = None + self._pd_wait_for_data_sec: float = _POWER_DISTRIBUTING_ACTOR_WAIT_FOR_DATA_SEC def _start_power_managing_actor(self) -> None: """Start the power managing actor if it is not already running.""" @@ -134,6 +137,7 @@ def _start_power_distributing_actor(self) -> None: requests_receiver=self._power_distribution_requests_channel.new_receiver(), results_sender=self._power_distribution_results_channel.new_sender(), component_pool_status_sender=self.status_channel.new_sender(), + wait_for_data_sec=self._pd_wait_for_data_sec, ) self._power_distributing_actor.start() diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/__init__.py b/src/frequenz/sdk/timeseries/ev_charger_pool/__init__.py index 1adc8a1eb..999538785 100644 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/__init__.py +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/__init__.py @@ -4,10 +4,10 @@ """Interactions with EV Chargers.""" from ._ev_charger_pool import EVChargerPool, EVChargerPoolError -from ._set_current_bounds import ComponentCurrentLimit +from ._result_types import EVChargerPoolReport __all__ = [ - "ComponentCurrentLimit", "EVChargerPool", "EVChargerPoolError", + "EVChargerPoolReport", ] diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py index 3112bcf2c..f909af661 100644 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py @@ -4,22 +4,25 @@ """Interactions with pools of EV Chargers.""" +import asyncio +import typing import uuid from collections import abc +from datetime import timedelta -from frequenz.channels import Sender -from frequenz.client.microgrid import ComponentCategory - -from ...actor import ChannelRegistry, ComponentMetricRequest -from ...microgrid import connection_manager +from ..._internal._channels import ReceiverFetcher +from ...actor import _power_managing +from ...timeseries import Bounds +from .._base_types import SystemBounds from .._quantities import Current, Power from ..formula_engine import FormulaEngine, FormulaEngine3Phase -from ..formula_engine._formula_engine_pool import FormulaEnginePool from ..formula_engine._formula_generators import ( EVChargerCurrentFormula, EVChargerPowerFormula, FormulaGeneratorConfig, ) +from ._ev_charger_pool_reference_store import EVChargerPoolReferenceStore +from ._result_types import EVChargerPoolReport class EVChargerPoolError(Exception): @@ -41,11 +44,11 @@ class EVChargerPool: measurements of the EV Chargers in the pool. """ - def __init__( + def __init__( # pylint: disable=too-many-arguments self, - channel_registry: ChannelRegistry, - resampler_subscription_sender: Sender[ComponentMetricRequest], - component_ids: abc.Set[int] | None = None, + ev_charger_pool_ref: EVChargerPoolReferenceStore, + name: str | None, + priority: int, ) -> None: """Create an `EVChargerPool` instance. @@ -55,34 +58,74 @@ def __init__( method for creating `EVChargerPool` instances. Args: - channel_registry: A channel registry instance shared with the resampling - actor. - resampler_subscription_sender: A sender for sending metric requests to the - resampling actor. - component_ids: An optional list of component_ids belonging to this pool. If - not specified, IDs of all EV Chargers in the microgrid will be fetched - from the component graph. + ev_charger_pool_ref: The EV charger pool reference store instance. + name: An optional name used to identify this instance of the pool or a + corresponding actor in the logs. + priority: The priority of the actor using this wrapper. """ - self._channel_registry: ChannelRegistry = channel_registry - self._resampler_subscription_sender: Sender[ComponentMetricRequest] = ( - resampler_subscription_sender - ) - self._component_ids: abc.Set[int] = set() - if component_ids is not None: - self._component_ids = component_ids - else: - graph = connection_manager.get().component_graph - self._component_ids = { - evc.component_id - for evc in graph.components( - component_categories={ComponentCategory.EV_CHARGER} - ) - } - self._namespace: str = f"ev-charger-pool-{uuid.uuid4()}" - self._formula_pool: FormulaEnginePool = FormulaEnginePool( - self._namespace, - self._channel_registry, - self._resampler_subscription_sender, + self._ev_charger_pool = ev_charger_pool_ref + unique_id = str(uuid.uuid4()) + self._source_id = unique_id if name is None else f"{name}-{unique_id}" + self._priority = priority + + async def propose_power( + self, + power: Power | None, + *, + request_timeout: timedelta = timedelta(seconds=5.0), + bounds: Bounds[Power | None] = Bounds(None, None), + ) -> None: + """Send a proposal to the power manager for the pool's set of EV chargers. + + This proposal is for the maximum power that can be set for the EV chargers in + the pool. The actual consumption might be lower based on the number of phases + an EV is drawing power from, and its current state of charge. + + Power values need to follow the Passive Sign Convention (PSC). That is, positive + values indicate charge power and negative values indicate discharge power. + Discharging from EV chargers is currently not supported. + + If the same EV chargers are shared by multiple actors, the power manager will + consider the priority of the actors, the bounds they set, and their preferred + power, when calculating the target power for the EV chargers + + The preferred power of lower priority actors will take precedence as long as + they respect the bounds set by higher priority actors. If lower priority actors + request power values outside of the bounds set by higher priority actors, the + target power will be the closest value to the preferred power that is within the + bounds. + + When there are no other actors trying to use the same EV chargers, the actor's + preferred power would be set as the target power, as long as it falls within the + system power bounds for the EV chargers. + + The result of the request can be accessed using the receiver returned from the + [`power_status`][frequenz.sdk.timeseries.ev_charger_pool.EVChargerPool.power_status] + method, which also streams the bounds that an actor should comply with, based on + its priority. + + Args: + power: The power to propose for the EV chargers in the pool. If `None`, + this proposal will not have any effect on the target power, unless + bounds are specified. If both are `None`, it is equivalent to not + having a proposal or withdrawing a previous one. + request_timeout: The timeout for the request. + bounds: The power bounds for the proposal. These bounds will apply to + actors with a lower priority, and can be overridden by bounds from + actors with a higher priority. If None, the power bounds will be set to + the maximum power of the batteries in the pool. This is currently and + experimental feature. + """ + await self._ev_charger_pool.power_manager_requests_sender.send( + _power_managing.Proposal( + source_id=self._source_id, + preferred_power=power, + bounds=bounds, + component_ids=self._ev_charger_pool.component_ids, + priority=self._priority, + creation_time=asyncio.get_running_loop().time(), + request_timeout=request_timeout, + ) ) @property @@ -92,7 +135,7 @@ def component_ids(self) -> abc.Set[int]: Returns: Set of managed component IDs. """ - return self._component_ids + return self._ev_charger_pool.component_ids @property def current(self) -> FormulaEngine3Phase[Current]: @@ -110,10 +153,14 @@ def current(self) -> FormulaEngine3Phase[Current]: A FormulaEngine that will calculate and stream the total current of all EV Chargers. """ - engine = self._formula_pool.from_3_phase_current_formula_generator( - "ev_charger_total_current", - EVChargerCurrentFormula, - FormulaGeneratorConfig(component_ids=self._component_ids), + engine = ( + self._ev_charger_pool.formula_pool.from_3_phase_current_formula_generator( + "ev_charger_total_current", + EVChargerCurrentFormula, + FormulaGeneratorConfig( + component_ids=self._ev_charger_pool.component_ids + ), + ) ) assert isinstance(engine, FormulaEngine3Phase) return engine @@ -134,16 +181,53 @@ def power(self) -> FormulaEngine[Power]: A FormulaEngine that will calculate and stream the total power of all EV Chargers. """ - engine = self._formula_pool.from_power_formula_generator( + engine = self._ev_charger_pool.formula_pool.from_power_formula_generator( "ev_charger_power", EVChargerPowerFormula, FormulaGeneratorConfig( - component_ids=self._component_ids, + component_ids=self._ev_charger_pool.component_ids, ), ) assert isinstance(engine, FormulaEngine) return engine + @property + def power_status(self) -> ReceiverFetcher[EVChargerPoolReport]: + """Get a receiver to receive new power status reports when they change. + + These include + - the current inclusion/exclusion bounds available for the pool's priority, + - the current target power for the pool's set of batteries, + - the result of the last distribution request for the pool's set of batteries. + + Returns: + A receiver that will stream power status reports for the pool's priority. + """ + sub = _power_managing.ReportRequest( + source_id=self._source_id, + priority=self._priority, + component_ids=self._ev_charger_pool.component_ids, + ) + self._ev_charger_pool.power_bounds_subs[sub.get_channel_name()] = ( + asyncio.create_task( + self._ev_charger_pool.power_manager_bounds_subs_sender.send(sub) + ) + ) + channel = self._ev_charger_pool.channel_registry.get_or_create( + _power_managing._Report, # pylint: disable=protected-access + sub.get_channel_name(), + ) + channel.resend_latest = True + + # More details on why the cast is needed here: + # https://github.com/frequenz-floss/frequenz-sdk-python/issues/823 + return typing.cast(ReceiverFetcher[EVChargerPoolReport], channel) + async def stop(self) -> None: """Stop all tasks and channels owned by the EVChargerPool.""" - await self._formula_pool.stop() + await self._ev_charger_pool.stop() + + @property + def _system_power_bounds(self) -> ReceiverFetcher[SystemBounds]: + """Return a receiver fetcher for the system power bounds.""" + return self._ev_charger_pool.bounds_channel diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool_reference_store.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool_reference_store.py new file mode 100644 index 000000000..8dec056de --- /dev/null +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool_reference_store.py @@ -0,0 +1,103 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Manages shared state/tasks for a set of EV chargers.""" + + +import asyncio +import uuid +from collections import abc + +from frequenz.channels import Broadcast, Receiver, Sender +from frequenz.client.microgrid import ComponentCategory + +from ...actor import ChannelRegistry, ComponentMetricRequest +from ...actor._power_managing._base_classes import Proposal, ReportRequest +from ...actor.power_distributing import ComponentPoolStatus +from ...microgrid import connection_manager +from .._base_types import SystemBounds +from ..formula_engine._formula_engine_pool import FormulaEnginePool +from ._system_bounds_tracker import EVCSystemBoundsTracker + + +class EVChargerPoolReferenceStore: + """A class for maintaining the shared state/tasks for a set of pool of EV chargers. + + This includes ownership of + - the formula engine pool and metric calculators. + - the tasks for calculating system bounds for the EV chargers. + + These are independent of the priority of the actors and can be shared between + multiple users of the same set of EV chargers. + + They are exposed through the EVChargerPool class. + """ + + def __init__( # pylint: disable=too-many-arguments + self, + channel_registry: ChannelRegistry, + resampler_subscription_sender: Sender[ComponentMetricRequest], + status_receiver: Receiver[ComponentPoolStatus], + power_manager_requests_sender: Sender[Proposal], + power_manager_bounds_subs_sender: Sender[ReportRequest], + component_ids: abc.Set[int] | None = None, + ): + """Create an instance of the class. + + Args: + channel_registry: A channel registry instance shared with the resampling + actor. + resampler_subscription_sender: A sender for sending metric requests to the + resampling actor. + status_receiver: A receiver that streams the status of the EV Chargers in + the pool. + power_manager_requests_sender: A Channel sender for sending power + requests to the power managing actor. + power_manager_bounds_subs_sender: A Channel sender for sending power bounds + subscription requests to the power managing actor. + component_ids: An optional list of component_ids belonging to this pool. If + not specified, IDs of all EV Chargers in the microgrid will be fetched + from the component graph. + """ + self.channel_registry = channel_registry + self.resampler_subscription_sender = resampler_subscription_sender + self.status_receiver = status_receiver + self.power_manager_requests_sender = power_manager_requests_sender + self.power_manager_bounds_subs_sender = power_manager_bounds_subs_sender + + if component_ids is not None: + self.component_ids: frozenset[int] = frozenset(component_ids) + else: + graph = connection_manager.get().component_graph + self.component_ids = frozenset( + { + evc.component_id + for evc in graph.components( + component_categories={ComponentCategory.EV_CHARGER} + ) + } + ) + + self.power_bounds_subs: dict[str, asyncio.Task[None]] = {} + + self.namespace: str = f"ev-charger-pool-{uuid.uuid4()}" + self.formula_pool = FormulaEnginePool( + self.namespace, + self.channel_registry, + self.resampler_subscription_sender, + ) + + self.bounds_channel: Broadcast[SystemBounds] = Broadcast( + name=f"System Bounds for EV Chargers: {component_ids}" + ) + self.bounds_tracker: EVCSystemBoundsTracker = EVCSystemBoundsTracker( + self.component_ids, + self.status_receiver, + self.bounds_channel.new_sender(), + ) + self.bounds_tracker.start() + + async def stop(self) -> None: + """Stop all tasks and channels owned by the EVChargerPool.""" + await self.formula_pool.stop() + await self.bounds_tracker.stop() diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_result_types.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_result_types.py new file mode 100644 index 000000000..11ab0ed1f --- /dev/null +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_result_types.py @@ -0,0 +1,31 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Types for exposing EV charger pool reports.""" + +import typing + +from ...actor import power_distributing +from .._base_types import Bounds +from .._quantities import Power + + +class EVChargerPoolReport(typing.Protocol): + """A status report for an EV chargers pool.""" + + target_power: Power | None + """The currently set power for the EV chargers.""" + + distribution_result: power_distributing.Result | None + """The result of the last power distribution. + + This is `None` if no power distribution has been performed yet. + """ + + @property + def bounds(self) -> Bounds[Power] | None: + """The usable bounds for the EV chargers. + + These bounds are adjusted to any restrictions placed by actors with higher + priorities. + """ diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_system_bounds_tracker.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_system_bounds_tracker.py new file mode 100644 index 000000000..4d18309ed --- /dev/null +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_system_bounds_tracker.py @@ -0,0 +1,152 @@ +# License: MIT +# Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +"""System bounds tracker for the EV chargers.""" + + +import asyncio +import logging +from collections import abc + +from frequenz.channels import Receiver, Sender, merge, select, selected_from +from frequenz.client.microgrid import EVChargerData + +from ...actor import BackgroundService +from ...actor.power_distributing._component_status import ComponentPoolStatus +from ...microgrid import connection_manager +from .. import Power +from .._base_types import Bounds, SystemBounds + +_logger = logging.getLogger(__name__) + + +class EVCSystemBoundsTracker(BackgroundService): + """Track the system bounds for the EV chargers. + + System bounds are the aggregate bounds for the EV chargers in the pool that are + working and have an EV attached to them. They are calculated from the individual + bounds received from the microgrid API. + + The system bounds are sent to the `bounds_sender` whenever they change. + """ + + def __init__( + self, + component_ids: abc.Set[int], + status_receiver: Receiver[ComponentPoolStatus], + bounds_sender: Sender[SystemBounds], + ): + """Initialize this instance. + + Args: + component_ids: The ids of the components to track. + status_receiver: A receiver that streams the status of the EV Chargers in + the pool. + bounds_sender: A sender to send the system bounds to. + """ + super().__init__() + + self._component_ids = component_ids + self._status_receiver = status_receiver + self._bounds_sender = bounds_sender + self._latest_component_data: dict[int, EVChargerData] = {} + self._last_sent_bounds: SystemBounds | None = None + self._component_pool_status = ComponentPoolStatus(set(), set()) + + def start(self) -> None: + """Start the EV charger system bounds tracker.""" + self._tasks.add(asyncio.create_task(self._run_forever())) + + async def _send_bounds(self) -> None: + """Calculate and send the aggregate system bounds if they have changed.""" + if not self._latest_component_data: + return + inclusion_bounds = Bounds( + lower=Power.from_watts( + sum( + data.active_power_inclusion_lower_bound + for data in self._latest_component_data.values() + ) + ), + upper=Power.from_watts( + sum( + data.active_power_inclusion_upper_bound + for data in self._latest_component_data.values() + ) + ), + ) + exclusion_bounds = Bounds( + lower=Power.from_watts( + sum( + data.active_power_exclusion_lower_bound + for data in self._latest_component_data.values() + ) + ), + upper=Power.from_watts( + sum( + data.active_power_exclusion_upper_bound + for data in self._latest_component_data.values() + ) + ), + ) + + if ( + self._last_sent_bounds is None + or inclusion_bounds != self._last_sent_bounds.inclusion_bounds + or exclusion_bounds != self._last_sent_bounds.exclusion_bounds + ): + self._last_sent_bounds = SystemBounds( + timestamp=max( + msg.timestamp for msg in self._latest_component_data.values() + ), + inclusion_bounds=inclusion_bounds, + exclusion_bounds=exclusion_bounds, + ) + await self._bounds_sender.send(self._last_sent_bounds) + + async def _run_forever(self) -> None: + """Run the status tracker forever.""" + while True: + try: + await self._run() + except Exception: # pylint: disable=broad-except + _logger.exception( + "Restarting after exception in EVChargerSystemBoundsTracker.run()" + ) + await asyncio.sleep(1.0) + + async def _run(self) -> None: + """Run the system bounds tracker.""" + api_client = connection_manager.get().api_client + status_rx = self._status_receiver + ev_data_rx = merge( + *( + await asyncio.gather( + *[api_client.ev_charger_data(cid) for cid in self._component_ids] + ) + ) + ) + + async for selected in select(status_rx, ev_data_rx): + if selected_from(selected, status_rx): + self._component_pool_status = selected.message + to_remove = [] + for comp_id in self._latest_component_data: + if ( + comp_id not in self._component_pool_status.working + and comp_id not in self._component_pool_status.uncertain + ): + to_remove.append(comp_id) + for comp_id in to_remove: + del self._latest_component_data[comp_id] + elif selected_from(selected, ev_data_rx): + data = selected.message + comp_id = data.component_id + if ( + comp_id not in self._component_pool_status.working + and comp_id not in self._component_pool_status.uncertain + ): + continue + self._latest_component_data[data.component_id] = data + + await self._send_bounds() diff --git a/tests/actor/power_distributing/_component_status/test_battery_status.py b/tests/actor/power_distributing/_component_status/test_battery_status.py index 609b9bbba..792df2a57 100644 --- a/tests/actor/power_distributing/_component_status/test_battery_status.py +++ b/tests/actor/power_distributing/_component_status/test_battery_status.py @@ -1,5 +1,6 @@ # License: MIT # Copyright © 2023 Frequenz Energy-as-a-Service GmbH + """Tests for BatteryStatusTracker.""" # pylint: disable=too-many-lines @@ -38,6 +39,7 @@ from tests.timeseries.mock_microgrid import MockMicrogrid from ....utils.component_data_wrapper import BatteryDataWrapper, InverterDataWrapper +from ....utils.receive_timeout import Timeout, receive_timeout def battery_data( # pylint: disable=too-many-arguments @@ -123,26 +125,6 @@ class Message(Generic[T]): INVERTER_ID = 8 -class _Timeout: - """Sentinel for timeout.""" - - -async def recv_timeout(recv: Receiver[T], timeout: float = 0.1) -> T | type[_Timeout]: - """Receive message from receiver with timeout. - - Args: - recv: Receiver to receive message from. - timeout: Timeout in seconds. - - Returns: - Received message or _Timeout if timeout is reached. - """ - try: - return await asyncio.wait_for(recv.receive(), timeout=timeout) - except asyncio.TimeoutError: - return _Timeout - - # pylint: disable=protected-access, unused-argument class TestBatteryStatus: """Tests BatteryStatusTracker.""" @@ -952,7 +934,7 @@ async def test_critical_error( # --- battery warning error (keeps working) --- await self._send_healthy_inverter(mock_microgrid) await self._send_warning_error_battery(mock_microgrid) - assert await recv_timeout(status_receiver, timeout=0.1) is _Timeout + assert await receive_timeout(status_receiver, timeout=0.1) is Timeout await self._send_healthy_battery(mock_microgrid) await self._send_healthy_inverter(mock_microgrid) @@ -971,7 +953,7 @@ async def test_critical_error( # --- inverter warning error (keeps working) --- await self._send_healthy_battery(mock_microgrid) await self._send_warning_error_inverter(mock_microgrid) - assert await recv_timeout(status_receiver, timeout=0.1) is _Timeout + assert await receive_timeout(status_receiver, timeout=0.1) is Timeout await self._send_healthy_battery(mock_microgrid) await self._send_healthy_inverter(mock_microgrid) @@ -1023,11 +1005,11 @@ async def test_stale_data( # --- stale battery data --- await self._send_healthy_inverter(mock_microgrid) await self._send_healthy_battery(mock_microgrid, timestamp) - assert await recv_timeout(status_receiver) is _Timeout + assert await receive_timeout(status_receiver) is Timeout await self._send_healthy_inverter(mock_microgrid) await self._send_healthy_battery(mock_microgrid, timestamp) - assert await recv_timeout(status_receiver, 0.3) == ComponentStatus( + assert await receive_timeout(status_receiver, 0.3) == ComponentStatus( BATTERY_ID, ComponentStatusEnum.NOT_WORKING ) @@ -1039,11 +1021,11 @@ async def test_stale_data( # --- stale inverter data --- await self._send_healthy_battery(mock_microgrid) await self._send_healthy_inverter(mock_microgrid, timestamp) - assert await recv_timeout(status_receiver) is _Timeout + assert await receive_timeout(status_receiver) is Timeout await self._send_healthy_battery(mock_microgrid) await self._send_healthy_inverter(mock_microgrid, timestamp) - assert await recv_timeout(status_receiver, 0.3) == ComponentStatus( + assert await receive_timeout(status_receiver, 0.3) == ComponentStatus( BATTERY_ID, ComponentStatusEnum.NOT_WORKING ) diff --git a/tests/actor/power_distributing/_component_status/test_ev_charger_status.py b/tests/actor/power_distributing/_component_status/test_ev_charger_status.py new file mode 100644 index 000000000..925f70169 --- /dev/null +++ b/tests/actor/power_distributing/_component_status/test_ev_charger_status.py @@ -0,0 +1,165 @@ +# License: MIT +# Copyright © 2023 Frequenz Energy-as-a-Service GmbH + +"""Tests for EVChargerStatusTracker.""" + +import asyncio +from datetime import datetime, timedelta, timezone + +from frequenz.channels import Broadcast +from frequenz.client.microgrid import EVChargerCableState, EVChargerComponentState +from pytest_mock import MockerFixture + +from frequenz.sdk._internal._asyncio import cancel_and_await +from frequenz.sdk.actor.power_distributing._component_status import ( + ComponentStatus, + ComponentStatusEnum, + EVChargerStatusTracker, + SetPowerResult, +) + +from ....timeseries.mock_microgrid import MockMicrogrid +from ....utils.component_data_wrapper import EvChargerDataWrapper +from ....utils.receive_timeout import Timeout, receive_timeout + +_EV_CHARGER_ID = 6 + + +class TestEVChargerStatusTracker: + """Tests for EVChargerStatusTracker.""" + + async def test_status_changes(self, mocker: MockerFixture) -> None: + """Test that the status changes as expected.""" + mock_microgrid = MockMicrogrid(grid_meter=True, mocker=mocker) + mock_microgrid.add_ev_chargers(3) + + status_channel = Broadcast[ComponentStatus](name="battery_status") + set_power_result_channel = Broadcast[SetPowerResult](name="set_power_result") + set_power_result_sender = set_power_result_channel.new_sender() + + async with ( + mock_microgrid, + EVChargerStatusTracker( + component_id=_EV_CHARGER_ID, + max_data_age=timedelta(seconds=0.2), + max_blocking_duration=timedelta(seconds=1), + status_sender=status_channel.new_sender(), + set_power_result_receiver=set_power_result_channel.new_receiver(), + ), + ): + status_receiver = status_channel.new_receiver() + # The status is initially not working. + assert ( + await status_receiver.receive() + ).value == ComponentStatusEnum.NOT_WORKING + + # When an EV is plugged, it is working + await mock_microgrid.mock_client.send( + EvChargerDataWrapper( + _EV_CHARGER_ID, + datetime.now(tz=timezone.utc), + active_power=0.0, + component_state=EVChargerComponentState.READY, + cable_state=EVChargerCableState.EV_PLUGGED, + ) + ) + assert await receive_timeout(status_receiver) == ComponentStatus( + _EV_CHARGER_ID, ComponentStatusEnum.WORKING + ) + + # When an EV is locked, no change in status + await mock_microgrid.mock_client.send( + EvChargerDataWrapper( + _EV_CHARGER_ID, + datetime.now(tz=timezone.utc), + active_power=0.0, + component_state=EVChargerComponentState.READY, + cable_state=EVChargerCableState.EV_LOCKED, + ) + ) + assert await receive_timeout(status_receiver) is Timeout + + # When an EV is unplugged, it is not working + await mock_microgrid.mock_client.send( + EvChargerDataWrapper( + _EV_CHARGER_ID, + datetime.now(tz=timezone.utc), + active_power=0.0, + component_state=EVChargerComponentState.READY, + cable_state=EVChargerCableState.UNPLUGGED, + ) + ) + assert await receive_timeout(status_receiver) == ComponentStatus( + _EV_CHARGER_ID, ComponentStatusEnum.NOT_WORKING + ) + + # Get it back to working again + await mock_microgrid.mock_client.send( + EvChargerDataWrapper( + _EV_CHARGER_ID, + datetime.now(tz=timezone.utc), + active_power=0.0, + component_state=EVChargerComponentState.READY, + cable_state=EVChargerCableState.EV_LOCKED, + ) + ) + assert await receive_timeout(status_receiver) == ComponentStatus( + _EV_CHARGER_ID, ComponentStatusEnum.WORKING + ) + + # When there's no new data, it should become not working + assert await receive_timeout(status_receiver, 0.1) is Timeout + assert await receive_timeout(status_receiver, 0.2) == ComponentStatus( + _EV_CHARGER_ID, ComponentStatusEnum.NOT_WORKING + ) + + # Get it back to working again + await asyncio.sleep(0.1) + await mock_microgrid.mock_client.send( + EvChargerDataWrapper( + _EV_CHARGER_ID, + datetime.now(tz=timezone.utc), + active_power=0.0, + component_state=EVChargerComponentState.READY, + cable_state=EVChargerCableState.EV_LOCKED, + ) + ) + assert await receive_timeout(status_receiver) == ComponentStatus( + _EV_CHARGER_ID, ComponentStatusEnum.WORKING + ) + + async def keep_sending_healthy_message() -> None: + while True: + await mock_microgrid.mock_client.send( + EvChargerDataWrapper( + _EV_CHARGER_ID, + datetime.now(tz=timezone.utc), + active_power=0.0, + component_state=EVChargerComponentState.READY, + cable_state=EVChargerCableState.EV_LOCKED, + ) + ) + await asyncio.sleep(0.1) + + _keep_sending_healthy_message_task = asyncio.create_task( + keep_sending_healthy_message() + ) + + # when there's a PowerDistributor failure for the component, status should + # become uncertain. + await set_power_result_sender.send( + SetPowerResult( + succeeded=set(), + failed={_EV_CHARGER_ID}, + ) + ) + assert await receive_timeout(status_receiver) == ComponentStatus( + _EV_CHARGER_ID, ComponentStatusEnum.UNCERTAIN + ) + + # After the blocking duration, it should become working again. + assert await receive_timeout(status_receiver) is Timeout + assert await receive_timeout(status_receiver, 1.0) == ComponentStatus( + _EV_CHARGER_ID, ComponentStatusEnum.WORKING + ) + await cancel_and_await(_keep_sending_healthy_message_task) diff --git a/tests/actor/power_distributing/test_power_distributing.py b/tests/actor/power_distributing/test_power_distributing.py index 5454fd9e6..498ebf8b4 100644 --- a/tests/actor/power_distributing/test_power_distributing.py +++ b/tests/actor/power_distributing/test_power_distributing.py @@ -111,6 +111,7 @@ async def test_constructor_with_grid_meter(self, mocker: MockerFixture) -> None: requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), + wait_for_data_sec=0.0, ) as distributor: assert isinstance(distributor._component_manager, BatteryManager) assert distributor._component_manager._bat_invs_map == { @@ -141,6 +142,7 @@ async def test_constructor_without_grid_meter(self, mocker: MockerFixture) -> No requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), + wait_for_data_sec=0.0, ) as distributor: assert isinstance(distributor._component_manager, BatteryManager) assert distributor._component_manager._bat_invs_map == { diff --git a/tests/microgrid/fixtures.py b/tests/microgrid/fixtures.py index 0ce222b85..83774ebcd 100644 --- a/tests/microgrid/fixtures.py +++ b/tests/microgrid/fixtures.py @@ -68,6 +68,12 @@ async def new( streamer, dp._battery_power_wrapper.status_channel.new_sender(), ) + if component_category == ComponentCategory.EV_CHARGER: + return cls( + mockgrid, + streamer, + dp._ev_power_wrapper.status_channel.new_sender(), + ) raise ValueError(f"Unsupported component category: {component_category}") async def stop(self) -> None: diff --git a/tests/timeseries/_ev_charger_pool/__init__.py b/tests/timeseries/_ev_charger_pool/__init__.py new file mode 100644 index 000000000..afb2abf32 --- /dev/null +++ b/tests/timeseries/_ev_charger_pool/__init__.py @@ -0,0 +1,4 @@ +# License: MIT +# Copyright © 2023 Frequenz Energy-as-a-Service GmbH + +"""Test the EV charger pool control methods.""" diff --git a/tests/timeseries/test_ev_charger_pool.py b/tests/timeseries/_ev_charger_pool/test_ev_charger_pool.py similarity index 100% rename from tests/timeseries/test_ev_charger_pool.py rename to tests/timeseries/_ev_charger_pool/test_ev_charger_pool.py diff --git a/tests/timeseries/_ev_charger_pool/test_ev_charger_pool_control_methods.py b/tests/timeseries/_ev_charger_pool/test_ev_charger_pool_control_methods.py new file mode 100644 index 000000000..63059c57d --- /dev/null +++ b/tests/timeseries/_ev_charger_pool/test_ev_charger_pool_control_methods.py @@ -0,0 +1,266 @@ +# License: MIT +# Copyright © 2023 Frequenz Energy-as-a-Service GmbH + +"""Test the EV charger pool control methods.""" + +import asyncio +import typing +from datetime import datetime, timedelta, timezone +from unittest.mock import AsyncMock, MagicMock + +import pytest +from frequenz.channels import Receiver +from frequenz.client.microgrid import EVChargerCableState, EVChargerComponentState +from pytest_mock import MockerFixture + +from frequenz.sdk import microgrid +from frequenz.sdk.actor import ResamplerConfig, power_distributing +from frequenz.sdk.actor.power_distributing import ( + ComponentPoolStatus, + PowerDistributingActor, +) +from frequenz.sdk.actor.power_distributing._component_managers import EVChargerManager +from frequenz.sdk.actor.power_distributing._component_managers._ev_charger_manager._config import ( + EVDistributionConfig, +) +from frequenz.sdk.actor.power_distributing._component_pool_status_tracker import ( + ComponentPoolStatusTracker, +) +from frequenz.sdk.microgrid._data_pipeline import _DataPipeline +from frequenz.sdk.timeseries import Power, Sample3Phase, Voltage +from frequenz.sdk.timeseries.ev_charger_pool import EVChargerPool, EVChargerPoolReport + +from ...microgrid.fixtures import _Mocks +from ...utils.component_data_streamer import MockComponentDataStreamer +from ...utils.component_data_wrapper import EvChargerDataWrapper, MeterDataWrapper +from ..mock_microgrid import MockMicrogrid + +# pylint: disable=protected-access + + +@pytest.fixture +async def mocks(mocker: MockerFixture) -> typing.AsyncIterator[_Mocks]: + """Create the mocks.""" + mockgrid = MockMicrogrid(grid_meter=True) + mockgrid.add_ev_chargers(4) + await mockgrid.start(mocker) + + # pylint: disable=protected-access + if microgrid._data_pipeline._DATA_PIPELINE is not None: + microgrid._data_pipeline._DATA_PIPELINE = None + await microgrid._data_pipeline.initialize( + ResamplerConfig(resampling_period=timedelta(seconds=0.1)) + ) + streamer = MockComponentDataStreamer(mockgrid.mock_client) + + dp = typing.cast(_DataPipeline, microgrid._data_pipeline._DATA_PIPELINE) + + yield _Mocks( + mockgrid, + streamer, + dp._ev_power_wrapper.status_channel.new_sender(), + ) + + +class TestEVChargerPoolControl: + """Test the EV charger pool control methods.""" + + async def _patch_ev_pool_status( + self, + mocks: _Mocks, + mocker: MockerFixture, + component_ids: list[int] | None = None, + ) -> None: + """Patch the EV charger pool status. + + If `component_ids` is not None, the mock will always return `component_ids`. + Otherwise, it will return the requested components. + """ + if component_ids: + mock = MagicMock(spec=ComponentPoolStatusTracker) + mock.get_working_components.return_value = component_ids + mocker.patch( + "frequenz.sdk.actor.power_distributing._component_managers" + "._ev_charger_manager._ev_charger_manager.ComponentPoolStatusTracker", + return_value=mock, + ) + else: + mock = MagicMock(spec=ComponentPoolStatusTracker) + mock.get_working_components.side_effect = set + mocker.patch( + "frequenz.sdk.actor.power_distributing._component_managers" + "._ev_charger_manager._ev_charger_manager.ComponentPoolStatusTracker", + return_value=mock, + ) + await mocks.component_status_sender.send( + ComponentPoolStatus(working=set(mocks.microgrid.evc_ids), uncertain=set()) + ) + + async def _patch_data_pipeline(self, mocker: MockerFixture) -> None: + mocker.patch( + "frequenz.sdk.microgrid._data_pipeline._DATA_PIPELINE._ev_power_wrapper" + "._pd_wait_for_data_sec", + 0.1, + ) + + async def _patch_power_distributing_actor( + self, + mocker: MockerFixture, + ) -> None: + dp = typing.cast(_DataPipeline, microgrid._data_pipeline._DATA_PIPELINE) + pda = typing.cast( + PowerDistributingActor, dp._ev_power_wrapper._power_distributing_actor + ) + cm = typing.cast( + EVChargerManager, + pda._component_manager, + ) + mocker.patch( + "frequenz.sdk.microgrid._data_pipeline._DATA_PIPELINE._ev_power_wrapper" + "._power_distributing_actor._component_manager._config", + EVDistributionConfig( + component_ids=cm._config.component_ids, + initial_current=cm._config.initial_current, + min_current=cm._config.min_current, + increase_power_interval=timedelta(seconds=0.12), + ), + ) + mocker.patch( + "frequenz.sdk.microgrid._data_pipeline._DATA_PIPELINE._ev_power_wrapper" + "._power_distributing_actor._component_manager._voltage_cache.get", + return_value=Sample3Phase( + timestamp=datetime.now(tz=timezone.utc), + value_p1=Voltage.from_volts(220.0), + value_p2=Voltage.from_volts(220.0), + value_p3=Voltage.from_volts(220.0), + ), + ) + + async def _init_ev_chargers(self, mocks: _Mocks) -> None: + now = datetime.now(tz=timezone.utc) + for evc_id in mocks.microgrid.evc_ids: + mocks.streamer.start_streaming( + EvChargerDataWrapper( + evc_id, + now, + cable_state=EVChargerCableState.EV_PLUGGED, + component_state=EVChargerComponentState.READY, + active_power=0.0, + active_power_inclusion_lower_bound=0.0, + active_power_inclusion_upper_bound=16.0 * 230.0 * 3, + voltage_per_phase=(230.0, 230.0, 230.0), + ), + 0.05, + ) + + for meter_id in mocks.microgrid.meter_ids: + mocks.streamer.start_streaming( + MeterDataWrapper( + meter_id, + now, + voltage_per_phase=(230.0, 230.0, 230.0), + ), + 0.05, + ) + + await asyncio.sleep(1) + + def _assert_report( # pylint: disable=too-many-arguments + self, + report: EVChargerPoolReport, + *, + power: float | None, + lower: float, + upper: float, + expected_result_pred: ( + typing.Callable[[power_distributing.Result], bool] | None + ) = None, + ) -> None: + assert report.target_power == ( + Power.from_watts(power) if power is not None else None + ) + assert report.bounds is not None + assert report.bounds.lower == Power.from_watts(lower) + assert report.bounds.upper == Power.from_watts(upper) + if expected_result_pred is not None: + assert report.distribution_result is not None + assert expected_result_pred(report.distribution_result) + + async def _get_bounds_receiver( + self, ev_charger_pool: EVChargerPool + ) -> Receiver[EVChargerPoolReport]: + bounds_rx = ev_charger_pool.power_status.new_receiver() + + # Consume initial reports as chargers are initialized + expected_upper_bounds = 44160.0 + max_reports = 10 + ctr = 0 + while ctr < max_reports: + ctr += 1 + report = await bounds_rx.receive() + assert report.bounds is not None + if report.bounds.upper == Power.from_watts(expected_upper_bounds): + break + + return bounds_rx + + async def test_setting_power( + self, + mocks: _Mocks, + mocker: MockerFixture, + ) -> None: + """Test setting power.""" + set_power = typing.cast( + AsyncMock, microgrid.connection_manager.get().api_client.set_power + ) + + await self._init_ev_chargers(mocks) + await self._patch_data_pipeline(mocker) + ev_charger_pool = microgrid.ev_charger_pool() + await self._patch_ev_pool_status(mocks, mocker) + await self._patch_power_distributing_actor(mocker) + + bounds_rx = await self._get_bounds_receiver(ev_charger_pool) + + # Check that chargers are initialized to Power.zero() + assert set_power.call_count == 4 + assert all(x.args[1] == 0.0 for x in set_power.call_args_list) + + self._assert_report( + await bounds_rx.receive(), power=None, lower=0.0, upper=44160.0 + ) + + set_power.reset_mock() + await ev_charger_pool.propose_power(Power.from_watts(40000.0)) + # ignore one report because it is not always immediately updated. + await bounds_rx.receive() + self._assert_report( + await bounds_rx.receive(), power=40000.0, lower=0.0, upper=44160.0 + ) + await asyncio.sleep(0.15) + + # Components are set initial power + assert set_power.call_count == 4 + assert all(x.args[1] == 6600.0 for x in set_power.call_args_list) + + # All available power is allocated. 3 chargers are set to 11040.0 + # and the last one is set to 6880.0 + set_power.reset_mock() + await asyncio.sleep(0.15) + assert set_power.call_count == 4 + + evs_11040 = [x.args for x in set_power.call_args_list if x.args[1] == 11040.0] + assert 3 == len(evs_11040) + evs_6680 = [x.args for x in set_power.call_args_list if x.args[1] == 6880.0] + assert 1 == len(evs_6680) + + # Throttle the power + set_power.reset_mock() + await ev_charger_pool.propose_power(Power.from_watts(32000.0)) + await bounds_rx.receive() + await asyncio.sleep(0.02) + assert set_power.call_count == 1 + + stopped_evs = [x.args for x in set_power.call_args_list if x.args[1] == 0.0] + assert 1 == len(stopped_evs) + assert stopped_evs[0][0] in [evc[0] for evc in evs_11040] diff --git a/tests/timeseries/mock_resampler.py b/tests/timeseries/mock_resampler.py index 02ab81628..3007e747f 100644 --- a/tests/timeseries/mock_resampler.py +++ b/tests/timeseries/mock_resampler.py @@ -197,6 +197,8 @@ def power_3_phase_senders( self._request_handler_task = task def _handle_task_done(self, task: asyncio.Task[None]) -> None: + if task.cancelled(): + return if exc := task.exception(): raise SystemExit(f"Task {task.get_name()!r} failed: {exc}") from exc diff --git a/tests/utils/receive_timeout.py b/tests/utils/receive_timeout.py new file mode 100644 index 000000000..ce768312b --- /dev/null +++ b/tests/utils/receive_timeout.py @@ -0,0 +1,31 @@ +# License: MIT +# Copyright © 2023 Frequenz Energy-as-a-Service GmbH + +"""Utility for receiving messages with timeout.""" + +import asyncio +from typing import TypeVar + +from frequenz.channels import Receiver + +T = TypeVar("T") + + +class Timeout: + """Sentinel for timeout.""" + + +async def receive_timeout(recv: Receiver[T], timeout: float = 0.1) -> T | type[Timeout]: + """Receive message from receiver with timeout. + + Args: + recv: Receiver to receive message from. + timeout: Timeout in seconds. + + Returns: + Received message or Timeout if timeout is reached. + """ + try: + return await asyncio.wait_for(recv.receive(), timeout=timeout) + except asyncio.TimeoutError: + return Timeout