Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve documentation #279

Merged
merged 20 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/user-guide/receiving/synchronization/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ could be difficult to, for example, receive the first message of each receiver
as soon as it is available in one single task. A naive approach like this will
not work:

```python
```python show_lines="4:"
receiver1: Receiver[int] = channel1.new_receiver()
receiver2: Receiver[int] = channel2.new_receiver()

Expand Down
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ plugins:
show_root_members_full_path: true
show_signature_annotations: true
show_source: true
show_symbol_type_toc: true
signature_crossrefs: true
import:
# See https://mkdocstrings.github.io/python/usage/#import for details
Expand Down
3 changes: 3 additions & 0 deletions src/frequenz/channels/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
This package contains
[channel](https://en.wikipedia.org/wiki/Channel_(programming)) implementations.

<!-- For the full documentation and user guide please visit the [project's
website](https://frequenz-floss.github.io/frequenz-channels-python/) -->

Base classes:

* [Receiver][frequenz.channels.Receiver]: An object that can wait for and
Expand Down
25 changes: 8 additions & 17 deletions src/frequenz/channels/_anycast.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ async def main() -> None:
"""

def __init__(self, *, name: str, limit: int = 10) -> None:
"""Create an Anycast channel.
"""Initialize this channel.

Args:
name: The name of the channel. This is for logging purposes, and it will be
Expand Down Expand Up @@ -275,7 +275,6 @@ async def close(self) -> None:
but after that, subsequent
[receive()][frequenz.channels.Receiver.receive] calls will return `None`
immediately.

"""
self._closed = True
async with self._send_cv:
Expand All @@ -284,19 +283,11 @@ async def close(self) -> None:
self._recv_cv.notify_all()

def new_sender(self) -> Sender[_T]:
"""Create a new sender.

Returns:
A Sender instance attached to the Anycast channel.
"""
"""Return a new sender attached to this channel."""
return _Sender(self)

def new_receiver(self) -> Receiver[_T]:
"""Create a new receiver.

Returns:
A Receiver instance attached to the Anycast channel.
"""
"""Return a new receiver attached to this channel."""
return _Receiver(self)

def __str__(self) -> str:
Expand All @@ -319,7 +310,7 @@ class _Sender(Sender[_T]):
"""

def __init__(self, chan: Anycast[_T]) -> None:
"""Create a channel sender.
"""Initialize this sender.

Args:
chan: A reference to the channel that this sender belongs to.
Expand All @@ -339,7 +330,7 @@ async def send(self, msg: _T) -> None:
msg: The message to be sent.

Raises:
SenderError: if the underlying channel was closed.
SenderError: If the underlying channel was closed.
A [ChannelClosedError][frequenz.channels.ChannelClosedError] is
set as the cause.
"""
Expand Down Expand Up @@ -387,7 +378,7 @@ class _Receiver(Receiver[_T]):
"""

def __init__(self, chan: Anycast[_T]) -> None:
"""Create a channel receiver.
"""Initialize this receiver.

Args:
chan: A reference to the channel that this receiver belongs to.
Expand Down Expand Up @@ -431,8 +422,8 @@ def consume(self) -> _T:
The next value that was received.

Raises:
ReceiverStoppedError: if the receiver stopped producing messages.
ReceiverError: if there is some problem with the receiver.
ReceiverStoppedError: If the receiver stopped producing messages.
ReceiverError: If there is some problem with the receiver.
"""
if ( # pylint: disable=protected-access
self._next is _Empty and self._chan._closed
Expand Down
24 changes: 10 additions & 14 deletions src/frequenz/channels/_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ async def main() -> None:
"""

def __init__(self, *, name: str, resend_latest: bool = False) -> None:
"""Create a Broadcast channel.
"""Initialize this channel.

Args:
name: The name of the channel. This is for logging purposes, and it will be
Expand Down Expand Up @@ -246,7 +246,7 @@ def is_closed(self) -> bool:
return self._closed

async def close(self) -> None:
"""Close the Broadcast channel.
"""Close this channel.

Any further attempts to [send()][frequenz.channels.Sender.send] data
will return `False`.
Expand All @@ -262,15 +262,11 @@ async def close(self) -> None:
self._recv_cv.notify_all()

def new_sender(self) -> Sender[_T]:
"""Create a new broadcast sender.

Returns:
A Sender instance attached to the broadcast channel.
"""
"""Return a new sender attached to this channel."""
return _Sender(self)

def new_receiver(self, *, name: str | None = None, limit: int = 50) -> Receiver[_T]:
"""Create a new broadcast receiver.
"""Return a new receiver attached to this channel.

Broadcast receivers have their own buffer, and when messages are not
being consumed fast enough and the buffer fills up, old messages will
Expand All @@ -281,7 +277,7 @@ def new_receiver(self, *, name: str | None = None, limit: int = 50) -> Receiver[
limit: Number of messages the receiver can hold in its buffer.

Returns:
A Receiver instance attached to the broadcast channel.
A new receiver attached to this channel.
"""
recv: _Receiver[_T] = _Receiver(name, limit, self)
self._receivers[hash(recv)] = weakref.ref(recv)
Expand All @@ -290,7 +286,7 @@ def new_receiver(self, *, name: str | None = None, limit: int = 50) -> Receiver[
return recv

def __str__(self) -> str:
"""Return a string representation of this receiver."""
"""Return a string representation of this channel."""
return f"{type(self).__name__}:{self._name}"

def __repr__(self) -> str:
Expand All @@ -313,7 +309,7 @@ class _Sender(Sender[_T]):
"""

def __init__(self, chan: Broadcast[_T]) -> None:
"""Create a Broadcast sender.
"""Initialize this sender.

Args:
chan: A reference to the broadcast channel this sender belongs to.
Expand All @@ -328,7 +324,7 @@ async def send(self, msg: _T) -> None:
msg: The message to be broadcast.

Raises:
SenderError: if the underlying channel was closed.
SenderError: If the underlying channel was closed.
A [ChannelClosedError][frequenz.channels.ChannelClosedError] is
set as the cause.
"""
Expand Down Expand Up @@ -369,7 +365,7 @@ class _Receiver(Receiver[_T]):
"""

def __init__(self, name: str | None, limit: int, chan: Broadcast[_T]) -> None:
"""Create a broadcast receiver.
"""Initialize this receiver.

Broadcast receivers have their own buffer, and when messages are not
being consumed fast enough and the buffer fills up, old messages will
Expand Down Expand Up @@ -457,7 +453,7 @@ def consume(self) -> _T:
The next value that was received.

Raises:
ReceiverStoppedError: if there is some problem with the receiver.
ReceiverStoppedError: If there is some problem with the receiver.
"""
if not self._q and self._chan._closed: # pylint: disable=protected-access
raise ReceiverStoppedError(self) from ChannelClosedError(self._chan)
Expand Down
22 changes: 11 additions & 11 deletions src/frequenz/channels/_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,45 +70,45 @@


class Error(RuntimeError):
"""Base error.
"""An error that originated in this library.

All exceptions generated by this library inherit from this exception.
This is useful if you want to catch all exceptions generated by this library.
"""

def __init__(self, message: str):
"""Create a ChannelError instance.
"""Initialize this error.

Args:
message: An error message.
message: The error message.
"""
super().__init__(message)


class ChannelError(Error):
"""An error produced in a channel.
"""An error that originated in a channel.

All exceptions generated by channels inherit from this exception.
"""

def __init__(self, message: str, channel: Any):
"""Create a ChannelError instance.
"""Initialize this error.

Args:
message: An error message.
channel: A reference to the channel that encountered the error.
message: The error message.
channel: The channel where the error happened.
"""
super().__init__(message)
self.channel: Any = channel
"""The channel where the error happened."""


class ChannelClosedError(ChannelError):
"""Error raised when trying to operate on a closed channel."""
"""A closed channel was used."""

def __init__(self, channel: Any):
"""Create a `ChannelClosedError` instance.
"""Initialize this error.

Args:
channel: A reference to the channel that was closed.
channel: The channel that was closed.
"""
super().__init__(f"Channel {channel} was closed", channel)
12 changes: 6 additions & 6 deletions src/frequenz/channels/_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def merge(*receivers: Receiver[_T]) -> Merger[_T]:
single stream.

Raises:
ValueError: if no receivers are provided.
ValueError: If no receivers are provided.
"""
if not receivers:
raise ValueError("At least one receiver must be provided")
Expand All @@ -104,7 +104,7 @@ class Merger(Receiver[_T]):
"""

def __init__(self, *receivers: Receiver[_T], name: str | None) -> None:
"""Create a `Merger` instance.
"""Initialize this merger.

Args:
*receivers: The receivers to merge.
Expand All @@ -122,13 +122,13 @@ def __init__(self, *receivers: Receiver[_T], name: str | None) -> None:
self._results: deque[_T] = deque(maxlen=len(self._receivers))

def __del__(self) -> None:
"""Cleanup any pending tasks."""
"""Finalize this merger."""
for task in self._pending:
if not task.done() and task.get_loop().is_running():
task.cancel()

async def stop(self) -> None:
"""Stop the `Merger` instance and cleanup any pending tasks."""
"""Stop this merger."""
for task in self._pending:
task.cancel()
await asyncio.gather(*self._pending, return_exceptions=True)
Expand Down Expand Up @@ -177,8 +177,8 @@ def consume(self) -> _T:
The next value that was received.

Raises:
ReceiverStoppedError: if the receiver stopped producing messages.
ReceiverError: if there is some problem with the receiver.
ReceiverStoppedError: If the receiver stopped producing messages.
ReceiverError: If there is some problem with the receiver.
"""
if not self._results and not self._pending:
raise ReceiverStoppedError(self)
Expand Down
Loading
Loading