diff --git a/Directory.Packages.props b/Directory.Packages.props index 9e8557783..fc4916ccb 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -13,6 +13,7 @@ + diff --git a/docs/azmcp-commands.md b/docs/azmcp-commands.md index f6eee29b9..110c534d6 100644 --- a/docs/azmcp-commands.md +++ b/docs/azmcp-commands.md @@ -624,6 +624,13 @@ azmcp eventgrid subscription list --subscription \ [--resource-group ] \ [--topic ] [--location ] + +# Publish custom events to Event Grid topics +azmcp eventgrid events publish --subscription \ + --topic \ + --data \ + [--resource-group ] \ + [--schema ] ``` ### Azure Function App Operations diff --git a/docs/e2eTestPrompts.md b/docs/e2eTestPrompts.md index 671dda8c3..1b7ce4036 100644 --- a/docs/e2eTestPrompts.md +++ b/docs/e2eTestPrompts.md @@ -178,6 +178,9 @@ This file contains prompts used for end-to-end testing to ensure each tool is in | azmcp_eventgrid_subscription_list | List all Event Grid subscriptions in subscription | | azmcp_eventgrid_subscription_list | Show Event Grid subscriptions in resource group in subscription | | azmcp_eventgrid_subscription_list | List Event Grid subscriptions for subscription in location | +| azmcp_eventgrid_events_publish | Publish an event to Event Grid topic using with the following data | +| azmcp_eventgrid_events_publish | Publish event to my Event Grid topic with the following events | +| azmcp_eventgrid_events_publish | Send an event to Event Grid topic in resource group with | ## Azure Function App diff --git a/servers/Azure.Mcp.Server/CHANGELOG.md b/servers/Azure.Mcp.Server/CHANGELOG.md index 721e7fa8f..6f1e5d76a 100644 --- a/servers/Azure.Mcp.Server/CHANGELOG.md +++ b/servers/Azure.Mcp.Server/CHANGELOG.md @@ -4,6 +4,10 @@ The Azure MCP Server updates automatically by default whenever a new release com ## 0.8.2 (2025-09-25) +### Features Added + +-- Added support for publishing custom events to Event Grid topics via the command `azmcp_eventgrid_events_publish`. Supports EventGrid, CloudEvents, and custom schemas with structured event data delivery for event-driven architectures. [[#514](https://github.com/microsoft/mcp/pull/514)] + ### Bugs Fixed - Fixed `azmcp_subscription_list` to return empty enumerable instead of `null` when no subscriptions are found. [[#508](https://github.com/microsoft/mcp/pull/508)] diff --git a/servers/Azure.Mcp.Server/README.md b/servers/Azure.Mcp.Server/README.md index 7f471ed76..02998a1cf 100644 --- a/servers/Azure.Mcp.Server/README.md +++ b/servers/Azure.Mcp.Server/README.md @@ -322,6 +322,8 @@ The Azure MCP Server supercharges your agents with Azure context. Here are some * "List Event Grid subscriptions for topic 'my-topic' in subscription 'my-subscription'" * "List Event Grid Subscriptions in subscription 'my-subscription'" * "List Event Grid subscriptions for topic 'my-topic' in location 'my-location'" +* "Publish an event with data '{\"name\": \"test\"}' to topic 'my-topic' using CloudEvents schema" +* "Send custom event data to Event Grid topic 'analytics-events' with EventGrid schema" ### 🔑 Azure Key Vault diff --git a/tools/Azure.Mcp.Tools.EventGrid/src/Azure.Mcp.Tools.EventGrid.csproj b/tools/Azure.Mcp.Tools.EventGrid/src/Azure.Mcp.Tools.EventGrid.csproj index 6af3b705b..0bfbd65cc 100644 --- a/tools/Azure.Mcp.Tools.EventGrid/src/Azure.Mcp.Tools.EventGrid.csproj +++ b/tools/Azure.Mcp.Tools.EventGrid/src/Azure.Mcp.Tools.EventGrid.csproj @@ -10,6 +10,7 @@ + diff --git a/tools/Azure.Mcp.Tools.EventGrid/src/Commands/EventGridJsonContext.cs b/tools/Azure.Mcp.Tools.EventGrid/src/Commands/EventGridJsonContext.cs index 301ad71ac..9b5ed299f 100644 --- a/tools/Azure.Mcp.Tools.EventGrid/src/Commands/EventGridJsonContext.cs +++ b/tools/Azure.Mcp.Tools.EventGrid/src/Commands/EventGridJsonContext.cs @@ -2,15 +2,24 @@ // Licensed under the MIT License. using System.Text.Json.Serialization; +using Azure.Mcp.Tools.EventGrid.Commands.Events; using Azure.Mcp.Tools.EventGrid.Commands.Subscription; using Azure.Mcp.Tools.EventGrid.Commands.Topic; +using Azure.Mcp.Tools.EventGrid.Models; namespace Azure.Mcp.Tools.EventGrid.Commands; +// JsonSerializable attributes for all types used in EventGrid command responses and event serialization +[JsonSerializable(typeof(EventGridPublishCommand.EventGridPublishCommandResult))] [JsonSerializable(typeof(SubscriptionListCommand.SubscriptionListCommandResult))] [JsonSerializable(typeof(TopicListCommand.TopicListCommandResult))] [JsonSerializable(typeof(EventGridSubscriptionInfo))] [JsonSerializable(typeof(EventGridTopicInfo))] +[JsonSerializable(typeof(EventPublishResult))] +[JsonSerializable(typeof(EventGridEventSchema))] // For individual event serialization to EventGrid +[JsonSerializable(typeof(CloudEvent))] // For CloudEvent schema input deserialization +[JsonSerializable(typeof(EventGridEventInput))] // For EventGrid schema input deserialization +[JsonSerializable(typeof(CustomEvent))] // For custom event schema input deserialization [JsonSourceGenerationOptions(PropertyNamingPolicy = JsonKnownNamingPolicy.CamelCase, WriteIndented = true, DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull)] internal sealed partial class EventGridJsonContext : JsonSerializerContext { diff --git a/tools/Azure.Mcp.Tools.EventGrid/src/Commands/Events/EventsPublishCommand.cs b/tools/Azure.Mcp.Tools.EventGrid/src/Commands/Events/EventsPublishCommand.cs new file mode 100644 index 000000000..25421153e --- /dev/null +++ b/tools/Azure.Mcp.Tools.EventGrid/src/Commands/Events/EventsPublishCommand.cs @@ -0,0 +1,146 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.CommandLine; +using System.CommandLine.Parsing; +using System.Net; +using Azure.Mcp.Core.Extensions; +using Azure.Mcp.Core.Models.Option; +using Azure.Mcp.Tools.EventGrid.Options; +using Azure.Mcp.Tools.EventGrid.Options.Events; +using Azure.Mcp.Tools.EventGrid.Services; + +namespace Azure.Mcp.Tools.EventGrid.Commands.Events; + +public sealed class EventGridPublishCommand(ILogger logger) : BaseEventGridCommand +{ + private const string CommandTitle = "Publish Events to Event Grid Topic"; + private readonly ILogger _logger = logger; + + public override string Name => "publish"; + + public override string Description => + """ + Publish custom events to Event Grid topics for event-driven architectures. This tool sends structured event data to + Event Grid topics with schema validation and delivery guarantees for downstream subscribers. Returns publish operation + status. Requires topic, data, and optional schema. + """; + + public override string Title => CommandTitle; + + public override ToolMetadata Metadata => new() + { + Destructive = false, + Idempotent = false, + OpenWorld = false, + ReadOnly = false, + LocalRequired = false, + Secret = false + }; + + protected override void RegisterOptions(Command command) + { + base.RegisterOptions(command); + command.Options.Add(OptionDefinitions.Common.ResourceGroup); + command.Options.Add(EventGridOptionDefinitions.TopicName); + command.Options.Add(EventGridOptionDefinitions.EventData); + command.Options.Add(EventGridOptionDefinitions.EventSchema); + } + + protected override EventsPublishOptions BindOptions(ParseResult parseResult) + { + var options = base.BindOptions(parseResult); + options.ResourceGroup ??= parseResult.GetValueOrDefault(OptionDefinitions.Common.ResourceGroup.Name); + options.TopicName = parseResult.GetValueOrDefault(EventGridOptionDefinitions.TopicName.Name); + options.EventData = parseResult.GetValueOrDefault(EventGridOptionDefinitions.EventData.Name); + options.EventSchema = parseResult.GetValueOrDefault(EventGridOptionDefinitions.EventSchema.Name); + return options; + } + + public override ValidationResult Validate(CommandResult commandResult, CommandResponse? commandResponse = null) + { + var result = base.Validate(commandResult, commandResponse); + if (!result.IsValid) + { + return result; + } + + var eventSchema = commandResult.GetValueOrDefault(EventGridOptionDefinitions.EventSchema); + if (!string.IsNullOrEmpty(eventSchema)) + { + var normalizedSchema = eventSchema.Trim().ToLowerInvariant().Replace(" ", ""); + var supportedSchemas = new[] { "cloudevents", "eventgrid", "custom" }; + + if (!supportedSchemas.Contains(normalizedSchema)) + { + result.IsValid = false; + result.ErrorMessage = "Invalid event schema specified. Supported schemas are: CloudEvents, EventGrid, or Custom."; + + if (commandResponse != null) + { + commandResponse.Status = HttpStatusCode.BadRequest; + commandResponse.Message = result.ErrorMessage; + } + } + } + + return result; + } + + public override async Task ExecuteAsync(CommandContext context, ParseResult parseResult) + { + if (!Validate(parseResult.CommandResult, context.Response).IsValid) + { + return context.Response; + } + + var options = BindOptions(parseResult); + + try + { + var eventGridService = context.GetService(); + var result = await eventGridService.PublishEventAsync( + options.Subscription!, + options.ResourceGroup, + options.TopicName!, + options.EventData!, + options.EventSchema, + options.Tenant, + options.RetryPolicy); + + context.Response.Results = ResponseResult.Create( + new(result), + EventGridJsonContext.Default.EventGridPublishCommandResult); + } + catch (Exception ex) + { + _logger.LogError(ex, + "Error publishing events to Event Grid topic. Subscription: {Subscription}, Topic: {TopicName}, Options: {@Options}", + options.Subscription, options.TopicName, options); + HandleException(context, ex); + } + + return context.Response; + } + + protected override string GetErrorMessage(Exception ex) => ex switch + { + Azure.RequestFailedException reqEx when reqEx.Status == (int)HttpStatusCode.NotFound => + "Event Grid topic not found. Please verify the topic name and resource group exist.", + Azure.RequestFailedException reqEx when reqEx.Status == (int)HttpStatusCode.Forbidden => + "Access denied to Event Grid topic. Please verify you have Event Grid Data Sender permissions.", + Azure.RequestFailedException reqEx when reqEx.Status == (int)HttpStatusCode.BadRequest => + "Invalid event data or schema format. Please verify the event data is valid JSON and matches the expected schema.", + System.Text.Json.JsonException jsonEx => + $"Invalid JSON format in event data: {jsonEx.Message}", + _ => base.GetErrorMessage(ex) + }; + + protected override HttpStatusCode GetStatusCode(Exception ex) => ex switch + { + System.Text.Json.JsonException => HttpStatusCode.BadRequest, + _ => base.GetStatusCode(ex) + }; + + internal record EventGridPublishCommandResult(EventPublishResult Result); +} diff --git a/tools/Azure.Mcp.Tools.EventGrid/src/EventGridSetup.cs b/tools/Azure.Mcp.Tools.EventGrid/src/EventGridSetup.cs index a385b13f7..e255531b9 100644 --- a/tools/Azure.Mcp.Tools.EventGrid/src/EventGridSetup.cs +++ b/tools/Azure.Mcp.Tools.EventGrid/src/EventGridSetup.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. using Azure.Mcp.Core.Areas; +using Azure.Mcp.Tools.EventGrid.Commands.Events; using Azure.Mcp.Tools.EventGrid.Commands.Subscription; using Azure.Mcp.Tools.EventGrid.Commands.Topic; using Azure.Mcp.Tools.EventGrid.Services; @@ -18,6 +19,7 @@ public void ConfigureServices(IServiceCollection services) services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); } public CommandGroup RegisterCommands(IServiceProvider serviceProvider) @@ -25,6 +27,10 @@ public CommandGroup RegisterCommands(IServiceProvider serviceProvider) // Event Grid top-level group var eventGrid = new CommandGroup(Name, "Event Grid operations - Commands for managing and accessing Event Grid topics, domains, and event subscriptions."); + // Events subgroup + var events = new CommandGroup("events", "Event Grid event operations - Commands for publishing and managing events sent to Event Grid topics."); + eventGrid.AddSubGroup(events); + // Topics subgroup var topics = new CommandGroup("topic", "Event Grid topic operations - Commands for managing Event Grid topics and their configurations."); eventGrid.AddSubGroup(topics); @@ -33,12 +39,17 @@ public CommandGroup RegisterCommands(IServiceProvider serviceProvider) var subscriptions = new CommandGroup("subscription", "Event Grid subscription operations - Commands for managing event subscriptions with filtering and endpoint configuration."); eventGrid.AddSubGroup(subscriptions); + // Register Events commands + var eventsPublish = serviceProvider.GetRequiredService(); + events.AddCommand(eventsPublish.Name, eventsPublish); + // Register Topic commands - var listCommand = serviceProvider.GetRequiredService(); - topics.AddCommand(listCommand.Name, listCommand); + var topicList = serviceProvider.GetRequiredService(); + topics.AddCommand(topicList.Name, topicList); // Register Subscription commands - subscriptions.AddCommand("list", serviceProvider.GetRequiredService()); + var subscriptionList = serviceProvider.GetRequiredService(); + subscriptions.AddCommand(subscriptionList.Name, subscriptionList); return eventGrid; } diff --git a/tools/Azure.Mcp.Tools.EventGrid/src/GlobalUsings.cs b/tools/Azure.Mcp.Tools.EventGrid/src/GlobalUsings.cs index 487469cc4..d6027d9e5 100644 --- a/tools/Azure.Mcp.Tools.EventGrid/src/GlobalUsings.cs +++ b/tools/Azure.Mcp.Tools.EventGrid/src/GlobalUsings.cs @@ -9,4 +9,5 @@ global using Azure.Mcp.Core.Services.Azure.Subscription; global using Azure.Mcp.Core.Services.Azure.Tenant; global using Azure.Mcp.Tools.EventGrid.Models; +global using Azure.Messaging.EventGrid; global using Microsoft.Extensions.Logging; diff --git a/tools/Azure.Mcp.Tools.EventGrid/src/Models/EventGridEventSchema.cs b/tools/Azure.Mcp.Tools.EventGrid/src/Models/EventGridEventSchema.cs new file mode 100644 index 000000000..dde70950a --- /dev/null +++ b/tools/Azure.Mcp.Tools.EventGrid/src/Models/EventGridEventSchema.cs @@ -0,0 +1,31 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Text.Json.Nodes; +using System.Text.Json.Serialization; + +namespace Azure.Mcp.Tools.EventGrid.Models; + +/// +/// Represents an event conforming to the EventGrid event schema for HTTP API publishing. +/// +public sealed class EventGridEventSchema +{ + [JsonPropertyName("id")] + public required string Id { get; set; } + + [JsonPropertyName("subject")] + public required string Subject { get; set; } + + [JsonPropertyName("eventType")] + public required string EventType { get; set; } + + [JsonPropertyName("dataVersion")] + public required string DataVersion { get; set; } + + [JsonPropertyName("data")] + public JsonNode? Data { get; set; } + + [JsonPropertyName("eventTime")] + public DateTimeOffset EventTime { get; set; } +} diff --git a/tools/Azure.Mcp.Tools.EventGrid/src/Models/EventInputSchemas.cs b/tools/Azure.Mcp.Tools.EventGrid/src/Models/EventInputSchemas.cs new file mode 100644 index 000000000..aa0caead8 --- /dev/null +++ b/tools/Azure.Mcp.Tools.EventGrid/src/Models/EventInputSchemas.cs @@ -0,0 +1,111 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace Azure.Mcp.Tools.EventGrid.Models; + +/// +/// CloudEvent v1.0 specification POJO for JSON deserialization. +/// Represents the standard CloudEvent format defined at https://cloudevents.io/ +/// This gets converted to EventGridEvent for internal processing. +/// +public sealed class CloudEvent +{ + [JsonPropertyName("id")] + public string? Id { get; set; } + + [JsonPropertyName("type")] + public string? Type { get; set; } + + [JsonPropertyName("source")] + public string? Source { get; set; } + + [JsonPropertyName("subject")] + public string? Subject { get; set; } + + [JsonPropertyName("specversion")] + public string? SpecVersion { get; set; } + + [JsonPropertyName("time")] + public DateTimeOffset? Time { get; set; } + + [JsonPropertyName("datacontenttype")] + public string? DataContentType { get; set; } + + [JsonPropertyName("data")] + public JsonElement? Data { get; set; } +} + +/// +/// EventGrid Event schema POJO for JSON deserialization. +/// This gets converted to EventGridEvent for internal processing. +/// Note: We still use this POJO even though EventGridEvent exists because +/// the input may have optional fields that need defaults applied. +/// +public sealed class EventGridEventInput +{ + [JsonPropertyName("id")] + public string? Id { get; set; } + + [JsonPropertyName("eventType")] + public string? EventType { get; set; } + + [JsonPropertyName("subject")] + public string? Subject { get; set; } + + [JsonPropertyName("dataVersion")] + public string? DataVersion { get; set; } + + [JsonPropertyName("eventTime")] + public DateTimeOffset? EventTime { get; set; } + + [JsonPropertyName("data")] + public JsonElement? Data { get; set; } +} + +/// +/// Flexible/custom event schema POJO that supports both CloudEvents and EventGrid field names. +/// Used when the schema type is "Custom" or unknown. +/// This gets converted to EventGridEvent for internal processing. +/// +public sealed class CustomEvent +{ + [JsonPropertyName("id")] + public string? Id { get; set; } + + // CloudEvents uses "type", EventGrid uses "eventType" + [JsonPropertyName("type")] + public string? Type { get; set; } + + [JsonPropertyName("eventType")] + public string? EventType { get; set; } + + // CloudEvents uses "source", EventGrid uses "subject" + [JsonPropertyName("source")] + public string? Source { get; set; } + + [JsonPropertyName("subject")] + public string? Subject { get; set; } + + // CloudEvents uses "specversion", EventGrid uses "dataVersion" + [JsonPropertyName("specversion")] + public string? SpecVersion { get; set; } + + [JsonPropertyName("dataVersion")] + public string? DataVersion { get; set; } + + // CloudEvents uses "time", EventGrid uses "eventTime" + [JsonPropertyName("time")] + public DateTimeOffset? Time { get; set; } + + [JsonPropertyName("eventTime")] + public DateTimeOffset? EventTime { get; set; } + + [JsonPropertyName("datacontenttype")] + public string? DataContentType { get; set; } + + [JsonPropertyName("data")] + public JsonElement? Data { get; set; } +} diff --git a/tools/Azure.Mcp.Tools.EventGrid/src/Models/EventPublishResult.cs b/tools/Azure.Mcp.Tools.EventGrid/src/Models/EventPublishResult.cs new file mode 100644 index 000000000..a75a93c6a --- /dev/null +++ b/tools/Azure.Mcp.Tools.EventGrid/src/Models/EventPublishResult.cs @@ -0,0 +1,11 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Azure.Mcp.Tools.EventGrid.Models; + +public sealed record EventPublishResult( + string Status, + string Message, + int PublishedEventCount, + string? OperationId = null, + DateTime? PublishedAt = null); diff --git a/tools/Azure.Mcp.Tools.EventGrid/src/Options/EventGridOptionDefinitions.cs b/tools/Azure.Mcp.Tools.EventGrid/src/Options/EventGridOptionDefinitions.cs index c51bc6233..19ecc1034 100644 --- a/tools/Azure.Mcp.Tools.EventGrid/src/Options/EventGridOptionDefinitions.cs +++ b/tools/Azure.Mcp.Tools.EventGrid/src/Options/EventGridOptionDefinitions.cs @@ -7,18 +7,38 @@ public static class EventGridOptionDefinitions { public const string TopicNameParam = "topic"; public const string LocationParam = "location"; + public const string EventDataParam = "data"; + public const string EventSchemaParam = "schema"; public static readonly Option TopicName = new( $"--{TopicNameParam}" ) { - Description = "The name of the Event Grid topic." + Description = "The name of the Event Grid topic.", + Required = true }; public static readonly Option Location = new( $"--{LocationParam}" ) { - Description = "The Azure region to filter resources by (e.g., 'eastus', 'westus2')." + Description = "The Azure region to filter resources by (e.g., 'eastus', 'westus2').", + Required = false + }; + + public static readonly Option EventData = new( + $"--{EventDataParam}" + ) + { + Description = "The event data as JSON string to publish to the Event Grid topic.", + Required = true + }; + + public static readonly Option EventSchema = new( + $"--{EventSchemaParam}" + ) + { + Description = "The event schema type (CloudEvents, EventGrid, or Custom). Defaults to EventGrid.", + Required = false }; } diff --git a/tools/Azure.Mcp.Tools.EventGrid/src/Options/Events/EventsPublishOptions.cs b/tools/Azure.Mcp.Tools.EventGrid/src/Options/Events/EventsPublishOptions.cs new file mode 100644 index 000000000..f835454eb --- /dev/null +++ b/tools/Azure.Mcp.Tools.EventGrid/src/Options/Events/EventsPublishOptions.cs @@ -0,0 +1,11 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Azure.Mcp.Tools.EventGrid.Options.Events; + +public class EventsPublishOptions : BaseEventGridOptions +{ + public string? TopicName { get; set; } + public string? EventData { get; set; } + public string? EventSchema { get; set; } +} diff --git a/tools/Azure.Mcp.Tools.EventGrid/src/Services/EventGridService.cs b/tools/Azure.Mcp.Tools.EventGrid/src/Services/EventGridService.cs index a333a952a..4c07b7467 100644 --- a/tools/Azure.Mcp.Tools.EventGrid/src/Services/EventGridService.cs +++ b/tools/Azure.Mcp.Tools.EventGrid/src/Services/EventGridService.cs @@ -1,6 +1,10 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System.Net.Mime; +using System.Text.Json; +using System.Text.Json.Nodes; +using Azure.Mcp.Tools.EventGrid.Commands; using Azure.ResourceManager.EventGrid; using Azure.ResourceManager.EventGrid.Models; using Azure.ResourceManager.Resources; @@ -79,6 +83,192 @@ public async Task> GetSubscriptionsAsync( return subscriptions; } + public async Task PublishEventAsync( + string subscription, + string? resourceGroup, + string topicName, + string eventData, + string? eventSchema = null, + string? tenant = null, + RetryPolicyOptions? retryPolicy = null) + { + var operationId = Guid.NewGuid().ToString(); + _logger.LogInformation("Starting event publication. OperationId: {OperationId}, Topic: {TopicName}, ResourceGroup: {ResourceGroup}, Subscription: {Subscription}", + operationId, topicName, resourceGroup, subscription); + + try + { + var subscriptionResource = await _subscriptionService.GetSubscription(subscription, tenant, retryPolicy); + + // Find the topic to get its endpoint and access key + var topic = await FindTopic(subscriptionResource, resourceGroup, topicName); + if (topic == null) + { + var errorMessage = $"Event Grid topic '{topicName}' not found in resource group '{resourceGroup}'. Make sure the topic exists and you have access to it."; + _logger.LogError(errorMessage); + throw new InvalidOperationException("Publishing failed with the following error message: " + errorMessage); + } + + if (topic.Data.Endpoint == null) + { + var errorMessage = $"Event Grid topic '{topicName}' does not have a valid endpoint."; + _logger.LogError(errorMessage); + throw new InvalidOperationException("Publishing failed with the following error message: " + errorMessage); + } + + // Get credential using standardized method from base class for Azure AD authentication + var credential = await GetCredential(tenant); + + // Parse and validate event data directly to EventGridEventSchema + var eventGridEventSchemas = ParseAndValidateEventData(eventData, eventSchema ?? "EventGridEvent"); + + var publisherClient = new EventGridPublisherClient(topic.Data.Endpoint, credential); + + // Serialize each event individually to JSON using source-generated context + var eventsData = eventGridEventSchemas.Select(eventSchema => + { + var jsonString = JsonSerializer.Serialize(eventSchema, EventGridJsonContext.Default.EventGridEventSchema); + return BinaryData.FromString(jsonString); + }).ToArray(); + + var eventCount = eventsData.Length; + _logger.LogInformation("Publishing {EventCount} events to topic '{TopicName}' with operation ID: {OperationId}", + eventCount, topicName, operationId); + + try + { + await publisherClient.SendEventsAsync(eventsData); + } + catch (Exception publishEx) + { + _logger.LogError(publishEx, "Failed to publish events to topic '{TopicName}' with operation ID: {OperationId}", + topicName, operationId); + throw; + } + + _logger.LogInformation("Successfully published {EventCount} events to topic '{TopicName}'", + eventCount, topicName); + + return new EventPublishResult( + Status: "Success", + Message: $"Successfully published {eventCount} event(s) to topic '{topicName}'.", + PublishedEventCount: eventCount, + OperationId: operationId, + PublishedAt: DateTime.UtcNow); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to publish events to topic '{TopicName}' in resource group '{ResourceGroup}'", + topicName, resourceGroup); + + return new EventPublishResult( + Status: "Failed", + Message: $"Failed to publish events: {ex.Message}", + PublishedEventCount: 0, + OperationId: operationId, + PublishedAt: DateTime.UtcNow); + } + } + + private static IEnumerable ParseAndValidateEventData(string eventData, string eventSchema) + { + try + { + // Parse the JSON data + var jsonDocument = JsonDocument.Parse(eventData); + + IEnumerable events; + + if (jsonDocument.RootElement.ValueKind == JsonValueKind.Array) + { + // Handle array of events - use lazy evaluation + events = jsonDocument.RootElement.EnumerateArray() + .Select(eventElement => CreateEventGridEventFromJsonElement(eventElement, eventSchema)); + } + else + { + // Handle single event - return single item enumerable + events = new[] { CreateEventGridEventFromJsonElement(jsonDocument.RootElement, eventSchema) }; + } + + var eventsList = events.ToList(); + if (eventsList.Count == 0) + { + throw new ArgumentException("No valid events found in the provided event data."); + } + + return eventsList; + } + catch (JsonException ex) + { + throw new ArgumentException($"Invalid JSON format in event data: {ex.Message}", ex); + } + } + + private static Models.EventGridEventSchema CreateEventGridEventFromJsonElement(JsonElement eventElement, string eventSchema) + { + var eventJson = eventElement.GetRawText(); + + if (eventSchema.Equals("CloudEvents", StringComparison.OrdinalIgnoreCase)) + { + var cloudEvent = JsonSerializer.Deserialize(eventJson, EventGridJsonContext.Default.CloudEvent); + if (cloudEvent == null) + throw new ArgumentException("Failed to deserialize CloudEvent"); + + // Validate datacontenttype for CloudEvents + var dataContentType = cloudEvent.DataContentType ?? MediaTypeNames.Application.Json; + if (!string.Equals(dataContentType, MediaTypeNames.Application.Json, StringComparison.OrdinalIgnoreCase)) + { + if (string.IsNullOrWhiteSpace(dataContentType) || !dataContentType.Contains('/')) + { + throw new ArgumentException($"Invalid datacontenttype '{dataContentType}' in CloudEvent with id '{cloudEvent.Id}'. Must be a valid MIME type (e.g., 'application/xml', 'text/plain')."); + } + } + + return new Models.EventGridEventSchema + { + Id = cloudEvent.Id ?? Guid.NewGuid().ToString(), + Subject = cloudEvent.Source ?? cloudEvent.Subject ?? "/default/subject", + EventType = cloudEvent.Type ?? "CustomEvent", + DataVersion = cloudEvent.SpecVersion ?? "1.0", + Data = cloudEvent.Data.HasValue ? JsonNode.Parse(cloudEvent.Data.Value.GetRawText()) : null, + EventTime = cloudEvent.Time ?? DateTimeOffset.UtcNow + }; + } + else if (eventSchema.Equals("EventGrid", StringComparison.OrdinalIgnoreCase)) + { + var eventGridEvent = JsonSerializer.Deserialize(eventJson, EventGridJsonContext.Default.EventGridEventInput); + if (eventGridEvent == null) + throw new ArgumentException("Failed to deserialize EventGrid event"); + + return new Models.EventGridEventSchema + { + Id = eventGridEvent.Id ?? Guid.NewGuid().ToString(), + Subject = eventGridEvent.Subject ?? "/default/subject", + EventType = eventGridEvent.EventType ?? "CustomEvent", + DataVersion = eventGridEvent.DataVersion ?? "1.0", + Data = eventGridEvent.Data.HasValue ? JsonNode.Parse(eventGridEvent.Data.Value.GetRawText()) : null, + EventTime = eventGridEvent.EventTime ?? DateTimeOffset.UtcNow + }; + } + else // Custom schema - try both CloudEvents and EventGrid field names + { + var flexibleEvent = JsonSerializer.Deserialize(eventJson, EventGridJsonContext.Default.CustomEvent); + if (flexibleEvent == null) + throw new ArgumentException("Failed to deserialize custom event"); + + return new Models.EventGridEventSchema + { + Id = flexibleEvent.Id ?? Guid.NewGuid().ToString(), + Subject = flexibleEvent.Subject ?? flexibleEvent.Source ?? "/default/subject", + EventType = flexibleEvent.EventType ?? flexibleEvent.Type ?? "CustomEvent", + DataVersion = flexibleEvent.DataVersion ?? flexibleEvent.SpecVersion ?? "1.0", + Data = flexibleEvent.Data.HasValue ? JsonNode.Parse(flexibleEvent.Data.Value.GetRawText()) : null, + EventTime = flexibleEvent.EventTime ?? flexibleEvent.Time ?? DateTimeOffset.UtcNow + }; + } + } + private async Task GetSubscriptionsForSpecificTopic( SubscriptionResource subscriptionResource, string? resourceGroup, @@ -202,27 +392,43 @@ private async Task GetSubscriptionsFromAllTopics( { if (!string.IsNullOrEmpty(resourceGroup)) { - // Search in specific resource group - var resourceGroupResource = await subscriptionResource.GetResourceGroupAsync(resourceGroup); - - await foreach (var topic in resourceGroupResource.Value.GetEventGridTopics().GetAllAsync()) + try { - if (topic.Data.Name.Equals(topicName, StringComparison.OrdinalIgnoreCase)) + // Search in specific resource group + var resourceGroupResource = await subscriptionResource.GetResourceGroupAsync(resourceGroup); + + await foreach (var topic in resourceGroupResource.Value.GetEventGridTopics().GetAllAsync()) { - return topic; + if (topic.Data.Name.Equals(topicName, StringComparison.OrdinalIgnoreCase)) + { + return topic; + } } } + catch (Exception ex) + { + _logger.LogError(ex, "Error accessing resource group '{ResourceGroup}'", resourceGroup); + throw; + } } else { - // Search in all resource groups - await foreach (var topic in subscriptionResource.GetEventGridTopicsAsync()) + try { - if (topic.Data.Name.Equals(topicName, StringComparison.OrdinalIgnoreCase)) + // Search in all resource groups + await foreach (var topic in subscriptionResource.GetEventGridTopicsAsync()) { - return topic; + if (topic.Data.Name.Equals(topicName, StringComparison.OrdinalIgnoreCase)) + { + return topic; + } } } + catch (Exception ex) + { + _logger.LogError(ex, "Error searching topics across subscription"); + throw; + } } return null; @@ -362,4 +568,5 @@ private async Task AddSubscriptionsFromSystemTopic( } } } + } diff --git a/tools/Azure.Mcp.Tools.EventGrid/src/Services/IEventGridService.cs b/tools/Azure.Mcp.Tools.EventGrid/src/Services/IEventGridService.cs index 72bbdb6e7..db53125af 100644 --- a/tools/Azure.Mcp.Tools.EventGrid/src/Services/IEventGridService.cs +++ b/tools/Azure.Mcp.Tools.EventGrid/src/Services/IEventGridService.cs @@ -18,4 +18,13 @@ Task> GetSubscriptionsAsync( string? location = null, string? tenant = null, RetryPolicyOptions? retryPolicy = null); + + Task PublishEventAsync( + string subscription, + string? resourceGroup, + string topicName, + string eventData, + string? eventSchema = null, + string? tenant = null, + RetryPolicyOptions? retryPolicy = null); } diff --git a/tools/Azure.Mcp.Tools.EventGrid/tests/Azure.Mcp.Tools.EventGrid.LiveTests/EventGridCommandTests.cs b/tools/Azure.Mcp.Tools.EventGrid/tests/Azure.Mcp.Tools.EventGrid.LiveTests/EventGridCommandTests.cs index 99249b928..d1f05bf88 100644 --- a/tools/Azure.Mcp.Tools.EventGrid/tests/Azure.Mcp.Tools.EventGrid.LiveTests/EventGridCommandTests.cs +++ b/tools/Azure.Mcp.Tools.EventGrid/tests/Azure.Mcp.Tools.EventGrid.LiveTests/EventGridCommandTests.cs @@ -74,4 +74,274 @@ public async Task Should_list_eventgrid_subscriptions_by_subscription_and_resour Assert.Equal(JsonValueKind.Array, subscriptions.ValueKind); // Note: subscriptions array might be empty if no Event Grid subscriptions exist in the resource group } + + [Fact] + public async Task Should_publish_events_to_eventgrid_topic() + { + // Create test event data + var eventData = JsonSerializer.Serialize(new + { + subject = "/test/subject", + eventType = "TestEvent", + dataVersion = "1.0", + data = new { message = "Test event from integration test", timestamp = DateTime.UtcNow } + }); + + var result = await CallToolAsync( + "azmcp_eventgrid_events_publish", + new() + { + { "subscription", Settings.SubscriptionId }, + { "resource-group", Settings.ResourceGroupName }, + { "topic", Settings.ResourceBaseName }, + { "data", eventData } + }); + + var publishResult = result.AssertProperty("result"); + var status = publishResult.AssertProperty("status").GetString(); + var publishedEventCount = publishResult.AssertProperty("publishedEventCount").GetInt32(); + + Assert.Equal("Success", status); + Assert.Equal(1, publishedEventCount); + } + + [Fact] + public async Task Should_publish_multiple_events_to_eventgrid_topic() + { + // Create test event data array + var eventData = JsonSerializer.Serialize(new[] + { + new + { + subject = "/test/subject1", + eventType = "TestEvent", + dataVersion = "1.0", + data = new { message = "Test event 1", timestamp = DateTime.UtcNow } + }, + new + { + subject = "/test/subject2", + eventType = "TestEvent", + dataVersion = "1.0", + data = new { message = "Test event 2", timestamp = DateTime.UtcNow } + } + }); + + var result = await CallToolAsync( + "azmcp_eventgrid_events_publish", + new() + { + { "subscription", Settings.SubscriptionId }, + { "resource-group", Settings.ResourceGroupName }, + { "topic", Settings.ResourceBaseName }, + { "data", eventData } + }); + + var publishResult = result.AssertProperty("result"); + var status = publishResult.AssertProperty("status").GetString(); + var publishedEventCount = publishResult.AssertProperty("publishedEventCount").GetInt32(); + + Assert.Equal("Success", status); + Assert.Equal(2, publishedEventCount); + } + + [Fact] + public async Task Should_publish_cloudevents_to_eventgrid_topic() + { + // Create CloudEvents format event data + var eventData = JsonSerializer.Serialize(new + { + specversion = "1.0", + type = "com.example.LiveTestEvent", + source = "/live/test/cloudevents", + id = Guid.NewGuid().ToString(), + time = DateTime.UtcNow.ToString("yyyy-MM-ddTHH:mm:ssZ"), + data = new + { + message = "CloudEvents test from live integration test", + testType = "live-test", + timestamp = DateTime.UtcNow + } + }); + + var result = await CallToolAsync( + "azmcp_eventgrid_events_publish", + new() + { + { "subscription", Settings.SubscriptionId }, + { "resource-group", Settings.ResourceGroupName }, + { "topic", Settings.ResourceBaseName }, + { "data", eventData }, + { "schema", "CloudEvents" } + }); + + var publishResult = result.AssertProperty("result"); + var status = publishResult.AssertProperty("status").GetString(); + var publishedEventCount = publishResult.AssertProperty("publishedEventCount").GetInt32(); + + Assert.Equal("Success", status); + Assert.Equal(1, publishedEventCount); + } + + [Fact] + public async Task Should_publish_custom_schema_to_eventgrid_topic() + { + // Create custom schema event data (business-oriented format) + var eventData = JsonSerializer.Serialize(new + { + orderNumber = "LIVE-ORDER-" + DateTime.Now.Ticks, + eventCategory = "OrderProcessed", + resourcePath = "/orders/live-test", + occurredAt = DateTime.UtcNow.ToString("yyyy-MM-ddTHH:mm:ssZ"), + details = new + { + amount = 125.50m, + currency = "USD", + items = new[] { + new { sku = "LIVE-SKU-001", quantity = 2, price = 50.00m }, + new { sku = "LIVE-SKU-002", quantity = 1, price = 25.50m } + }, + customer = new + { + id = "CUST-LIVE-001", + tier = "premium" + } + } + }); + + var result = await CallToolAsync( + "azmcp_eventgrid_events_publish", + new() + { + { "subscription", Settings.SubscriptionId }, + { "resource-group", Settings.ResourceGroupName }, + { "topic", Settings.ResourceBaseName }, + { "data", eventData }, + { "schema", "Custom" } + }); + + var publishResult = result.AssertProperty("result"); + var status = publishResult.AssertProperty("status").GetString(); + var publishedEventCount = publishResult.AssertProperty("publishedEventCount").GetInt32(); + + Assert.Equal("Success", status); + Assert.Equal(1, publishedEventCount); + } + + [Fact] + public async Task Should_publish_mixed_schemas_in_custom_format() + { + // Create array with mixed EventGrid and CloudEvents field styles + var eventData = JsonSerializer.Serialize(new object[] + { + new // EventGrid-style fields + { + id = "live-eventgrid-" + Guid.NewGuid().ToString(), + subject = "/live/test/eventgrid", + eventType = "LiveTest.EventGrid", + dataVersion = "2.0", + eventTime = DateTime.UtcNow.ToString("yyyy-MM-ddTHH:mm:ssZ"), + data = new { format = "EventGrid", test = "live" } + }, + new // CloudEvents-style fields + { + id = "live-cloudevents-" + Guid.NewGuid().ToString(), + source = "/live/test/cloudevents", + type = "LiveTest.CloudEvents", + specversion = "1.0", + time = DateTime.UtcNow.ToString("yyyy-MM-ddTHH:mm:ssZ"), + data = new { format = "CloudEvents", test = "live" } + } + }); + + var result = await CallToolAsync( + "azmcp_eventgrid_events_publish", + new() + { + { "subscription", Settings.SubscriptionId }, + { "resource-group", Settings.ResourceGroupName }, + { "topic", Settings.ResourceBaseName }, + { "data", eventData }, + { "schema", "Custom" } + }); + + var publishResult = result.AssertProperty("result"); + var status = publishResult.AssertProperty("status").GetString(); + var publishedEventCount = publishResult.AssertProperty("publishedEventCount").GetInt32(); + + Assert.Equal("Success", status); + Assert.Equal(2, publishedEventCount); + } + + [Fact] + public async Task Should_handle_eventgrid_schema_explicitly() + { + // Test explicit EventGrid schema specification + var eventData = JsonSerializer.Serialize(new + { + id = "live-explicit-eventgrid-" + Guid.NewGuid().ToString(), + subject = "/live/test/explicit", + eventType = "LiveTest.ExplicitEventGrid", + dataVersion = "1.5", + eventTime = DateTime.UtcNow.ToString("yyyy-MM-ddTHH:mm:ssZ"), + data = new + { + isExplicit = true, + schema = "EventGrid", + timestamp = DateTime.UtcNow + } + }); + + var result = await CallToolAsync( + "azmcp_eventgrid_events_publish", + new() + { + { "subscription", Settings.SubscriptionId }, + { "resource-group", Settings.ResourceGroupName }, + { "topic", Settings.ResourceBaseName }, + { "data", eventData }, + { "schema", "EventGrid" } + }); + + var publishResult = result.AssertProperty("result"); + var status = publishResult.AssertProperty("status").GetString(); + var publishedEventCount = publishResult.AssertProperty("publishedEventCount").GetInt32(); + + Assert.Equal("Success", status); + Assert.Equal(1, publishedEventCount); + } + + [Fact] + public async Task Should_publish_with_default_schema_when_not_specified() + { + // Test that EventGrid schema is used by default when not specified + var eventData = JsonSerializer.Serialize(new + { + subject = "/live/test/default", + eventType = "LiveTest.DefaultSchema", + dataVersion = "1.0", + data = new + { + defaultTest = true, + timestamp = DateTime.UtcNow + } + }); + + var result = await CallToolAsync( + "azmcp_eventgrid_events_publish", + new() + { + { "subscription", Settings.SubscriptionId }, + { "resource-group", Settings.ResourceGroupName }, + { "topic", Settings.ResourceBaseName }, + { "data", eventData } + }); + + var publishResult = result.AssertProperty("result"); + var status = publishResult.AssertProperty("status").GetString(); + var publishedEventCount = publishResult.AssertProperty("publishedEventCount").GetInt32(); + + Assert.Equal("Success", status); + Assert.Equal(1, publishedEventCount); + } } diff --git a/tools/Azure.Mcp.Tools.EventGrid/tests/Azure.Mcp.Tools.EventGrid.UnitTests/Events/EventsPublishCommandTests.cs b/tools/Azure.Mcp.Tools.EventGrid/tests/Azure.Mcp.Tools.EventGrid.UnitTests/Events/EventsPublishCommandTests.cs new file mode 100644 index 000000000..988fa8a6c --- /dev/null +++ b/tools/Azure.Mcp.Tools.EventGrid/tests/Azure.Mcp.Tools.EventGrid.UnitTests/Events/EventsPublishCommandTests.cs @@ -0,0 +1,931 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.CommandLine; +using System.Net; +using System.Text.Json; +using Azure.Mcp.Core.Models.Command; +using Azure.Mcp.Core.Options; +using Azure.Mcp.Tools.EventGrid.Commands; +using Azure.Mcp.Tools.EventGrid.Commands.Events; +using Azure.Mcp.Tools.EventGrid.Models; +using Azure.Mcp.Tools.EventGrid.Services; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using NSubstitute; +using NSubstitute.ExceptionExtensions; +using Xunit; + +namespace Azure.Mcp.Tools.EventGrid.UnitTests.Events; + +[Trait("Area", "EventGrid")] +public class EventsPublishCommandTests +{ + private readonly IServiceProvider _serviceProvider; + private readonly IEventGridService _eventGridService; + private readonly ILogger _logger; + private readonly EventGridPublishCommand _command; + private readonly CommandContext _context; + private readonly Command _commandDefinition; + + public EventsPublishCommandTests() + { + _eventGridService = Substitute.For(); + _logger = Substitute.For>(); + + var collection = new ServiceCollection().AddSingleton(_eventGridService); + + _serviceProvider = collection.BuildServiceProvider(); + _command = new(_logger); + _context = new(_serviceProvider); + _commandDefinition = _command.GetCommand(); + } + + [Fact] + public void Command_Properties_AreCorrect() + { + Assert.Equal("publish", _command.Name); + Assert.Contains("Publish custom events to Event Grid topics", _command.Description); + Assert.Equal("Publish Events to Event Grid Topic", _command.Title); + } + + [Fact] + public void Command_Metadata_IsCorrect() + { + var metadata = _command.Metadata; + Assert.False(metadata.Destructive); + Assert.False(metadata.Idempotent); + Assert.False(metadata.OpenWorld); + Assert.False(metadata.ReadOnly); + Assert.False(metadata.LocalRequired); + Assert.False(metadata.Secret); + } + + [Fact] + public async Task ExecuteAsync_WithValidSingleEvent_ReturnsSuccess() + { + // Arrange + var subscriptionId = "test-sub"; + var resourceGroup = "test-rg"; + var topicName = "test-topic"; + var eventData = JsonSerializer.Serialize(new + { + subject = "/test/subject", + eventType = "TestEvent", + dataVersion = "1.0", + data = new { message = "Hello World" } + }); + + var expectedResult = new EventPublishResult( + Status: "Success", + Message: $"Successfully published 1 event(s) to topic '{topicName}'.", + PublishedEventCount: 1, + OperationId: Guid.NewGuid().ToString(), + PublishedAt: DateTime.UtcNow); + + _eventGridService.PublishEventAsync( + Arg.Is(subscriptionId), + Arg.Is(resourceGroup), + Arg.Is(topicName), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any()) + .Returns(Task.FromResult(expectedResult)); + + var args = _commandDefinition.Parse(["--subscription", subscriptionId, "--resource-group", resourceGroup, "--topic", topicName, "--data", eventData]); + + // Act + var response = await _command.ExecuteAsync(_context, args); + + // Assert + Assert.Equal(HttpStatusCode.OK, response.Status); + Assert.NotNull(response.Results); + + var json = JsonSerializer.Serialize(response.Results); + var result = JsonSerializer.Deserialize(json, EventGridJsonContext.Default.EventGridPublishCommandResult); + Assert.NotNull(result); + Assert.Equal("Success", result!.Result.Status); + Assert.Equal(1, result.Result.PublishedEventCount); + } + + [Fact] + public async Task ExecuteAsync_WithValidArrayOfEvents_ReturnsSuccess() + { + // Arrange + var subscriptionId = "test-sub"; + var resourceGroup = "test-rg"; + var topicName = "test-topic"; + var eventData = JsonSerializer.Serialize(new[] + { + new + { + subject = "/test/subject1", + eventType = "TestEvent", + dataVersion = "1.0", + data = new { message = "Hello World 1" } + }, + new + { + subject = "/test/subject2", + eventType = "TestEvent", + dataVersion = "1.0", + data = new { message = "Hello World 2" } + } + }); + + var expectedResult = new EventPublishResult( + Status: "Success", + Message: $"Successfully published 2 event(s) to topic '{topicName}'.", + PublishedEventCount: 2, + OperationId: Guid.NewGuid().ToString(), + PublishedAt: DateTime.UtcNow); + + _eventGridService.PublishEventAsync( + Arg.Is(subscriptionId), + Arg.Is(resourceGroup), + Arg.Is(topicName), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any()) + .Returns(Task.FromResult(expectedResult)); + + var args = _commandDefinition.Parse(["--subscription", subscriptionId, "--resource-group", resourceGroup, "--topic", topicName, "--data", eventData]); + + // Act + var response = await _command.ExecuteAsync(_context, args); + + // Assert + Assert.Equal(HttpStatusCode.OK, response.Status); + Assert.NotNull(response.Results); + + var json = JsonSerializer.Serialize(response.Results); + var result = JsonSerializer.Deserialize(json, EventGridJsonContext.Default.EventGridPublishCommandResult); + Assert.NotNull(result); + Assert.Equal("Success", result!.Result.Status); + Assert.Equal(2, result.Result.PublishedEventCount); + } + + [Fact] + public async Task ExecuteAsync_WithInvalidJson_Returns400() + { + // Arrange + var subscriptionId = "test-sub"; + var resourceGroup = "test-rg"; + var topicName = "test-topic"; + var invalidEventData = "invalid-json"; + + _eventGridService.PublishEventAsync( + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any()) + .ThrowsAsync(new System.Text.Json.JsonException("Invalid JSON format")); + + var args = _commandDefinition.Parse(["--subscription", subscriptionId, "--resource-group", resourceGroup, "--topic", topicName, "--data", invalidEventData]); + + // Act + var response = await _command.ExecuteAsync(_context, args); + + // Assert + Assert.Equal(HttpStatusCode.BadRequest, response.Status); + Assert.Contains("Invalid", response.Message); + } + + [Fact] + public async Task ExecuteAsync_WithTopicNotFound_Returns404() + { + // Arrange + var subscriptionId = "test-sub"; + var resourceGroup = "test-rg"; + var topicName = "nonexistent-topic"; + var eventData = JsonSerializer.Serialize(new + { + subject = "/test/subject", + eventType = "TestEvent", + dataVersion = "1.0", + data = new { message = "Hello World" } + }); + + _eventGridService.PublishEventAsync( + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any()) + .ThrowsAsync(new InvalidOperationException($"Event Grid topic '{topicName}' not found in resource group '{resourceGroup}'.")); + + var args = _commandDefinition.Parse(["--subscription", subscriptionId, "--resource-group", resourceGroup, "--topic", topicName, "--data", eventData]); + + // Act + var response = await _command.ExecuteAsync(_context, args); + + // Assert + Assert.Equal(HttpStatusCode.InternalServerError, response.Status); // The base command returns InternalServerError for general exceptions by default + Assert.Contains("not found", response.Message); + } + + [Theory] + [InlineData("", false)] + [InlineData("--subscription test-sub", false)] + [InlineData("--subscription test-sub --resource-group test-rg", false)] + [InlineData("--subscription test-sub --topic test-topic", false)] + [InlineData("--subscription test-sub --resource-group test-rg --topic test-topic", false)] + [InlineData("--subscription test-sub --topic test-topic --data '{\"subject\":\"test\"}'", true)] + [InlineData("--subscription test-sub --resource-group test-rg --topic test-topic --data '{\"subject\":\"test\"}'", true)] + public async Task ExecuteAsync_ValidatesInputCorrectly(string args, bool shouldSucceed) + { + // Arrange + if (shouldSucceed) + { + var expectedResult = new EventPublishResult( + Status: "Success", + Message: "Successfully published 1 event(s).", + PublishedEventCount: 1, + OperationId: Guid.NewGuid().ToString(), + PublishedAt: DateTime.UtcNow); + + _eventGridService.PublishEventAsync( + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any()) + .Returns(Task.FromResult(expectedResult)); + } + + var parseResult = _commandDefinition.Parse(args.Split(' ', StringSplitOptions.RemoveEmptyEntries)); + + // Act + var response = await _command.ExecuteAsync(_context, parseResult); + + // Assert + if (shouldSucceed) + { + Assert.Equal(HttpStatusCode.OK, response.Status); + Assert.NotNull(response.Results); + Assert.Equal("Success", response.Message); + } + else + { + Assert.Equal(HttpStatusCode.BadRequest, response.Status); + Assert.Contains("required", response.Message?.ToLower() ?? ""); + } + } + + [Fact] + public async Task ExecuteAsync_WithoutResourceGroup_ReturnsSuccess() + { + // Arrange + var subscriptionId = "test-sub"; + var topicName = "test-topic"; + var eventData = JsonSerializer.Serialize(new + { + subject = "/test/subject", + eventType = "TestEvent", + dataVersion = "1.0", + data = new { message = "Hello World" } + }); + + var expectedResult = new EventPublishResult( + Status: "Success", + Message: $"Successfully published 1 event(s) to topic '{topicName}'.", + PublishedEventCount: 1, + OperationId: Guid.NewGuid().ToString(), + PublishedAt: DateTime.UtcNow); + + _eventGridService.PublishEventAsync( + Arg.Is(subscriptionId), + Arg.Is(resourceGroup => resourceGroup == null), // Resource group should be null + Arg.Is(topicName), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any()) + .Returns(Task.FromResult(expectedResult)); + + var args = _commandDefinition.Parse(["--subscription", subscriptionId, "--topic", topicName, "--data", eventData]); + + // Act + var response = await _command.ExecuteAsync(_context, args); + + // Assert + Assert.Equal(HttpStatusCode.OK, response.Status); + Assert.NotNull(response.Results); + + var json = JsonSerializer.Serialize(response.Results); + var result = JsonSerializer.Deserialize(json, EventGridJsonContext.Default.EventGridPublishCommandResult); + Assert.NotNull(result); + Assert.Equal("Success", result!.Result.Status); + Assert.Equal(1, result.Result.PublishedEventCount); + } + + [Fact] + public async Task ExecuteAsync_WithCloudEventsSchema_ReturnsSuccess() + { + // Arrange + var subscriptionId = "test-sub"; + var resourceGroup = "test-rg"; + var topicName = "test-topic"; + var eventData = JsonSerializer.Serialize(new + { + specversion = "1.0", + type = "com.example.CloudEventTest", + source = "/test/cloudevents", + id = "cloud-event-123", + time = "2025-01-15T10:00:00Z", + data = new { message = "Hello CloudEvents!" } + }); + + var expectedResult = new EventPublishResult( + Status: "Success", + Message: $"Successfully published 1 event(s) to topic '{topicName}'.", + PublishedEventCount: 1, + OperationId: Guid.NewGuid().ToString(), + PublishedAt: DateTime.UtcNow); + + _eventGridService.PublishEventAsync( + Arg.Is(subscriptionId), + Arg.Is(resourceGroup), + Arg.Is(topicName), + Arg.Any(), + Arg.Is("CloudEvents"), // Verify CloudEvents schema is passed + Arg.Any(), + Arg.Any()) + .Returns(Task.FromResult(expectedResult)); + + var args = _commandDefinition.Parse(["--subscription", subscriptionId, "--resource-group", resourceGroup, "--topic", topicName, "--data", eventData, "--schema", "CloudEvents"]); + + // Act + var response = await _command.ExecuteAsync(_context, args); + + // Assert + Assert.Equal(HttpStatusCode.OK, response.Status); + Assert.NotNull(response.Results); + + var json = JsonSerializer.Serialize(response.Results); + var result = JsonSerializer.Deserialize(json, EventGridJsonContext.Default.EventGridPublishCommandResult); + Assert.NotNull(result); + Assert.Equal("Success", result!.Result.Status); + Assert.Equal(1, result.Result.PublishedEventCount); + } + + [Fact] + public async Task ExecuteAsync_WithCustomSchema_ReturnsSuccess() + { + // Arrange + var subscriptionId = "test-sub"; + var resourceGroup = "test-rg"; + var topicName = "test-topic"; + var eventData = JsonSerializer.Serialize(new + { + orderNumber = "ORD-12345", + eventCategory = "OrderPlaced", + resourcePath = "/orders/12345", + occurredAt = "2025-01-15T10:00:00Z", + details = new { amount = 99.99, currency = "USD" } + }); + + var expectedResult = new EventPublishResult( + Status: "Success", + Message: $"Successfully published 1 event(s) to topic '{topicName}'.", + PublishedEventCount: 1, + OperationId: Guid.NewGuid().ToString(), + PublishedAt: DateTime.UtcNow); + + _eventGridService.PublishEventAsync( + Arg.Is(subscriptionId), + Arg.Is(resourceGroup), + Arg.Is(topicName), + Arg.Any(), + Arg.Is("Custom"), // Verify Custom schema is passed + Arg.Any(), + Arg.Any()) + .Returns(Task.FromResult(expectedResult)); + + var args = _commandDefinition.Parse(["--subscription", subscriptionId, "--resource-group", resourceGroup, "--topic", topicName, "--data", eventData, "--schema", "Custom"]); + + // Act + var response = await _command.ExecuteAsync(_context, args); + + // Assert + Assert.Equal(HttpStatusCode.OK, response.Status); + Assert.NotNull(response.Results); + + var json = JsonSerializer.Serialize(response.Results); + var result = JsonSerializer.Deserialize(json, EventGridJsonContext.Default.EventGridPublishCommandResult); + Assert.NotNull(result); + Assert.Equal("Success", result!.Result.Status); + Assert.Equal(1, result.Result.PublishedEventCount); + } + + [Theory] + [InlineData("EventGrid")] + [InlineData("CloudEvents")] + [InlineData("Custom")] + public async Task ExecuteAsync_WithDifferentSchemas_PassesSchemaCorrectly(string schema) + { + // Arrange + var subscriptionId = "test-sub"; + var resourceGroup = "test-rg"; + var topicName = "test-topic"; + var eventData = schema switch + { + "CloudEvents" => JsonSerializer.Serialize(new + { + specversion = "1.0", + type = "test.event", + source = "/test", + id = "test-123" + }), + "EventGrid" => JsonSerializer.Serialize(new + { + id = "test-123", + subject = "/test", + eventType = "TestEvent", + dataVersion = "1.0" + }), + _ => JsonSerializer.Serialize(new + { + customField = "customValue", + id = "test-123" + }) + }; + + var expectedResult = new EventPublishResult( + Status: "Success", + Message: $"Successfully published 1 event(s) to topic '{topicName}'.", + PublishedEventCount: 1, + OperationId: Guid.NewGuid().ToString(), + PublishedAt: DateTime.UtcNow); + + _eventGridService.PublishEventAsync( + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Is(schema), // Verify the schema parameter is passed correctly + Arg.Any(), + Arg.Any()) + .Returns(Task.FromResult(expectedResult)); + + var args = _commandDefinition.Parse(["--subscription", subscriptionId, "--resource-group", resourceGroup, "--topic", topicName, "--data", eventData, "--schema", schema]); + + // Act + var response = await _command.ExecuteAsync(_context, args); + + // Assert + Assert.Equal(HttpStatusCode.OK, response.Status); + } + + [Fact] + public async Task ExecuteAsync_WithoutEventSchema_DefaultsToEventGrid() + { + // Arrange + var subscriptionId = "test-sub"; + var resourceGroup = "test-rg"; + var topicName = "test-topic"; + var eventData = JsonSerializer.Serialize(new + { + subject = "/test/subject", + eventType = "TestEvent", + dataVersion = "1.0" + }); + + var expectedResult = new EventPublishResult( + Status: "Success", + Message: $"Successfully published 1 event(s) to topic '{topicName}'.", + PublishedEventCount: 1, + OperationId: Guid.NewGuid().ToString(), + PublishedAt: DateTime.UtcNow); + + _eventGridService.PublishEventAsync( + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Is(schema => schema == null), // Should be null when not specified + Arg.Any(), + Arg.Any()) + .Returns(Task.FromResult(expectedResult)); + + var args = _commandDefinition.Parse(["--subscription", subscriptionId, "--resource-group", resourceGroup, "--topic", topicName, "--data", eventData]); + + // Act + var response = await _command.ExecuteAsync(_context, args); + + // Assert + Assert.Equal(HttpStatusCode.OK, response.Status); + } + + [Fact] + public async Task ExecuteAsync_WithAccessDenied_Returns403() + { + // Arrange + var subscriptionId = "test-sub"; + var resourceGroup = "test-rg"; + var topicName = "test-topic"; + var eventData = JsonSerializer.Serialize(new + { + subject = "/test/subject", + eventType = "TestEvent", + dataVersion = "1.0" + }); + + _eventGridService.PublishEventAsync( + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any()) + .ThrowsAsync(new Azure.RequestFailedException(403, "Access denied to Event Grid topic")); + + var args = _commandDefinition.Parse(["--subscription", subscriptionId, "--resource-group", resourceGroup, "--topic", topicName, "--data", eventData]); + + // Act + var response = await _command.ExecuteAsync(_context, args); + + // Assert + Assert.Equal(HttpStatusCode.Forbidden, response.Status); + Assert.Contains("Access denied", response.Message); + } + + [Fact] + public async Task ExecuteAsync_WithInvalidEventSchema_Returns400() + { + // Arrange + var subscriptionId = "test-sub"; + var resourceGroup = "test-rg"; + var topicName = "test-topic"; + var eventData = JsonSerializer.Serialize(new + { + subject = "/test/subject", + eventType = "TestEvent", + dataVersion = "1.0" + }); + + _eventGridService.PublishEventAsync( + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any()) + .ThrowsAsync(new ArgumentException("Invalid event schema specified. Supported schemas are: CloudEvents, EventGrid, or Custom.")); + + var args = _commandDefinition.Parse(["--subscription", subscriptionId, "--resource-group", resourceGroup, "--topic", topicName, "--data", eventData, "--schema", "InvalidSchema"]); + + // Act + var response = await _command.ExecuteAsync(_context, args); + + // Assert + Assert.Equal(HttpStatusCode.BadRequest, response.Status); + Assert.Contains("Invalid event schema", response.Message); + } + + [Fact] + public async Task ExecuteAsync_WithBadRequestError_Returns400() + { + // Arrange + var subscriptionId = "test-sub"; + var resourceGroup = "test-rg"; + var topicName = "test-topic"; + var eventData = JsonSerializer.Serialize(new + { + subject = "/test/subject", + eventType = "TestEvent", + dataVersion = "1.0" + }); + + _eventGridService.PublishEventAsync( + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any()) + .ThrowsAsync(new Azure.RequestFailedException(400, "Invalid event data or schema format")); + + var args = _commandDefinition.Parse(["--subscription", subscriptionId, "--resource-group", resourceGroup, "--topic", topicName, "--data", eventData]); + + // Act + var response = await _command.ExecuteAsync(_context, args); + + // Assert + Assert.Equal(HttpStatusCode.BadRequest, response.Status); + Assert.Contains("Invalid event data", response.Message); + } + + [Fact] + public async Task ExecuteAsync_WithLargeEventPayload_ReturnsSuccess() + { + // Arrange + var subscriptionId = "test-sub"; + var resourceGroup = "test-rg"; + var topicName = "test-topic"; + + // Create a large data payload + var largeData = new + { + largeArray = Enumerable.Range(1, 100).Select(i => new { id = i, value = $"item-{i}" }).ToArray(), + largeString = new string('x', 1000), + nestedObject = new + { + level1 = new + { + level2 = new + { + level3 = new { data = "deep nested data" } + } + } + } + }; + + var eventData = JsonSerializer.Serialize(new + { + subject = "/test/large", + eventType = "LargeEvent", + dataVersion = "1.0", + data = largeData + }); + + var expectedResult = new EventPublishResult( + Status: "Success", + Message: $"Successfully published 1 event(s) to topic '{topicName}'.", + PublishedEventCount: 1, + OperationId: Guid.NewGuid().ToString(), + PublishedAt: DateTime.UtcNow); + + _eventGridService.PublishEventAsync( + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any()) + .Returns(Task.FromResult(expectedResult)); + + var args = _commandDefinition.Parse(["--subscription", subscriptionId, "--resource-group", resourceGroup, "--topic", topicName, "--data", eventData]); + + // Act + var response = await _command.ExecuteAsync(_context, args); + + // Assert + Assert.Equal(HttpStatusCode.OK, response.Status); + Assert.NotNull(response.Results); + } + + [Fact] + public async Task ExecuteAsync_WithCloudEventsMinimalFields_ReturnsSuccess() + { + // Arrange - Test CloudEvents with only required fields + var subscriptionId = "test-sub"; + var resourceGroup = "test-rg"; + var topicName = "test-topic"; + var eventData = JsonSerializer.Serialize(new + { + specversion = "1.0", + type = "com.example.minimal", + source = "/minimal/source", + id = "minimal-cloud-event" + // Missing optional fields: time, subject, data + }); + + var expectedResult = new EventPublishResult( + Status: "Success", + Message: $"Successfully published 1 event(s) to topic '{topicName}'.", + PublishedEventCount: 1, + OperationId: Guid.NewGuid().ToString(), + PublishedAt: DateTime.UtcNow); + + _eventGridService.PublishEventAsync( + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Is("CloudEvents"), + Arg.Any(), + Arg.Any()) + .Returns(Task.FromResult(expectedResult)); + + var args = _commandDefinition.Parse(["--subscription", subscriptionId, "--resource-group", resourceGroup, "--topic", topicName, "--data", eventData, "--schema", "CloudEvents"]); + + // Act + var response = await _command.ExecuteAsync(_context, args); + + // Assert + Assert.Equal(HttpStatusCode.OK, response.Status); + Assert.NotNull(response.Results); + } + + [Fact] + public async Task ExecuteAsync_WithCloudEventsDataContentType_ReturnsSuccess() + { + // Arrange - Test CloudEvents with datacontenttype field + var subscriptionId = "test-sub"; + var resourceGroup = "test-rg"; + var topicName = "test-topic"; + var eventData = JsonSerializer.Serialize(new + { + specversion = "1.0", + type = "com.example.data.structured", + source = "/datatype/test", + id = "datacontent-test-123", + time = "2025-01-15T10:00:00Z", + datacontenttype = "application/xml", + subject = "data/content/type/test", + data = new { message = "XML structured data", format = "xml" } + }); + + var expectedResult = new EventPublishResult( + Status: "Success", + Message: $"Successfully published 1 event(s) to topic '{topicName}'.", + PublishedEventCount: 1, + OperationId: Guid.NewGuid().ToString(), + PublishedAt: DateTime.UtcNow); + + _eventGridService.PublishEventAsync( + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Is("CloudEvents"), + Arg.Any(), + Arg.Any()) + .Returns(Task.FromResult(expectedResult)); + + var args = _commandDefinition.Parse(["--subscription", subscriptionId, "--resource-group", resourceGroup, "--topic", topicName, "--data", eventData, "--schema", "CloudEvents"]); + + // Act + var response = await _command.ExecuteAsync(_context, args); + + // Assert + Assert.Equal(HttpStatusCode.OK, response.Status); + Assert.NotNull(response.Results); + } + + [Fact] + public async Task ExecuteAsync_WithMixedSchemaFields_ReturnsSuccess() + { + // Arrange - Test Custom schema with mixed EventGrid and CloudEvents fields + var subscriptionId = "test-sub"; + var resourceGroup = "test-rg"; + var topicName = "test-topic"; + var eventData = JsonSerializer.Serialize(new + { + id = "mixed-event-123", + type = "MixedEvent", // CloudEvents field name + subject = "/mixed/subject", // EventGrid field name + dataVersion = "1.5", // EventGrid field name + time = "2025-01-15T13:00:00Z", // CloudEvents field name + data = new { mixed = "event data" } + }); + + var expectedResult = new EventPublishResult( + Status: "Success", + Message: $"Successfully published 1 event(s) to topic '{topicName}'.", + PublishedEventCount: 1, + OperationId: Guid.NewGuid().ToString(), + PublishedAt: DateTime.UtcNow); + + _eventGridService.PublishEventAsync( + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Is("Custom"), + Arg.Any(), + Arg.Any()) + .Returns(Task.FromResult(expectedResult)); + + var args = _commandDefinition.Parse(["--subscription", subscriptionId, "--resource-group", resourceGroup, "--topic", topicName, "--data", eventData, "--schema", "Custom"]); + + // Act + var response = await _command.ExecuteAsync(_context, args); + + // Assert + Assert.Equal(HttpStatusCode.OK, response.Status); + Assert.NotNull(response.Results); + } + + [Theory] + [InlineData("cloudevents")] + [InlineData("CLOUDEVENTS")] + [InlineData("CloudEvents")] + [InlineData("eventgrid")] + [InlineData("EVENTGRID")] + [InlineData("EventGrid")] + [InlineData("custom")] + [InlineData("CUSTOM")] + [InlineData("Custom")] + public async Task ExecuteAsync_WithSchemaNameCaseInsensitive_ReturnsSuccess(string schema) + { + // Arrange - Test that schema names are case insensitive + var subscriptionId = "test-sub"; + var resourceGroup = "test-rg"; + var topicName = "test-topic"; + var eventData = JsonSerializer.Serialize(new + { + id = "case-test", + subject = "/test/case", + eventType = "CaseTest", + dataVersion = "1.0" + }); + + var expectedResult = new EventPublishResult( + Status: "Success", + Message: $"Successfully published 1 event(s) to topic '{topicName}'.", + PublishedEventCount: 1, + OperationId: Guid.NewGuid().ToString(), + PublishedAt: DateTime.UtcNow); + + _eventGridService.PublishEventAsync( + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Is(schema), + Arg.Any(), + Arg.Any()) + .Returns(Task.FromResult(expectedResult)); + + var args = _commandDefinition.Parse(["--subscription", subscriptionId, "--resource-group", resourceGroup, "--topic", topicName, "--data", eventData, "--schema", schema]); + + // Act + var response = await _command.ExecuteAsync(_context, args); + + // Assert + Assert.Equal(HttpStatusCode.OK, response.Status); + } + + [Fact] + public async Task ExecuteAsync_WithArrayOfMixedSchemaEvents_ReturnsSuccess() + { + // Arrange - Test array with mixed event formats in Custom schema + var subscriptionId = "test-sub"; + var resourceGroup = "test-rg"; + var topicName = "test-topic"; + var eventData = JsonSerializer.Serialize(new object[] + { + new // EventGrid-style fields + { + id = "event-1", + subject = "/test/1", + eventType = "TestEvent1", + dataVersion = "1.0", + eventTime = "2025-01-15T10:00:00Z", + data = new { index = 1 } + }, + new // CloudEvents-style fields + { + id = "event-2", + source = "/test/2", + type = "TestEvent2", + specversion = "1.0", + time = "2025-01-15T11:00:00Z", + data = new { index = 2 } + } + }); + + var expectedResult = new EventPublishResult( + Status: "Success", + Message: $"Successfully published 2 event(s) to topic '{topicName}'.", + PublishedEventCount: 2, + OperationId: Guid.NewGuid().ToString(), + PublishedAt: DateTime.UtcNow); + + _eventGridService.PublishEventAsync( + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Is("Custom"), + Arg.Any(), + Arg.Any()) + .Returns(Task.FromResult(expectedResult)); + + var args = _commandDefinition.Parse(["--subscription", subscriptionId, "--resource-group", resourceGroup, "--topic", topicName, "--data", eventData, "--schema", "Custom"]); + + // Act + var response = await _command.ExecuteAsync(_context, args); + + // Assert + Assert.Equal(HttpStatusCode.OK, response.Status); + var json = JsonSerializer.Serialize(response.Results!); + var result = JsonSerializer.Deserialize(json, EventGridJsonContext.Default.EventGridPublishCommandResult); + Assert.Equal(2, result!.Result.PublishedEventCount); + } +}