Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RPC over events #4414

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
Draft
47 changes: 5 additions & 42 deletions protos/agent_worker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,46 +7,11 @@ option csharp_namespace = "Microsoft.AutoGen.Contracts";
import "cloudevent.proto";
import "google/protobuf/any.proto";

message TopicId {
string type = 1;
string source = 2;
}

message AgentId {
string type = 1;
string key = 2;
}

message Payload {
string data_type = 1;
string data_content_type = 2;
bytes data = 3;
}

message RpcRequest {
string request_id = 1;
optional AgentId source = 2;
AgentId target = 3;
string method = 4;
Payload payload = 5;
map<string, string> metadata = 6;
}

message RpcResponse {
string request_id = 1;
Payload payload = 2;
string error = 3;
map<string, string> metadata = 4;
}

message Event {
string topic_type = 1;
string topic_source = 2;
optional AgentId source = 3;
Payload payload = 4;
map<string, string> metadata = 5;
}

message RegisterAgentTypeRequest {
string request_id = 1;
string type = 2;
Expand Down Expand Up @@ -115,13 +80,11 @@ message SaveStateResponse {

message Message {
oneof message {
RpcRequest request = 1;
RpcResponse response = 2;
io.cloudevents.v1.CloudEvent cloudEvent = 3;
RegisterAgentTypeRequest registerAgentTypeRequest = 4;
RegisterAgentTypeResponse registerAgentTypeResponse = 5;
AddSubscriptionRequest addSubscriptionRequest = 6;
AddSubscriptionResponse addSubscriptionResponse = 7;
io.cloudevents.v1.CloudEvent cloudEvent = 1;
RegisterAgentTypeRequest registerAgentTypeRequest = 2;
RegisterAgentTypeResponse registerAgentTypeResponse = 3;
AddSubscriptionRequest addSubscriptionRequest = 4;
AddSubscriptionResponse addSubscriptionResponse = 5;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ def __init__(self, description: str) -> None:
super().__init__(description=description)
self._fifo_lock = FIFOLock()

async def on_message_impl(self, message: Any, ctx: MessageContext) -> Any | None:
async def on_message_impl(self, message: Any, ctx: MessageContext) -> None:
await self._fifo_lock.acquire()
try:
return await super().on_message_impl(message, ctx)
await super().on_message_impl(message, ctx)
finally:
self._fifo_lock.release()
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@
},
{
"cell_type": "code",
"execution_count": 6,
"execution_count": null,
"metadata": {},
"outputs": [
{
Expand Down Expand Up @@ -222,7 +222,6 @@
"await runtime.send_message(\n",
" Message(\"Joe, tell me a joke.\"),\n",
" recipient=AgentId(joe, \"default\"),\n",
" sender=AgentId(cathy, \"default\"),\n",
")\n",
"await runtime.stop_when_idle()"
]
Expand Down
10 changes: 4 additions & 6 deletions python/packages/autogen-core/samples/slow_human_in_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
from typing import Any, Mapping, Optional

from autogen_core import (
AgentId,
CancellationToken,
DefaultTopicId,
FunctionCall,
Expand All @@ -41,7 +40,6 @@
message_handler,
type_subscription,
)
from autogen_core.base.intervention import DefaultInterventionHandler
from autogen_core.model_context import BufferedChatCompletionContext
from autogen_core.models import (
AssistantMessage,
Expand Down Expand Up @@ -207,11 +205,11 @@ async def load_state(self, state: Mapping[str, Any]) -> None:
self._model_context.load_state({**state["memory"], "messages": [m for m in state["memory"]["messages"]]})


class NeedsUserInputHandler(DefaultInterventionHandler):
class NeedsUserInputHandler:
def __init__(self):
self.question_for_user: GetSlowUserMessage | None = None

async def on_publish(self, message: Any, *, sender: AgentId | None) -> Any:
async def __call__(self, message: Any, message_context: MessageContext) -> Any:
if isinstance(message, GetSlowUserMessage):
self.question_for_user = message
return message
Expand All @@ -227,11 +225,11 @@ def user_input_content(self) -> str | None:
return self.question_for_user.content


class TerminationHandler(DefaultInterventionHandler):
class TerminationHandler:
def __init__(self):
self.terminateMessage: TerminateMessage | None = None

async def on_publish(self, message: Any, *, sender: AgentId | None) -> Any:
async def __call__(self, message: Any, message_context: MessageContext) -> Any:
if isinstance(message, TerminateMessage):
self.terminateMessage = message
return message
Expand Down
3 changes: 3 additions & 0 deletions python/packages/autogen-core/src/autogen_core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from ._default_subscription import DefaultSubscription, default_subscription, type_subscription
from ._default_topic import DefaultTopicId
from ._image import Image
from ._intervention import DropMessage, InterventionFunction
from ._message_context import MessageContext
from ._message_handler_context import MessageHandlerContext
from ._routed_agent import RoutedAgent, event, message_handler, rpc
Expand Down Expand Up @@ -99,4 +100,6 @@
"ROOT_LOGGER_NAME",
"EVENT_LOGGER_NAME",
"TRACE_LOGGER_NAME",
"DropMessage",
"InterventionFunction",
]
5 changes: 1 addition & 4 deletions python/packages/autogen-core/src/autogen_core/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,13 @@ def id(self) -> AgentId:
"""ID of the agent."""
...

async def on_message(self, message: Any, ctx: MessageContext) -> Any:
async def on_message(self, message: Any, ctx: MessageContext) -> None:
"""Message handler for the agent. This should only be called by the runtime, not by other agents.

Args:
message (Any): Received message. Type is one of the types in `subscriptions`.
ctx (MessageContext): Context of the message.

Returns:
Any: Response to the message. Can be None.

Raises:
asyncio.CancelledError: If the message was cancelled.
CantHandleException: If the agent cannot handle the message.
Expand Down
2 changes: 0 additions & 2 deletions python/packages/autogen-core/src/autogen_core/_agent_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,11 @@ async def send_message(
self,
message: Any,
*,
sender: AgentId,
cancellation_token: CancellationToken | None = None,
) -> Any:
return await self._runtime.send_message(
message,
recipient=self._agent,
sender=sender,
cancellation_token=cancellation_token,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ async def send_message(
message: Any,
recipient: AgentId,
*,
sender: AgentId | None = None,
cancellation_token: CancellationToken | None = None,
) -> Any:
"""Send a message to an agent and get a response.
Expand Down
Loading