-
Notifications
You must be signed in to change notification settings - Fork 1
fix: incorporate post-merge feedback + pre-PR review fixes #164
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
7923a58
dc8b238
e4bdf98
23bb067
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -5,6 +5,7 @@ | |||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| import asyncio | ||||||||||||||||||||||||||||||||||
| import contextlib | ||||||||||||||||||||||||||||||||||
| from collections import deque | ||||||||||||||||||||||||||||||||||
| from datetime import UTC, datetime | ||||||||||||||||||||||||||||||||||
| from typing import NoReturn | ||||||||||||||||||||||||||||||||||
|
|
@@ -38,11 +39,14 @@ | |||||||||||||||||||||||||||||||||
| COMM_HISTORY_QUERIED, | ||||||||||||||||||||||||||||||||||
| COMM_MESSAGE_DELIVERED, | ||||||||||||||||||||||||||||||||||
| COMM_MESSAGE_PUBLISHED, | ||||||||||||||||||||||||||||||||||
| COMM_RECEIVE_SHUTDOWN, | ||||||||||||||||||||||||||||||||||
| COMM_RECEIVE_TIMEOUT, | ||||||||||||||||||||||||||||||||||
| COMM_RECEIVE_UNSUBSCRIBED, | ||||||||||||||||||||||||||||||||||
| COMM_SEND_DIRECT_INVALID, | ||||||||||||||||||||||||||||||||||
| COMM_SUBSCRIPTION_CREATED, | ||||||||||||||||||||||||||||||||||
| COMM_SUBSCRIPTION_NOT_FOUND, | ||||||||||||||||||||||||||||||||||
| COMM_SUBSCRIPTION_REMOVED, | ||||||||||||||||||||||||||||||||||
| COMM_UNSUBSCRIBE_SENTINEL_FAILED, | ||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| logger = get_logger(__name__) | ||||||||||||||||||||||||||||||||||
|
|
@@ -93,7 +97,7 @@ def __init__(self, *, config: MessageBusConfig) -> None: | |||||||||||||||||||||||||||||||||
| self._config = config | ||||||||||||||||||||||||||||||||||
| self._lock = asyncio.Lock() | ||||||||||||||||||||||||||||||||||
| self._channels: dict[str, Channel] = {} | ||||||||||||||||||||||||||||||||||
| self._queues: dict[tuple[str, str], asyncio.Queue[DeliveryEnvelope]] = {} | ||||||||||||||||||||||||||||||||||
| self._queues: dict[tuple[str, str], asyncio.Queue[DeliveryEnvelope | None]] = {} | ||||||||||||||||||||||||||||||||||
| self._history: dict[str, deque[Message]] = {} | ||||||||||||||||||||||||||||||||||
| self._known_agents: set[str] = set() | ||||||||||||||||||||||||||||||||||
| self._running = False | ||||||||||||||||||||||||||||||||||
|
|
@@ -154,7 +158,7 @@ def _ensure_queue( | |||||||||||||||||||||||||||||||||
| self, | ||||||||||||||||||||||||||||||||||
| channel_name: str, | ||||||||||||||||||||||||||||||||||
| subscriber_id: str, | ||||||||||||||||||||||||||||||||||
| ) -> asyncio.Queue[DeliveryEnvelope]: | ||||||||||||||||||||||||||||||||||
| ) -> asyncio.Queue[DeliveryEnvelope | None]: | ||||||||||||||||||||||||||||||||||
| """Get or create a per-(channel, subscriber) queue.""" | ||||||||||||||||||||||||||||||||||
| return self._queues.setdefault( | ||||||||||||||||||||||||||||||||||
| (channel_name, subscriber_id), | ||||||||||||||||||||||||||||||||||
|
|
@@ -395,7 +399,17 @@ async def unsubscribe( | |||||||||||||||||||||||||||||||||
| self._channels[channel_name] = channel.model_copy( | ||||||||||||||||||||||||||||||||||
| update={"subscribers": new_subs}, | ||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||
| self._queues.pop((channel_name, subscriber_id), None) | ||||||||||||||||||||||||||||||||||
| queue = self._queues.pop((channel_name, subscriber_id), None) | ||||||||||||||||||||||||||||||||||
| if queue is not None: | ||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||
| queue.put_nowait(None) | ||||||||||||||||||||||||||||||||||
| except asyncio.QueueFull: | ||||||||||||||||||||||||||||||||||
| logger.exception( | ||||||||||||||||||||||||||||||||||
| COMM_UNSUBSCRIBE_SENTINEL_FAILED, | ||||||||||||||||||||||||||||||||||
| channel=channel_name, | ||||||||||||||||||||||||||||||||||
| subscriber=subscriber_id, | ||||||||||||||||||||||||||||||||||
| detail="Queue full — unsubscribe sentinel not delivered", | ||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||
|
Comment on lines
+402
to
+412
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
If an explicit capacity limit is ever added to the queue in the future, the handler would kick in correctly. For now, consider either:
Suggested change
Prompt To Fix With AIThis is a comment left during a code review.
Path: src/ai_company/communication/bus_memory.py
Line: 402-412
Comment:
`asyncio.QueueFull` can never be raised here — dead code branch
`_ensure_queue` creates queues with `asyncio.Queue()` (no `maxsize` argument), which defaults to `maxsize=0` — an unbounded queue. `put_nowait` on an unbounded queue never raises `QueueFull`, so this `except` branch is unreachable dead code.
If an explicit capacity limit is ever added to the queue in the future, the handler would kick in correctly. For now, consider either:
1. Removing the `try/except` and just calling `queue.put_nowait(None)` unconditionally (it can't raise), or
2. Adding a comment explaining this is a forward-compatible guard.
```suggestion
queue = self._queues.pop((channel_name, subscriber_id), None)
if queue is not None:
# Queue is unbounded (asyncio.Queue() default maxsize=0),
# so put_nowait cannot raise QueueFull here.
queue.put_nowait(None)
```
How can I resolve this? If you propose a fix, please make it concise. |
||||||||||||||||||||||||||||||||||
| logger.info( | ||||||||||||||||||||||||||||||||||
| COMM_SUBSCRIPTION_REMOVED, | ||||||||||||||||||||||||||||||||||
| channel=channel_name, | ||||||||||||||||||||||||||||||||||
|
|
@@ -415,13 +429,19 @@ async def receive( | |||||||||||||||||||||||||||||||||
| the bus is stopped. When ``timeout`` is ``None``, awaits | ||||||||||||||||||||||||||||||||||
| indefinitely (or until shutdown). | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| Note: Only one ``receive()`` call should be pending per | ||||||||||||||||||||||||||||||||||
| ``(channel_name, subscriber_id)`` pair at a time. The | ||||||||||||||||||||||||||||||||||
| unsubscribe sentinel wakes a single waiter; concurrent | ||||||||||||||||||||||||||||||||||
| receivers on the same subscription are not supported. | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| Args: | ||||||||||||||||||||||||||||||||||
| channel_name: Channel to receive from. | ||||||||||||||||||||||||||||||||||
| subscriber_id: Agent ID receiving. | ||||||||||||||||||||||||||||||||||
| timeout: Seconds to wait before returning ``None``. | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| Returns: | ||||||||||||||||||||||||||||||||||
| A delivery envelope, or ``None`` on timeout or shutdown. | ||||||||||||||||||||||||||||||||||
| A delivery envelope, or ``None`` on timeout, shutdown, | ||||||||||||||||||||||||||||||||||
| or when an in-flight receive is woken by :meth:`unsubscribe`. | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| Raises: | ||||||||||||||||||||||||||||||||||
| MessageBusNotRunningError: If the bus is not running. | ||||||||||||||||||||||||||||||||||
|
|
@@ -442,17 +462,39 @@ async def receive( | |||||||||||||||||||||||||||||||||
| queue = self._ensure_queue(channel_name, subscriber_id) | ||||||||||||||||||||||||||||||||||
| result = await self._await_with_shutdown(queue, timeout) | ||||||||||||||||||||||||||||||||||
| if result is None: | ||||||||||||||||||||||||||||||||||
| self._log_receive_null(channel_name, subscriber_id, timeout) | ||||||||||||||||||||||||||||||||||
| return result | ||||||||||||||||||||||||||||||||||
|
Comment on lines
462
to
+466
|
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| def _log_receive_null( | ||||||||||||||||||||||||||||||||||
| self, | ||||||||||||||||||||||||||||||||||
| channel_name: str, | ||||||||||||||||||||||||||||||||||
| subscriber_id: str, | ||||||||||||||||||||||||||||||||||
| timeout: float | None, | ||||||||||||||||||||||||||||||||||
| ) -> None: | ||||||||||||||||||||||||||||||||||
| """Log the cause when ``receive()`` returns ``None``.""" | ||||||||||||||||||||||||||||||||||
| if self._shutdown_event.is_set(): | ||||||||||||||||||||||||||||||||||
| logger.debug( | ||||||||||||||||||||||||||||||||||
| COMM_RECEIVE_SHUTDOWN, | ||||||||||||||||||||||||||||||||||
| channel=channel_name, | ||||||||||||||||||||||||||||||||||
| subscriber=subscriber_id, | ||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||
| elif (channel_name, subscriber_id) not in self._queues: | ||||||||||||||||||||||||||||||||||
| logger.debug( | ||||||||||||||||||||||||||||||||||
| COMM_RECEIVE_UNSUBSCRIBED, | ||||||||||||||||||||||||||||||||||
| channel=channel_name, | ||||||||||||||||||||||||||||||||||
| subscriber=subscriber_id, | ||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||
|
Comment on lines
+481
to
+486
|
||||||||||||||||||||||||||||||||||
| elif (channel_name, subscriber_id) not in self._queues: | |
| logger.debug( | |
| COMM_RECEIVE_UNSUBSCRIBED, | |
| channel=channel_name, | |
| subscriber=subscriber_id, | |
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Return the wake-up reason explicitly instead of inferring it from current state.
_log_receive_null() decides between timeout/shutdown/unsubscribe by checking _shutdown_event and _queues after the await has already finished. That makes the log racey: a timeout can be logged as shutdown if stop() runs just after the wait returns, and an unsubscribe wake can be logged as timeout if the same subscriber is re-added before the helper runs. Have _await_with_shutdown() return both the envelope and a reason enum/literal so the log reflects the actual wake-up cause.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/communication/bus_memory.py` around lines 464 - 493, Change
_await_with_shutdown to return a tuple (result, reason) where reason is a
literal/enum like "shutdown" | "timeout" | "unsubscribed"; update callers (e.g.,
receive()) to unpack that tuple and pass the explicit reason into
_log_receive_null instead of letting _log_receive_null re-check _shutdown_event
or _queues; modify _log_receive_null signature to accept the reason and log
COMM_RECEIVE_SHUTDOWN / COMM_RECEIVE_TIMEOUT / COMM_RECEIVE_UNSUBSCRIBED based
solely on the supplied reason so the logged cause matches the actual wake-up
source.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,7 +10,10 @@ | |
| MessageHandlerFunc, | ||
| ) | ||
| from ai_company.communication.message import Message | ||
| from ai_company.communication.subscription import Subscription # noqa: TC001 | ||
| from ai_company.communication.subscription import ( # noqa: TC001 | ||
| DeliveryEnvelope, | ||
| Subscription, | ||
| ) | ||
| from ai_company.observability import get_logger | ||
| from ai_company.observability.events.communication import ( | ||
| COMM_DISPATCH_NO_DISPATCHER, | ||
|
|
@@ -269,6 +272,28 @@ async def unsubscribe(self, channel_name: str) -> None: | |
| channel=channel_name, | ||
| ) | ||
|
|
||
| async def receive( | ||
| self, | ||
| channel_name: str, | ||
| *, | ||
| timeout: float | None = None, # noqa: ASYNC109 | ||
| ) -> DeliveryEnvelope | None: | ||
|
Comment on lines
+275
to
+280
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use This adds a new public identifier field as plain As per coding guidelines, "Use NotBlankStr (from core.types) for all identifier/name fields — including optional (NotBlankStr | None) and tuple (tuple[NotBlankStr, ...]) variants — instead of manual whitespace validators". 🤖 Prompt for AI Agents |
||
| """Receive the next message from a channel. | ||
|
|
||
| Args: | ||
| channel_name: Channel to receive from. | ||
| timeout: Max seconds to wait, or ``None`` for indefinite. | ||
|
|
||
| Returns: | ||
| The next delivery envelope, or ``None`` on timeout, shutdown, | ||
| or when an in-flight receive is woken by :meth:`unsubscribe`. | ||
| """ | ||
| return await self._bus.receive( | ||
| channel_name, | ||
| self._agent_id, | ||
| timeout=timeout, | ||
| ) | ||
|
|
||
| def register_handler( | ||
| self, | ||
| handler: MessageHandler | MessageHandlerFunc, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A single unsubscribe sentinel only wakes one waiter.
If two
receive()calls are pending on the same(channel, subscriber)queue, one consumes theNoneand the rest stay blocked forever because the queue has already been removed from_queues. Use a per-subscription event, or otherwise broadcast the unsubscribe signal, so every pending receiver wakes.🤖 Prompt for AI Agents