Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
- Warning messages are logged when multiple instances of `*Pool`s are created for the same set of batteries, with the same priority values.

## Bug Fixes

Expand Down
150 changes: 110 additions & 40 deletions src/frequenz/sdk/microgrid/_data_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from __future__ import annotations

import logging
import sys
import typing
from collections import abc
Expand Down Expand Up @@ -44,6 +45,8 @@
from ..timeseries.logical_meter import LogicalMeter
from ..timeseries.producer import Producer

_logger = logging.getLogger(__name__)


_REQUEST_RECV_BUFFER_SIZE = 500
"""The maximum number of requests that can be queued in the request receiver.
Expand Down Expand Up @@ -103,11 +106,22 @@ def __init__(
self._consumer: Consumer | None = None
self._producer: Producer | None = None
self._grid: Grid | None = None
self._ev_charger_pools: dict[frozenset[int], EVChargerPoolReferenceStore] = {}
self._battery_pools: dict[frozenset[int], BatteryPoolReferenceStore] = {}
self._ev_charger_pool_reference_stores: dict[
frozenset[int], EVChargerPoolReferenceStore
] = {}
self._battery_pool_reference_stores: dict[
frozenset[int], BatteryPoolReferenceStore
] = {}
self._frequency_instance: GridFrequency | None = None
self._voltage_instance: VoltageStreamer | None = None

self._known_pool_keys: set[str] = set()
"""A set of keys for corresponding to created EVChargerPool instances.

This is used to warn the user if they try to create a new EVChargerPool instance
for the same set of component IDs, and with the same priority.
"""

def frequency(self) -> GridFrequency:
"""Return the grid frequency measuring point."""
if self._frequency_instance is None:
Expand Down Expand Up @@ -191,26 +205,45 @@ def ev_charger_pool(
self._ev_power_wrapper.start()

# We use frozenset to make a hashable key from the input set.
key: frozenset[int] = frozenset()
ref_store_key: frozenset[int] = frozenset()
if ev_charger_ids is not None:
key = frozenset(ev_charger_ids)

if key not in self._ev_charger_pools:
self._ev_charger_pools[key] = EVChargerPoolReferenceStore(
channel_registry=self._channel_registry,
resampler_subscription_sender=self._resampling_request_sender(),
status_receiver=self._ev_power_wrapper.status_channel.new_receiver(
limit=1
),
power_manager_requests_sender=(
self._ev_power_wrapper.proposal_channel.new_sender()
),
power_manager_bounds_subs_sender=(
self._ev_power_wrapper.bounds_subscription_channel.new_sender()
),
component_ids=ev_charger_ids,
ref_store_key = frozenset(ev_charger_ids)

pool_key = f"{ref_store_key}-{priority}"
if pool_key in self._known_pool_keys:
_logger.warning(
"An EVChargerPool instance was already created for ev_charger_ids=%s "
"and priority=%s using `microgrid.ev_charger_pool(...)`."
"\n Hint: If the multiple instances are created from the same actor, "
"consider reusing the same instance."
"\n Hint: If the instances are created from different actors, "
"consider using different priorities to distinguish them.",
ev_charger_ids,
priority,
)
else:
self._known_pool_keys.add(pool_key)

if ref_store_key not in self._ev_charger_pool_reference_stores:
self._ev_charger_pool_reference_stores[ref_store_key] = (
EVChargerPoolReferenceStore(
channel_registry=self._channel_registry,
resampler_subscription_sender=self._resampling_request_sender(),
status_receiver=self._ev_power_wrapper.status_channel.new_receiver(
limit=1
),
power_manager_requests_sender=(
self._ev_power_wrapper.proposal_channel.new_sender()
),
power_manager_bounds_subs_sender=(
self._ev_power_wrapper.bounds_subscription_channel.new_sender()
),
component_ids=ev_charger_ids,
)
)
return EVChargerPool(self._ev_charger_pools[key], name, priority)
return EVChargerPool(
self._ev_charger_pool_reference_stores[ref_store_key], name, priority
)

def grid(self) -> Grid:
"""Return the grid measuring point."""
Expand Down Expand Up @@ -253,28 +286,47 @@ def battery_pool(
self._battery_power_wrapper.start()

# We use frozenset to make a hashable key from the input set.
key: frozenset[int] = frozenset()
ref_store_key: frozenset[int] = frozenset()
if battery_ids is not None:
key = frozenset(battery_ids)

if key not in self._battery_pools:
self._battery_pools[key] = BatteryPoolReferenceStore(
channel_registry=self._channel_registry,
resampler_subscription_sender=self._resampling_request_sender(),
batteries_status_receiver=self._battery_power_wrapper.status_channel.new_receiver(
limit=1
),
power_manager_requests_sender=(
self._battery_power_wrapper.proposal_channel.new_sender()
),
power_manager_bounds_subscription_sender=(
self._battery_power_wrapper.bounds_subscription_channel.new_sender()
),
min_update_interval=self._resampler_config.resampling_period,
batteries_id=battery_ids,
ref_store_key = frozenset(battery_ids)

pool_key = f"{ref_store_key}-{priority}"
if pool_key in self._known_pool_keys:
_logger.warning(
"A BatteryPool instance was already created for battery_ids=%s and "
"priority=%s using `microgrid.battery_pool(...)`."
"\n Hint: If the multiple instances are created from the same actor, "
"consider reusing the same instance."
"\n Hint: If the instances are created from different actors, "
"consider using different priorities to distinguish them.",
battery_ids,
priority,
)
else:
self._known_pool_keys.add(pool_key)

if ref_store_key not in self._battery_pool_reference_stores:
self._battery_pool_reference_stores[ref_store_key] = (
BatteryPoolReferenceStore(
channel_registry=self._channel_registry,
resampler_subscription_sender=self._resampling_request_sender(),
batteries_status_receiver=(
self._battery_power_wrapper.status_channel.new_receiver(limit=1)
),
power_manager_requests_sender=(
self._battery_power_wrapper.proposal_channel.new_sender()
),
power_manager_bounds_subscription_sender=(
self._battery_power_wrapper.bounds_subscription_channel.new_sender()
),
min_update_interval=self._resampler_config.resampling_period,
batteries_id=battery_ids,
)
)

return BatteryPool(self._battery_pools[key], name, priority)
return BatteryPool(
self._battery_pool_reference_stores[ref_store_key], name, priority
)

def _data_sourcing_request_sender(self) -> Sender[ComponentMetricRequest]:
"""Return a Sender for sending requests to the data sourcing actor.
Expand Down Expand Up @@ -331,7 +383,7 @@ async def _stop(self) -> None:
if self._resampling_actor:
await self._resampling_actor.actor.stop()
await self._battery_power_wrapper.stop()
for pool in self._battery_pools.values():
for pool in self._battery_pool_reference_stores.values():
await pool.stop()


Expand Down Expand Up @@ -393,6 +445,15 @@ def ev_charger_pool(
When specifying priority, bigger values indicate higher priority. The default
priority is the lowest possible value.

It is recommended to reuse the same instance of the `EVChargerPool` within the
same actor, unless they are managing different sets of EV chargers.

In deployments with multiple actors managing the same set of EV chargers, it is
recommended to use different priorities to distinguish between them. If not,
a random prioritization will be imposed on them to resolve conflicts, which may
lead to unexpected behavior like longer duration to converge on the desired
power.

Args:
ev_charger_ids: Optional set of IDs of EV Chargers to be managed by the
EVChargerPool. If not specified, all EV Chargers available in the
Expand Down Expand Up @@ -421,6 +482,15 @@ def battery_pool(
When specifying priority, bigger values indicate higher priority. The default
priority is the lowest possible value.

It is recommended to reuse the same instance of the `BatteryPool` within the
same actor, unless they are managing different sets of batteries.

In deployments with multiple actors managing the same set of batteries, it is
recommended to use different priorities to distinguish between them. If not,
a random prioritization will be imposed on them to resolve conflicts, which may
lead to unexpected behavior like longer duration to converge on the desired
power.

Args:
battery_ids: Optional set of IDs of batteries to be managed by the `BatteryPool`.
If not specified, all batteries available in the component graph are used.
Expand Down