Skip to content

Support for disabling overflow-warning logs in broadcast receivers #329

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
- Added support for disabling the overflow-warning log messagess in broadcast receivers, through the `warn_on_overflow` parameter on `Broadcast.new_receiver()` calls.

## Bug Fixes

Expand Down
29 changes: 22 additions & 7 deletions src/frequenz/channels/_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def new_sender(self) -> Sender[ChannelMessageT]:
return _Sender(self)

def new_receiver(
self, *, name: str | None = None, limit: int = 50
self, *, name: str | None = None, limit: int = 50, warn_on_overflow: bool = True
) -> Receiver[ChannelMessageT]:
"""Return a new receiver attached to this channel.

Expand All @@ -278,11 +278,15 @@ def new_receiver(
Args:
name: A name to identify the receiver in the logs.
limit: Number of messages the receiver can hold in its buffer.
warn_on_overflow: Whether to log a warning when the receiver's
buffer is full and a message is dropped.

Returns:
A new receiver attached to this channel.
"""
recv: _Receiver[ChannelMessageT] = _Receiver(self, name=name, limit=limit)
recv: _Receiver[ChannelMessageT] = _Receiver(
self, name=name, limit=limit, warn_on_overflow=warn_on_overflow
)
self._receivers[hash(recv)] = weakref.ref(recv)
if self.resend_latest and self._latest is not None:
recv.enqueue(self._latest)
Expand Down Expand Up @@ -371,7 +375,13 @@ class _Receiver(Receiver[_T]):
"""

def __init__(
self, channel: Broadcast[_T], /, *, name: str | None, limit: int
self,
channel: Broadcast[_T],
/,
*,
name: str | None,
limit: int,
warn_on_overflow: bool,
) -> None:
"""Initialize this receiver.

Expand All @@ -387,7 +397,11 @@ def __init__(
purposes, it will be shown in the string representation of the
receiver.
limit: Number of messages the receiver can hold in its buffer.
warn_on_overflow: Whether to log a warning when the receiver's
buffer is full and a message is dropped.
"""
self._warn_on_overflow: bool = warn_on_overflow

self._name: str = name if name is not None else f"{id(self):_}"
"""The name to identify the receiver.

Expand All @@ -412,10 +426,11 @@ def enqueue(self, message: _T, /) -> None:
"""
if len(self._q) == self._q.maxlen:
self._q.popleft()
_logger.warning(
"Broadcast receiver [%s] is full. Oldest message was dropped.",
self,
)
if self._warn_on_overflow:
_logger.warning(
"Broadcast receiver [%s] is full. Oldest message was dropped.",
self,
)
self._q.append(message)

def __len__(self) -> int:
Expand Down
Loading