Skip to content

Commit

Permalink
Rename get_{sender,receiver} to new_{sender,receiver}
Browse files Browse the repository at this point in the history
Using `get_` as a prefix gives the idea that one is always getting the
same object, but these methods are indeed creating new objects.

Signed-off-by: Leandro Lucarella <[email protected]>
  • Loading branch information
llucax committed Nov 22, 2022
1 parent 5b47b03 commit ee3963d
Show file tree
Hide file tree
Showing 13 changed files with 74 additions and 70 deletions.
6 changes: 5 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@ https://frequenz-floss.github.io/frequenz-channels-python/
For now the documentation is pretty scarce but we will be improving it with
time.

## Upgrading
## Upgrading (breaking changes)

* You need to make sure to use [timezone-aware] `datetime` objects when using
the timestamp returned by [`Timer`], Otherwise you will get an exception.

* Channels methods `get_receiver()` and `get_sender()` have been renamed to
`new_receiver()` and `new_sender()` respectively. This is to make it more
clear that new objects are being created.

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/benchmark_anycast.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async def benchmark_anycast(
"""
channels: List[Anycast[int]] = [Anycast(buffer_size) for _ in range(num_channels)]
senders = [
asyncio.create_task(send_msg(num_messages, bcast.get_sender()))
asyncio.create_task(send_msg(num_messages, bcast.new_sender()))
for bcast in channels
]

Expand All @@ -57,7 +57,7 @@ async def update_tracker_on_receive(chan: Receiver[int]) -> None:
receivers = []
for acast in channels:
for _ in range(num_receivers):
receivers.append(update_tracker_on_receive(acast.get_receiver()))
receivers.append(update_tracker_on_receive(acast.new_receiver()))

receivers_runs = asyncio.gather(*receivers)

Expand Down
8 changes: 4 additions & 4 deletions benchmarks/benchmark_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async def benchmark_broadcast(
"""
channels: List[Broadcast[int]] = [Broadcast("meter") for _ in range(num_channels)]
senders: List[asyncio.Task[Any]] = [
asyncio.create_task(send_msg(num_messages, bcast.get_sender()))
asyncio.create_task(send_msg(num_messages, bcast.new_sender()))
for bcast in channels
]

Expand All @@ -75,7 +75,7 @@ async def update_tracker_on_receive(chan: Receiver[int]) -> None:
receivers = []
for bcast in channels:
for _ in range(num_receivers):
receivers.append(update_tracker_on_receive(bcast.get_receiver()))
receivers.append(update_tracker_on_receive(bcast.new_receiver()))

receivers_runs = asyncio.gather(*receivers)

Expand Down Expand Up @@ -104,11 +104,11 @@ async def benchmark_single_task_broadcast(
int: Total number of messages received by all receivers.
"""
channels: List[Broadcast[int]] = [Broadcast("meter") for _ in range(num_channels)]
senders = [b.get_sender() for b in channels]
senders = [b.new_sender() for b in channels]
recv_tracker = 0

receivers = [
[bcast.get_receiver() for _ in range(num_receivers)] for bcast in channels
[bcast.new_receiver() for _ in range(num_receivers)] for bcast in channels
]

for ctr in range(num_messages):
Expand Down
12 changes: 6 additions & 6 deletions src/frequenz/channels/anycast.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ async def recv(id: int, receiver: channel.Receiver) -> None:
acast = channel.Anycast()
sender = acast.get_sender()
receiver_1 = acast.get_receiver()
sender = acast.new_sender()
receiver_1 = acast.new_receiver()
asyncio.create_task(send(sender))
Expand Down Expand Up @@ -91,15 +91,15 @@ async def close(self) -> None:
async with self.recv_cv:
self.recv_cv.notify_all()

def get_sender(self) -> Sender[T]:
def new_sender(self) -> Sender[T]:
"""Create a new sender.
Returns:
A Sender instance attached to the Anycast channel.
"""
return Sender(self)

def get_receiver(self) -> Receiver[T]:
def new_receiver(self) -> Receiver[T]:
"""Create a new receiver.
Returns:
Expand All @@ -111,7 +111,7 @@ def get_receiver(self) -> Receiver[T]:
class Sender(BaseSender[T]):
"""A sender to send messages to an Anycast channel.
Should not be created directly, but through the `Anycast.get_sender()`
Should not be created directly, but through the `Anycast.ggetet_sender()`
method.
"""

Expand Down Expand Up @@ -152,7 +152,7 @@ async def send(self, msg: T) -> bool:
class Receiver(BaseReceiver[T]):
"""A receiver to receive messages from an Anycast channel.
Should not be created directly, but through the `Anycast.get_receiver()`
Should not be created directly, but through the `Anycast.new_receiver()`
method.
"""

Expand Down
2 changes: 1 addition & 1 deletion src/frequenz/channels/base_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def into_peekable(self) -> Peekable[T]:
Raises:
NotImplementedError: when a `Receiver` implementation doesn't have
a custom `get_peekable` implementation.
a custom `into_peekable` implementation.
"""
raise NotImplementedError("This receiver does not implement `into_peekable`")

Expand Down
8 changes: 4 additions & 4 deletions src/frequenz/channels/bidirectional.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ def __init__(self, client_id: str, service_id: str) -> None:
)

self._client_handle = BidirectionalHandle(
self._request_channel.get_sender(),
self._response_channel.get_receiver(),
self._request_channel.new_sender(),
self._response_channel.new_receiver(),
)
self._service_handle = BidirectionalHandle(
self._response_channel.get_sender(),
self._request_channel.get_receiver(),
self._response_channel.new_sender(),
self._request_channel.new_receiver(),
)

@property
Expand Down
16 changes: 8 additions & 8 deletions src/frequenz/channels/broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ async def recv(id: int, receiver: channel.Receiver) -> None:
bcast = channel.Broadcast()
sender = bcast.get_sender()
receiver_1 = bcast.get_receiver()
sender = bcast.new_sender()
receiver_1 = bcast.new_receiver()
asyncio.create_task(send(sender))
Expand All @@ -73,7 +73,7 @@ def __init__(self, name: str, resend_latest: bool = False) -> None:
of data sent through it. Used to identify the channel in the
logs.
resend_latest: When True, every time a new receiver is created with
`get_receiver`, it will automatically get sent the latest value
`new_receiver`, it will automatically get sent the latest value
on the channel. This allows new receivers on slow streams to
get the latest value as soon as they are created, without having
to wait for the next message on the channel to arrive.
Expand Down Expand Up @@ -102,15 +102,15 @@ async def close(self) -> None:
async with self.recv_cv:
self.recv_cv.notify_all()

def get_sender(self) -> Sender[T]:
def new_sender(self) -> Sender[T]:
"""Create a new broadcast sender.
Returns:
A Sender instance attached to the broadcast channel.
"""
return Sender(self)

def get_receiver(
def new_receiver(
self, name: Optional[str] = None, maxsize: int = 50
) -> Receiver[T]:
"""Create a new broadcast receiver.
Expand All @@ -135,7 +135,7 @@ def get_receiver(
recv.enqueue(self._latest)
return recv

def get_peekable(self) -> Peekable[T]:
def new_peekable(self) -> Peekable[T]:
"""Create a new Peekable for the broadcast channel.
A Peekable provides a [peek()][frequenz.channels.Peekable.peek] method
Expand All @@ -152,7 +152,7 @@ class Sender(BaseSender[T]):
"""A sender to send messages to the broadcast channel.
Should not be created directly, but through the
[Broadcast.get_sender()][frequenz.channels.Broadcast.get_sender]
[Broadcast.new_sender()][frequenz.channels.Broadcast.new_sender]
method.
"""

Expand Down Expand Up @@ -196,7 +196,7 @@ class Receiver(BufferedReceiver[T]):
"""A receiver to receive messages from the broadcast channel.
Should not be created directly, but through the
[Broadcast.get_receiver()][frequenz.channels.Broadcast.get_receiver]
[Broadcast.new_receiver()][frequenz.channels.Broadcast.new_receiver]
method.
"""

Expand Down
24 changes: 12 additions & 12 deletions tests/test_anycast.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,17 @@ async def update_tracker_on_receive(receiver_id: int, chan: Receiver[int]) -> No

receivers = []
for ctr in range(num_receivers):
receivers.append(update_tracker_on_receive(ctr, acast.get_receiver()))
receivers.append(update_tracker_on_receive(ctr, acast.new_receiver()))

# get one more sender and receiver to test channel operations after the
# channel is closed.
after_close_receiver = acast.get_receiver()
after_close_sender = acast.get_sender()
after_close_receiver = acast.new_receiver()
after_close_sender = acast.new_sender()

receivers_runs = asyncio.gather(*receivers)
senders = []
for ctr in range(num_senders):
senders.append(send_msg(acast.get_sender()))
senders.append(send_msg(acast.new_sender()))

await asyncio.gather(*senders)
await acast.close()
Expand All @@ -75,8 +75,8 @@ async def test_anycast_after_close() -> None:
"""Ensure closed channels can't get new messages."""
acast: Anycast[int] = Anycast()

receiver = acast.get_receiver()
sender = acast.get_sender()
receiver = acast.new_receiver()
sender = acast.new_sender()

assert await sender.send(2) is True

Expand All @@ -94,8 +94,8 @@ async def test_anycast_full() -> None:
timeout = 0.2
acast: Anycast[int] = Anycast(buffer_size)

receiver = acast.get_receiver()
sender = acast.get_sender()
receiver = acast.new_receiver()
sender = acast.new_sender()

timeout_at = 0
for ctr in range(buffer_size + 1):
Expand Down Expand Up @@ -137,8 +137,8 @@ async def test_anycast_async_iterator() -> None:
"""Check that the anycast receiver works as an async iterator."""
acast: Anycast[str] = Anycast()

sender = acast.get_sender()
receiver = acast.get_receiver()
sender = acast.new_sender()
receiver = acast.new_receiver()

async def send_values() -> None:
for val in ["one", "two", "three", "four", "five"]:
Expand All @@ -159,10 +159,10 @@ async def send_values() -> None:
async def test_anycast_map() -> None:
"""Ensure map runs on all incoming messages."""
chan = Anycast[int]()
sender = chan.get_sender()
sender = chan.new_sender()

# transform int receiver into bool receiver.
receiver: Receiver[bool] = chan.get_receiver().map(lambda num: num > 10)
receiver: Receiver[bool] = chan.new_receiver().map(lambda num: num > 10)

await sender.send(8)
await sender.send(12)
Expand Down
44 changes: 22 additions & 22 deletions tests/test_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ async def update_tracker_on_receive(receiver_id: int, chan: Receiver[int]) -> No

receivers = []
for ctr in range(num_receivers):
receivers.append(update_tracker_on_receive(ctr, bcast.get_receiver()))
receivers.append(update_tracker_on_receive(ctr, bcast.new_receiver()))

receivers_runs = asyncio.gather(*receivers)
senders = []
for ctr in range(num_senders):
senders.append(send_msg(bcast.get_sender()))
senders.append(send_msg(bcast.new_sender()))

await asyncio.gather(*senders)
await bcast.close()
Expand All @@ -64,8 +64,8 @@ async def test_broadcast_after_close() -> None:
"""Ensure closed channels can't get new messages."""
bcast: Broadcast[int] = Broadcast("meter_5")

receiver = bcast.get_receiver()
sender = bcast.get_sender()
receiver = bcast.new_receiver()
sender = bcast.new_sender()

await bcast.close()

Expand All @@ -80,10 +80,10 @@ async def test_broadcast_overflow() -> None:

big_recv_size = 10
small_recv_size = int(big_recv_size / 2)
sender = bcast.get_sender()
sender = bcast.new_sender()

big_receiver = bcast.get_receiver("named-recv", big_recv_size)
small_receiver = bcast.get_receiver(None, small_recv_size)
big_receiver = bcast.new_receiver("named-recv", big_recv_size)
small_receiver = bcast.new_receiver(None, small_recv_size)

async def drain_receivers() -> Tuple[int, int]:
big_sum = 0
Expand Down Expand Up @@ -129,11 +129,11 @@ async def test_broadcast_resend_latest() -> None:
"""Check if new receivers get the latest value when resend_latest is set."""
bcast: Broadcast[int] = Broadcast("new_recv_test", resend_latest=True)

sender = bcast.get_sender()
old_recv = bcast.get_receiver()
sender = bcast.new_sender()
old_recv = bcast.new_receiver()
for val in range(0, 10):
await sender.send(val)
new_recv = bcast.get_receiver()
new_recv = bcast.new_receiver()

await sender.send(100)

Expand All @@ -146,11 +146,11 @@ async def test_broadcast_no_resend_latest() -> None:
"""Ensure new receivers don't get the latest value when resend_latest isn't set."""
bcast: Broadcast[int] = Broadcast("new_recv_test", resend_latest=False)

sender = bcast.get_sender()
old_recv = bcast.get_receiver()
sender = bcast.new_sender()
old_recv = bcast.new_receiver()
for val in range(0, 10):
await sender.send(val)
new_recv = bcast.get_receiver()
new_recv = bcast.new_receiver()

await sender.send(100)

Expand All @@ -161,9 +161,9 @@ async def test_broadcast_no_resend_latest() -> None:
async def test_broadcast_peek() -> None:
"""Ensure we are able to peek into broadcast channels."""
bcast: Broadcast[int] = Broadcast("peek-test")
receiver = bcast.get_receiver()
receiver = bcast.new_receiver()
peekable = receiver.into_peekable()
sender = bcast.get_sender()
sender = bcast.new_sender()

with pytest.raises(EOFError):
await receiver.receive()
Expand All @@ -188,8 +188,8 @@ async def test_broadcast_async_iterator() -> None:
"""Check that the broadcast receiver works as an async iterator."""
bcast: Broadcast[int] = Broadcast("iter_test")

sender = bcast.get_sender()
receiver = bcast.get_receiver()
sender = bcast.new_sender()
receiver = bcast.new_receiver()

async def send_values() -> None:
for val in range(0, 10):
Expand All @@ -210,10 +210,10 @@ async def send_values() -> None:
async def test_broadcast_map() -> None:
"""Ensure map runs on all incoming messages."""
chan = Broadcast[int]("input-chan")
sender = chan.get_sender()
sender = chan.new_sender()

# transform int receiver into bool receiver.
receiver: Receiver[bool] = chan.get_receiver().map(lambda num: num > 10)
receiver: Receiver[bool] = chan.new_receiver().map(lambda num: num > 10)

await sender.send(8)
await sender.send(12)
Expand All @@ -225,10 +225,10 @@ async def test_broadcast_map() -> None:
async def test_broadcast_receiver_drop() -> None:
"""Ensure deleted receivers get cleaned up."""
chan = Broadcast[int]("input-chan")
sender = chan.get_sender()
sender = chan.new_sender()

receiver1 = chan.get_receiver()
receiver2 = chan.get_receiver()
receiver1 = chan.new_receiver()
receiver2 = chan.new_receiver()

await sender.send(10)

Expand Down
Loading

0 comments on commit ee3963d

Please sign in to comment.