1
1
import asyncio
2
+ import logging
2
3
from asyncio import Future
4
+ from collections .abc import Sequence
3
5
from dataclasses import dataclass
4
6
from typing import Any , Awaitable , Dict , List , Mapping , Set
5
7
6
8
from ..core import Agent , AgentRuntime , CancellationToken
7
9
from ..core .exceptions import MessageDroppedException
8
10
from ..core .intervention import DropMessage , InterventionHandler
9
11
12
+ logger = logging .getLogger ("agnext" )
13
+
10
14
11
15
@dataclass (kw_only = True )
12
16
class PublishMessageEnvelope :
@@ -58,6 +62,14 @@ def add_agent(self, agent: Agent) -> None:
58
62
self ._per_type_subscribers [message_type ].append (agent )
59
63
self ._agents .add (agent )
60
64
65
+ @property
66
+ def agents (self ) -> Sequence [Agent ]:
67
+ return list (self ._agents )
68
+
69
+ @property
70
+ def unprocessed_messages (self ) -> Sequence [PublishMessageEnvelope | SendMessageEnvelope | ResponseMessageEnvelope ]:
71
+ return self ._message_queue
72
+
61
73
# Returns the response of the message
62
74
def send_message (
63
75
self ,
@@ -123,6 +135,10 @@ async def _process_send(self, message_envelope: SendMessageEnvelope) -> None:
123
135
assert recipient in self ._agents
124
136
125
137
try :
138
+ sender_name = message_envelope .sender .name if message_envelope .sender is not None else "Unknown"
139
+ logger .info (
140
+ f"Calling message handler for { recipient .name } with message type { type (message_envelope .message ).__name__ } from { sender_name } "
141
+ )
126
142
response = await recipient .on_message (
127
143
message_envelope .message ,
128
144
cancellation_token = message_envelope .cancellation_token ,
@@ -145,6 +161,11 @@ async def _process_publish(self, message_envelope: PublishMessageEnvelope) -> No
145
161
for agent in self ._per_type_subscribers .get (type (message_envelope .message ), []): # type: ignore
146
162
if message_envelope .sender is not None and agent .name == message_envelope .sender .name :
147
163
continue
164
+
165
+ logger .info (
166
+ f"Calling message handler for { agent .name } with message type { type (message_envelope .message ).__name__ } published by { message_envelope .sender .name if message_envelope .sender is not None else 'Unknown' } "
167
+ )
168
+
148
169
future = agent .on_message (
149
170
message_envelope .message ,
150
171
cancellation_token = message_envelope .cancellation_token ,
@@ -154,12 +175,16 @@ async def _process_publish(self, message_envelope: PublishMessageEnvelope) -> No
154
175
try :
155
176
_all_responses = await asyncio .gather (* responses )
156
177
except BaseException :
157
- # TODO log error
178
+ logger . error ( "Error processing publish message" , exc_info = True )
158
179
return
159
180
160
181
# TODO if responses are given for a publish
161
182
162
183
async def _process_response (self , message_envelope : ResponseMessageEnvelope ) -> None :
184
+ recipient_name = message_envelope .recipient .name if message_envelope .recipient is not None else "Unknown"
185
+ logger .info (
186
+ f"Resolving response for recipient { recipient_name } from { message_envelope .sender .name } with message type { type (message_envelope .message ).__name__ } "
187
+ )
163
188
message_envelope .future .set_result (message_envelope .message )
164
189
165
190
async def process_next (self ) -> None :
0 commit comments