-
Notifications
You must be signed in to change notification settings - Fork 5.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
.NET combine Handle and CallHandler #4386
base: main
Are you sure you want to change the base?
Changes from 13 commits
40c4a0d
100f893
288a9cb
50dea76
d5a5fe3
f2ca194
4e53aca
2dc5246
d1de9f6
b3a6941
010aa8e
bfa5c1b
0e05a41
d2921c3
6d223b5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,7 +12,7 @@ | |
|
||
namespace Microsoft.AutoGen.Agents; | ||
|
||
public abstract class AgentBase : IAgentBase, IHandle | ||
public abstract class AgentBase : IAgentBase, IHandle, IHandle<CloudEvent> | ||
{ | ||
public static readonly ActivitySource s_source = new("AutoGen.Agent"); | ||
public AgentId AgentId => _runtime.AgentId; | ||
|
@@ -106,7 +106,7 @@ protected internal async Task HandleRpcMessage(Message msg, CancellationToken ca | |
{ | ||
var activity = this.ExtractActivity(msg.CloudEvent.Type, msg.CloudEvent.Metadata); | ||
await this.InvokeWithActivityAsync( | ||
static ((AgentBase Agent, CloudEvent Item) state, CancellationToken _) => state.Agent.CallHandler(state.Item), | ||
static ((AgentBase Agent, CloudEvent Item) state, CancellationToken _) => state.Agent.HandleObject(state.Item), | ||
(this, msg.CloudEvent), | ||
activity, | ||
msg.CloudEvent.Type, cancellationToken).ConfigureAwait(false); | ||
|
@@ -255,33 +255,43 @@ static async ((AgentBase Agent, CloudEvent Event) state, CancellationToken ct) = | |
item.Type, cancellationToken).ConfigureAwait(false); | ||
} | ||
|
||
public Task CallHandler(CloudEvent item) | ||
public Task<RpcResponse> HandleRequest(RpcRequest request) => Task.FromResult(new RpcResponse { Error = "Not implemented" }); | ||
|
||
public virtual Task HandleObject(object item) | ||
{ | ||
if (item is CloudEvent ce) | ||
{ | ||
return Handle(ce); | ||
} | ||
|
||
var genericInterfaceType = typeof(IHandle<>).MakeGenericType(item.GetType()); | ||
|
||
// check that our target actually implements this interface, otherwise call the default static | ||
if (genericInterfaceType.IsAssignableFrom(this.GetType())) | ||
{ | ||
var methodInfo = genericInterfaceType.GetMethod(nameof(IHandle<object>.Handle), BindingFlags.Public | BindingFlags.Instance) | ||
?? throw new InvalidOperationException($"Method not found on type {genericInterfaceType.FullName}"); | ||
|
||
return methodInfo.Invoke(this, [item]) as Task ?? throw new InvalidOperationException("Method did not return a Task"); | ||
} | ||
|
||
// otherwise, complain | ||
throw new InvalidOperationException($"No handler found for type {item.GetType().FullName}"); | ||
} | ||
|
||
public virtual Task Handle(CloudEvent item) | ||
{ | ||
// Only send the event to the handler if the agent type is handling that type | ||
// foreach of the keys in the EventTypes.EventsMap[] if it contains the item.type | ||
foreach (var key in EventTypes.EventsMap.Keys) | ||
{ | ||
if (EventTypes.EventsMap[key].Contains(item.Type)) | ||
{ | ||
var payload = item.ProtoData.Unpack(EventTypes.TypeRegistry); | ||
var convertedPayload = Convert.ChangeType(payload, EventTypes.Types[item.Type]); | ||
var genericInterfaceType = typeof(IHandle<>).MakeGenericType(EventTypes.Types[item.Type]); | ||
|
||
MethodInfo methodInfo; | ||
try | ||
{ | ||
// check that our target actually implements this interface, otherwise call the default static | ||
if (genericInterfaceType.IsAssignableFrom(this.GetType())) | ||
{ | ||
methodInfo = genericInterfaceType.GetMethod(nameof(IHandle<object>.Handle), BindingFlags.Public | BindingFlags.Instance) | ||
?? throw new InvalidOperationException($"Method not found on type {genericInterfaceType.FullName}"); | ||
return methodInfo.Invoke(this, [payload]) as Task ?? Task.CompletedTask; | ||
} | ||
else | ||
{ | ||
// The error here is we have registered for an event that we do not have code to listen to | ||
throw new InvalidOperationException($"No handler found for event '{item.Type}'; expecting IHandle<{item.Type}> implementation."); | ||
} | ||
var payload = item.ProtoData.Unpack(EventTypes.TypeRegistry); | ||
var convertedPayload = Convert.ChangeType(payload, EventTypes.Types[item.Type]); | ||
return this.HandleObject(convertedPayload); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't understand - with this all agents can handle all messages? I don't think that's what you want..... Maybe a quick call to discuss because I'm not understanding it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
} | ||
catch (Exception ex) | ||
{ | ||
|
@@ -293,29 +303,4 @@ public Task CallHandler(CloudEvent item) | |
|
||
return Task.CompletedTask; | ||
} | ||
|
||
public Task<RpcResponse> HandleRequest(RpcRequest request) => Task.FromResult(new RpcResponse { Error = "Not implemented" }); | ||
|
||
//TODO: should this be async and cancellable? | ||
public virtual Task HandleObject(object item) | ||
{ | ||
// get all Handle<T> methods | ||
var handleTMethods = this.GetType().GetMethods().Where(m => m.Name == "Handle" && m.GetParameters().Length == 1).ToList(); | ||
|
||
// get the one that matches the type of the item | ||
var handleTMethod = handleTMethods.FirstOrDefault(m => m.GetParameters()[0].ParameterType == item.GetType()); | ||
|
||
// if we found one, invoke it | ||
if (handleTMethod != null) | ||
{ | ||
return (Task)handleTMethod.Invoke(this, [item])!; | ||
} | ||
|
||
// otherwise, complain | ||
throw new InvalidOperationException($"No handler found for type {item.GetType().FullName}"); | ||
} | ||
public async ValueTask PublishEventAsync(string topic, IMessage evt, CancellationToken cancellationToken = default) | ||
{ | ||
await PublishEventAsync(evt.ToCloudEvent(topic), cancellationToken).ConfigureAwait(false); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is used in other places (eg App.cs) why remove it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
IAgentBase
is already deleted by recent PR. So this change no longer apply.Anyway, the idea of moving the overloaded
PublishEventAsync
as an extension method is to simplify the interface definition