Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

.NET combine Handle and CallHandler #4386

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 30 additions & 50 deletions dotnet/src/Microsoft.AutoGen/Core/Agent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

namespace Microsoft.AutoGen.Core;

public abstract class Agent : IDisposable, IHandle
public abstract class Agent : IHandle
{
public static readonly ActivitySource s_source = new("AutoGen.Agent");
public AgentId AgentId => _runtime.AgentId;
Expand Down Expand Up @@ -135,7 +135,7 @@ protected internal async Task HandleRpcMessage(Message msg, CancellationToken ca
{
var activity = this.ExtractActivity(msg.CloudEvent.Type, msg.CloudEvent.Attributes);
await this.InvokeWithActivityAsync(
static ((Agent Agent, CloudEvent Item) state, CancellationToken _) => state.Agent.CallHandler(state.Item),
static ((Agent Agent, CloudEvent Item) state, CancellationToken _) => state.Agent.HandleObject(state.Item),
(this, msg.CloudEvent),
activity,
msg.CloudEvent.Type, cancellationToken).ConfigureAwait(false);
Expand Down Expand Up @@ -281,33 +281,43 @@ static async ((Agent Agent, CloudEvent Event) state, CancellationToken ct) =>
item.Type, cancellationToken).ConfigureAwait(false);
}

public Task CallHandler(CloudEvent item)
public Task<RpcResponse> HandleRequest(RpcRequest request) => Task.FromResult(new RpcResponse { Error = "Not implemented" });

public virtual Task HandleObject(object item)
{
if (item is CloudEvent ce)
{
return Handle(ce);
}

var genericInterfaceType = typeof(IHandle<>).MakeGenericType(item.GetType());

// check that our target actually implements this interface, otherwise call the default static
if (genericInterfaceType.IsAssignableFrom(this.GetType()))
{
var methodInfo = genericInterfaceType.GetMethod(nameof(IHandle<object>.Handle), BindingFlags.Public | BindingFlags.Instance)
?? throw new InvalidOperationException($"Method not found on type {genericInterfaceType.FullName}");

return methodInfo.Invoke(this, [item]) as Task ?? throw new InvalidOperationException("Method did not return a Task");
}

// otherwise, complain
throw new InvalidOperationException($"No handler found for type {item.GetType().FullName}");
}

public virtual Task Handle(CloudEvent item)
{
// Only send the event to the handler if the agent type is handling that type
// foreach of the keys in the EventTypes.EventsMap[] if it contains the item.type
foreach (var key in EventTypes.EventsMap.Keys)
{
if (EventTypes.EventsMap[key].Contains(item.Type))
{
var payload = item.ProtoData.Unpack(EventTypes.TypeRegistry);
var convertedPayload = Convert.ChangeType(payload, EventTypes.Types[item.Type]);
var genericInterfaceType = typeof(IHandle<>).MakeGenericType(EventTypes.Types[item.Type]);

MethodInfo methodInfo;
try
{
// check that our target actually implements this interface, otherwise call the default static
if (genericInterfaceType.IsAssignableFrom(this.GetType()))
{
methodInfo = genericInterfaceType.GetMethod(nameof(IHandle<object>.Handle), BindingFlags.Public | BindingFlags.Instance)
?? throw new InvalidOperationException($"Method not found on type {genericInterfaceType.FullName}");
return methodInfo.Invoke(this, [payload]) as Task ?? Task.CompletedTask;
}
else
{
// The error here is we have registered for an event that we do not have code to listen to
throw new InvalidOperationException($"No handler found for event '{item.Type}'; expecting IHandle<{item.Type}> implementation.");
}
var payload = item.ProtoData.Unpack(EventTypes.TypeRegistry);
var convertedPayload = Convert.ChangeType(payload, EventTypes.Types[item.Type]);
return this.HandleObject(convertedPayload);
}
catch (Exception ex)
{
Expand All @@ -319,34 +329,4 @@ public Task CallHandler(CloudEvent item)

return Task.CompletedTask;
}

public Task<RpcResponse> HandleRequest(RpcRequest request) => Task.FromResult(new RpcResponse { Error = "Not implemented" });

//TODO: should this be async and cancellable?
public virtual Task HandleObject(object item)
{
// get all Handle<T> methods
var handleTMethods = this.GetType().GetMethods().Where(m => m.Name == "Handle" && m.GetParameters().Length == 1).ToList();

// get the one that matches the type of the item
var handleTMethod = handleTMethods.FirstOrDefault(m => m.GetParameters()[0].ParameterType == item.GetType());

// if we found one, invoke it
if (handleTMethod != null)
{
return (Task)handleTMethod.Invoke(this, [item])!;
}

// otherwise, complain
throw new InvalidOperationException($"No handler found for type {item.GetType().FullName}");
}
public async ValueTask PublishEventAsync(string topic, IMessage evt, CancellationToken cancellationToken = default)
{
await PublishEventAsync(evt.ToCloudEvent(topic), cancellationToken).ConfigureAwait(false);
}

public void Dispose()
{
throw new NotImplementedException();
}
}
7 changes: 7 additions & 0 deletions dotnet/src/Microsoft.AutoGen/Core/AgentExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
// AgentExtensions.cs

using System.Diagnostics;
using Google.Protobuf;
using Google.Protobuf.Collections;
using Microsoft.AutoGen.Contracts;
using static Microsoft.AutoGen.Contracts.CloudEvent.Types;

namespace Microsoft.AutoGen.Core;
Expand Down Expand Up @@ -119,4 +121,9 @@ public static async Task InvokeWithActivityAsync<TState>(this Agent agent, Func<
activity?.Stop();
}
}

public static async ValueTask PublishEventAsync(this Agent agent, string topic, IMessage evt, CancellationToken cancellationToken = default)
{
await agent.PublishEventAsync(evt.ToCloudEvent(topic), cancellationToken).ConfigureAwait(false);
}
}
6 changes: 3 additions & 3 deletions dotnet/test/Microsoft.AutoGen.Agents.Tests/AgentTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
using Google.Protobuf.Reflection;
using Microsoft.AspNetCore.Builder;
using Microsoft.AutoGen.Contracts;
using Microsoft.AutoGen.Core;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Moq;
using Xunit;
using static Microsoft.AutoGen.Core.Tests.AgentTests;

namespace Microsoft.AutoGen.Core.Tests;
using static Microsoft.AutoGen.Agents.Tests.AgentTests;
namespace Microsoft.AutoGen.Agents.Tests;

[Collection(ClusterFixtureCollection.Name)]
public class AgentTests(InMemoryAgentRuntimeFixture fixture)
Expand Down
Loading