Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename get_{sender,receiver} to new_{sender,receiver} #49

Merged
merged 1 commit into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@ https://frequenz-floss.github.io/frequenz-channels-python/
For now the documentation is pretty scarce but we will be improving it with
time.

## Upgrading
## Upgrading (breaking changes)

* You need to make sure to use [timezone-aware] `datetime` objects when using
the timestamp returned by [`Timer`], Otherwise you will get an exception.

* Channels methods `get_receiver()` and `get_sender()` have been renamed to
`new_receiver()` and `new_sender()` respectively. This is to make it more
clear that new objects are being created.

Comment on lines +17 to +20

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be listed under Breaking changes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a (breaking changes) to Upgrading to make it more clear that this section list breaking changes. We can change the template if Upgrading is not clear enough. I used this to make it clear for people writing notes that it is important to say how to upgrade from breaking changes, not just listing them.

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/benchmark_anycast.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async def benchmark_anycast(
"""
channels: List[Anycast[int]] = [Anycast(buffer_size) for _ in range(num_channels)]
senders = [
asyncio.create_task(send_msg(num_messages, bcast.get_sender()))
asyncio.create_task(send_msg(num_messages, bcast.new_sender()))
for bcast in channels
]

Expand All @@ -57,7 +57,7 @@ async def update_tracker_on_receive(chan: Receiver[int]) -> None:
receivers = []
for acast in channels:
for _ in range(num_receivers):
receivers.append(update_tracker_on_receive(acast.get_receiver()))
receivers.append(update_tracker_on_receive(acast.new_receiver()))

receivers_runs = asyncio.gather(*receivers)

Expand Down
8 changes: 4 additions & 4 deletions benchmarks/benchmark_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async def benchmark_broadcast(
"""
channels: List[Broadcast[int]] = [Broadcast("meter") for _ in range(num_channels)]
senders: List[asyncio.Task[Any]] = [
asyncio.create_task(send_msg(num_messages, bcast.get_sender()))
asyncio.create_task(send_msg(num_messages, bcast.new_sender()))
for bcast in channels
]

Expand All @@ -75,7 +75,7 @@ async def update_tracker_on_receive(chan: Receiver[int]) -> None:
receivers = []
for bcast in channels:
for _ in range(num_receivers):
receivers.append(update_tracker_on_receive(bcast.get_receiver()))
receivers.append(update_tracker_on_receive(bcast.new_receiver()))

receivers_runs = asyncio.gather(*receivers)

Expand Down Expand Up @@ -104,11 +104,11 @@ async def benchmark_single_task_broadcast(
int: Total number of messages received by all receivers.
"""
channels: List[Broadcast[int]] = [Broadcast("meter") for _ in range(num_channels)]
senders = [b.get_sender() for b in channels]
senders = [b.new_sender() for b in channels]
recv_tracker = 0

receivers = [
[bcast.get_receiver() for _ in range(num_receivers)] for bcast in channels
[bcast.new_receiver() for _ in range(num_receivers)] for bcast in channels
]

for ctr in range(num_messages):
Expand Down
12 changes: 6 additions & 6 deletions src/frequenz/channels/anycast.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ async def recv(id: int, receiver: channel.Receiver) -> None:

acast = channel.Anycast()

sender = acast.get_sender()
receiver_1 = acast.get_receiver()
sender = acast.new_sender()
receiver_1 = acast.new_receiver()

asyncio.create_task(send(sender))

Expand Down Expand Up @@ -91,15 +91,15 @@ async def close(self) -> None:
async with self.recv_cv:
self.recv_cv.notify_all()

def get_sender(self) -> Sender[T]:
def new_sender(self) -> Sender[T]:
"""Create a new sender.

Returns:
A Sender instance attached to the Anycast channel.
"""
return Sender(self)

def get_receiver(self) -> Receiver[T]:
def new_receiver(self) -> Receiver[T]:
"""Create a new receiver.

Returns:
Expand All @@ -111,7 +111,7 @@ def get_receiver(self) -> Receiver[T]:
class Sender(BaseSender[T]):
"""A sender to send messages to an Anycast channel.

Should not be created directly, but through the `Anycast.get_sender()`
Should not be created directly, but through the `Anycast.ggetet_sender()`
method.
"""

Expand Down Expand Up @@ -152,7 +152,7 @@ async def send(self, msg: T) -> bool:
class Receiver(BaseReceiver[T]):
"""A receiver to receive messages from an Anycast channel.

Should not be created directly, but through the `Anycast.get_receiver()`
Should not be created directly, but through the `Anycast.new_receiver()`
method.
"""

Expand Down
2 changes: 1 addition & 1 deletion src/frequenz/channels/base_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def into_peekable(self) -> Peekable[T]:

Raises:
NotImplementedError: when a `Receiver` implementation doesn't have
a custom `get_peekable` implementation.
a custom `into_peekable` implementation.
"""
raise NotImplementedError("This receiver does not implement `into_peekable`")

Expand Down
8 changes: 4 additions & 4 deletions src/frequenz/channels/bidirectional.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ def __init__(self, client_id: str, service_id: str) -> None:
)

self._client_handle = BidirectionalHandle(
self._request_channel.get_sender(),
self._response_channel.get_receiver(),
self._request_channel.new_sender(),
self._response_channel.new_receiver(),
)
self._service_handle = BidirectionalHandle(
self._response_channel.get_sender(),
self._request_channel.get_receiver(),
self._response_channel.new_sender(),
self._request_channel.new_receiver(),
)

@property
Expand Down
16 changes: 8 additions & 8 deletions src/frequenz/channels/broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ async def recv(id: int, receiver: channel.Receiver) -> None:

bcast = channel.Broadcast()

sender = bcast.get_sender()
receiver_1 = bcast.get_receiver()
sender = bcast.new_sender()
receiver_1 = bcast.new_receiver()

asyncio.create_task(send(sender))

Expand All @@ -73,7 +73,7 @@ def __init__(self, name: str, resend_latest: bool = False) -> None:
of data sent through it. Used to identify the channel in the
logs.
resend_latest: When True, every time a new receiver is created with
`get_receiver`, it will automatically get sent the latest value
`new_receiver`, it will automatically get sent the latest value
on the channel. This allows new receivers on slow streams to
get the latest value as soon as they are created, without having
to wait for the next message on the channel to arrive.
Expand Down Expand Up @@ -102,15 +102,15 @@ async def close(self) -> None:
async with self.recv_cv:
self.recv_cv.notify_all()

def get_sender(self) -> Sender[T]:
def new_sender(self) -> Sender[T]:
"""Create a new broadcast sender.

Returns:
A Sender instance attached to the broadcast channel.
"""
return Sender(self)

def get_receiver(
def new_receiver(
self, name: Optional[str] = None, maxsize: int = 50
) -> Receiver[T]:
"""Create a new broadcast receiver.
Expand All @@ -135,7 +135,7 @@ def get_receiver(
recv.enqueue(self._latest)
return recv

def get_peekable(self) -> Peekable[T]:
def new_peekable(self) -> Peekable[T]:
"""Create a new Peekable for the broadcast channel.

A Peekable provides a [peek()][frequenz.channels.Peekable.peek] method
Expand All @@ -152,7 +152,7 @@ class Sender(BaseSender[T]):
"""A sender to send messages to the broadcast channel.

Should not be created directly, but through the
[Broadcast.get_sender()][frequenz.channels.Broadcast.get_sender]
[Broadcast.new_sender()][frequenz.channels.Broadcast.new_sender]
method.
"""

Expand Down Expand Up @@ -196,7 +196,7 @@ class Receiver(BufferedReceiver[T]):
"""A receiver to receive messages from the broadcast channel.

Should not be created directly, but through the
[Broadcast.get_receiver()][frequenz.channels.Broadcast.get_receiver]
[Broadcast.new_receiver()][frequenz.channels.Broadcast.new_receiver]
method.
"""

Expand Down
24 changes: 12 additions & 12 deletions tests/test_anycast.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,17 @@ async def update_tracker_on_receive(receiver_id: int, chan: Receiver[int]) -> No

receivers = []
for ctr in range(num_receivers):
receivers.append(update_tracker_on_receive(ctr, acast.get_receiver()))
receivers.append(update_tracker_on_receive(ctr, acast.new_receiver()))

# get one more sender and receiver to test channel operations after the
# channel is closed.
after_close_receiver = acast.get_receiver()
after_close_sender = acast.get_sender()
after_close_receiver = acast.new_receiver()
after_close_sender = acast.new_sender()

receivers_runs = asyncio.gather(*receivers)
senders = []
for ctr in range(num_senders):
senders.append(send_msg(acast.get_sender()))
senders.append(send_msg(acast.new_sender()))

await asyncio.gather(*senders)
await acast.close()
Expand All @@ -75,8 +75,8 @@ async def test_anycast_after_close() -> None:
"""Ensure closed channels can't get new messages."""
acast: Anycast[int] = Anycast()

receiver = acast.get_receiver()
sender = acast.get_sender()
receiver = acast.new_receiver()
sender = acast.new_sender()

assert await sender.send(2) is True

Expand All @@ -94,8 +94,8 @@ async def test_anycast_full() -> None:
timeout = 0.2
acast: Anycast[int] = Anycast(buffer_size)

receiver = acast.get_receiver()
sender = acast.get_sender()
receiver = acast.new_receiver()
sender = acast.new_sender()

timeout_at = 0
for ctr in range(buffer_size + 1):
Expand Down Expand Up @@ -137,8 +137,8 @@ async def test_anycast_async_iterator() -> None:
"""Check that the anycast receiver works as an async iterator."""
acast: Anycast[str] = Anycast()

sender = acast.get_sender()
receiver = acast.get_receiver()
sender = acast.new_sender()
receiver = acast.new_receiver()

async def send_values() -> None:
for val in ["one", "two", "three", "four", "five"]:
Expand All @@ -159,10 +159,10 @@ async def send_values() -> None:
async def test_anycast_map() -> None:
"""Ensure map runs on all incoming messages."""
chan = Anycast[int]()
sender = chan.get_sender()
sender = chan.new_sender()

# transform int receiver into bool receiver.
receiver: Receiver[bool] = chan.get_receiver().map(lambda num: num > 10)
receiver: Receiver[bool] = chan.new_receiver().map(lambda num: num > 10)

await sender.send(8)
await sender.send(12)
Expand Down
44 changes: 22 additions & 22 deletions tests/test_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ async def update_tracker_on_receive(receiver_id: int, chan: Receiver[int]) -> No

receivers = []
for ctr in range(num_receivers):
receivers.append(update_tracker_on_receive(ctr, bcast.get_receiver()))
receivers.append(update_tracker_on_receive(ctr, bcast.new_receiver()))

receivers_runs = asyncio.gather(*receivers)
senders = []
for ctr in range(num_senders):
senders.append(send_msg(bcast.get_sender()))
senders.append(send_msg(bcast.new_sender()))

await asyncio.gather(*senders)
await bcast.close()
Expand All @@ -64,8 +64,8 @@ async def test_broadcast_after_close() -> None:
"""Ensure closed channels can't get new messages."""
bcast: Broadcast[int] = Broadcast("meter_5")

receiver = bcast.get_receiver()
sender = bcast.get_sender()
receiver = bcast.new_receiver()
sender = bcast.new_sender()

await bcast.close()

Expand All @@ -80,10 +80,10 @@ async def test_broadcast_overflow() -> None:

big_recv_size = 10
small_recv_size = int(big_recv_size / 2)
sender = bcast.get_sender()
sender = bcast.new_sender()

big_receiver = bcast.get_receiver("named-recv", big_recv_size)
small_receiver = bcast.get_receiver(None, small_recv_size)
big_receiver = bcast.new_receiver("named-recv", big_recv_size)
small_receiver = bcast.new_receiver(None, small_recv_size)

async def drain_receivers() -> Tuple[int, int]:
big_sum = 0
Expand Down Expand Up @@ -129,11 +129,11 @@ async def test_broadcast_resend_latest() -> None:
"""Check if new receivers get the latest value when resend_latest is set."""
bcast: Broadcast[int] = Broadcast("new_recv_test", resend_latest=True)

sender = bcast.get_sender()
old_recv = bcast.get_receiver()
sender = bcast.new_sender()
old_recv = bcast.new_receiver()
for val in range(0, 10):
await sender.send(val)
new_recv = bcast.get_receiver()
new_recv = bcast.new_receiver()

await sender.send(100)

Expand All @@ -146,11 +146,11 @@ async def test_broadcast_no_resend_latest() -> None:
"""Ensure new receivers don't get the latest value when resend_latest isn't set."""
bcast: Broadcast[int] = Broadcast("new_recv_test", resend_latest=False)

sender = bcast.get_sender()
old_recv = bcast.get_receiver()
sender = bcast.new_sender()
old_recv = bcast.new_receiver()
for val in range(0, 10):
await sender.send(val)
new_recv = bcast.get_receiver()
new_recv = bcast.new_receiver()

await sender.send(100)

Expand All @@ -161,9 +161,9 @@ async def test_broadcast_no_resend_latest() -> None:
async def test_broadcast_peek() -> None:
"""Ensure we are able to peek into broadcast channels."""
bcast: Broadcast[int] = Broadcast("peek-test")
receiver = bcast.get_receiver()
receiver = bcast.new_receiver()
peekable = receiver.into_peekable()
sender = bcast.get_sender()
sender = bcast.new_sender()

with pytest.raises(EOFError):
await receiver.receive()
Expand All @@ -188,8 +188,8 @@ async def test_broadcast_async_iterator() -> None:
"""Check that the broadcast receiver works as an async iterator."""
bcast: Broadcast[int] = Broadcast("iter_test")

sender = bcast.get_sender()
receiver = bcast.get_receiver()
sender = bcast.new_sender()
receiver = bcast.new_receiver()

async def send_values() -> None:
for val in range(0, 10):
Expand All @@ -210,10 +210,10 @@ async def send_values() -> None:
async def test_broadcast_map() -> None:
"""Ensure map runs on all incoming messages."""
chan = Broadcast[int]("input-chan")
sender = chan.get_sender()
sender = chan.new_sender()

# transform int receiver into bool receiver.
receiver: Receiver[bool] = chan.get_receiver().map(lambda num: num > 10)
receiver: Receiver[bool] = chan.new_receiver().map(lambda num: num > 10)

await sender.send(8)
await sender.send(12)
Expand All @@ -225,10 +225,10 @@ async def test_broadcast_map() -> None:
async def test_broadcast_receiver_drop() -> None:
"""Ensure deleted receivers get cleaned up."""
chan = Broadcast[int]("input-chan")
sender = chan.get_sender()
sender = chan.new_sender()

receiver1 = chan.get_receiver()
receiver2 = chan.get_receiver()
receiver1 = chan.new_receiver()
receiver2 = chan.new_receiver()

await sender.send(10)

Expand Down
Loading