Skip to content

Commit 7cd9b81

Browse files
committed
Changed the publish event implementation
1 parent 8809381 commit 7cd9b81

File tree

8 files changed

+933
-85
lines changed

8 files changed

+933
-85
lines changed

docs/azmcp-commands.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -576,7 +576,7 @@ azmcp eventgrid events publish --subscription "my-subscription" \
576576
azmcp eventgrid events publish --subscription "my-subscription" \
577577
--resource-group "my-rg" \
578578
--topic "my-topic" \
579-
--event-data '{"specversion": "1.0", "type": "com.example.order.created", "source": "/orders", "id": "123", "time": "2023-01-01T12:00:00Z", "data": {"orderId": "123"}}' \
579+
--event-data '{"specversion": "1.0", "type": "com.example.order.created", "source": "/orders", "id": "123", "time": "2023-01-01T12:00:00Z", "datacontenttype": "application/json", "data": {"orderId": "123"}}' \
580580
--event-schema "CloudEvents"
581581

582582
# Publish with custom schema

tools/Azure.Mcp.Tools.EventGrid/src/Commands/EventGridJsonContext.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,18 @@
55
using Azure.Mcp.Tools.EventGrid.Commands.Events;
66
using Azure.Mcp.Tools.EventGrid.Commands.Subscription;
77
using Azure.Mcp.Tools.EventGrid.Commands.Topic;
8+
using Azure.Mcp.Tools.EventGrid.Models;
89

910
namespace Azure.Mcp.Tools.EventGrid.Commands;
1011

12+
// JsonSerializable attributes for all types used in EventGrid command responses and event serialization
1113
[JsonSerializable(typeof(EventsPublishCommand.EventsPublishCommandResult))]
1214
[JsonSerializable(typeof(SubscriptionListCommand.SubscriptionListCommandResult))]
1315
[JsonSerializable(typeof(TopicListCommand.TopicListCommandResult))]
1416
[JsonSerializable(typeof(EventGridSubscriptionInfo))]
1517
[JsonSerializable(typeof(EventGridTopicInfo))]
1618
[JsonSerializable(typeof(EventPublishResult))]
19+
[JsonSerializable(typeof(EventGridEventSchema))] // For individual event serialization to EventGrid
1720
[JsonSourceGenerationOptions(PropertyNamingPolicy = JsonKnownNamingPolicy.CamelCase, WriteIndented = true, DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull)]
1821
internal sealed partial class EventGridJsonContext : JsonSerializerContext
1922
{

tools/Azure.Mcp.Tools.EventGrid/src/Commands/Events/EventsPublishCommand.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public override async Task<CommandResponse> ExecuteAsync(CommandContext context,
9999
Azure.RequestFailedException reqEx when reqEx.Status == 400 =>
100100
"Invalid event data or schema format. Please verify the event data is valid JSON and matches the expected schema.",
101101
ArgumentException argEx when argEx.Message.Contains("schema") =>
102-
"Invalid event schema specified. Supported schemas are: CloudEvents, EventGridEvent, or Custom.",
102+
"Invalid event schema specified. Supported schemas are: CloudEvents, EventGrid, or Custom.",
103103
System.Text.Json.JsonException jsonEx =>
104104
$"Invalid JSON format in event data: {jsonEx.Message}",
105105
_ => base.GetErrorMessage(ex)
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using System.Text.Json.Nodes;
5+
using System.Text.Json.Serialization;
6+
7+
namespace Azure.Mcp.Tools.EventGrid.Models;
8+
9+
/// <summary>
10+
/// Represents an event conforming to the EventGrid event schema for HTTP API publishing.
11+
/// </summary>
12+
public sealed class EventGridEventSchema
13+
{
14+
[JsonPropertyName("id")]
15+
public required string Id { get; set; }
16+
17+
[JsonPropertyName("subject")]
18+
public required string Subject { get; set; }
19+
20+
[JsonPropertyName("eventType")]
21+
public required string EventType { get; set; }
22+
23+
[JsonPropertyName("dataVersion")]
24+
public required string DataVersion { get; set; }
25+
26+
[JsonPropertyName("data")]
27+
public JsonNode? Data { get; set; }
28+
29+
[JsonPropertyName("eventTime")]
30+
public DateTimeOffset EventTime { get; set; }
31+
}

tools/Azure.Mcp.Tools.EventGrid/src/Options/EventGridOptionDefinitions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,6 @@ public static class EventGridOptionDefinitions
3535
$"--{EventSchemaParam}"
3636
)
3737
{
38-
Description = "The event schema type (CloudEvents, EventGridEvent, or Custom). Defaults to EventGridEvent."
38+
Description = "The event schema type (CloudEvents, EventGrid, or Custom). Defaults to EventGrid."
3939
};
4040
}

tools/Azure.Mcp.Tools.EventGrid/src/Services/EventGridService.cs

Lines changed: 99 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT License.
33

4-
using System.Net.Http;
5-
using System.Text;
64
using System.Text.Json;
5+
using System.Text.Json.Nodes;
76
using Azure.Core;
87
using Azure.Mcp.Core.Options;
8+
using Azure.Mcp.Tools.EventGrid.Commands;
9+
using Azure.Mcp.Tools.EventGrid.Models;
910
using Azure.Messaging.EventGrid;
1011
using Azure.ResourceManager.EventGrid;
1112
using Azure.ResourceManager.EventGrid.Models;
@@ -131,62 +132,28 @@ public async Task<EventPublishResult> PublishEventsAsync(
131132
// Get credential using standardized method from base class for Azure AD authentication
132133
var credential = await GetCredential(tenant);
133134

134-
// Parse and validate event data
135-
var events = ParseAndValidateEventData(eventData, eventSchema ?? "EventGridEvent");
135+
// Parse and validate event data directly to EventGridEventSchema
136+
var eventGridEventSchemas = ParseAndValidateEventData(eventData, eventSchema ?? "EventGridEvent");
136137

137-
// Use raw HTTP approach to completely avoid AOT serialization issues
138-
using var httpClient = new HttpClient();
138+
// Use EventGridPublisherClient with BinaryData for AOT-compatible publishing
139+
var publisherClient = new EventGridPublisherClient(topic.Data.Endpoint, credential);
139140

140-
// Create events as raw JSON strings using lazy evaluation
141-
var eventJsonStrings = events.Select(evt =>
141+
// Serialize each event individually to JSON using source-generated context and convert to BinaryData
142+
var eventsBinaryData = eventGridEventSchemas.Select(eventSchema =>
142143
{
143-
// Use JsonEncodedText.Encode for AOT-compatible JSON escaping
144-
var escapedSubject = JsonEncodedText.Encode(evt.Subject);
145-
var escapedEventType = JsonEncodedText.Encode(evt.EventType);
146-
var escapedId = JsonEncodedText.Encode(evt.Id);
147-
var escapedDataVersion = JsonEncodedText.Encode(evt.DataVersion);
148-
var formattedEventTime = evt.EventTime.ToString("yyyy-MM-ddTHH:mm:ss.fffZ");
149-
150-
return $$"""
151-
{
152-
"id": "{{escapedId}}",
153-
"subject": "{{escapedSubject}}",
154-
"eventType": "{{escapedEventType}}",
155-
"dataVersion": "{{escapedDataVersion}}",
156-
"eventTime": "{{formattedEventTime}}",
157-
"data": {{evt.Data ?? "{}"}}
158-
}
159-
""";
160-
});
161-
162-
var jsonPayload = $"[{string.Join(",", eventJsonStrings)}]";
144+
var jsonString = JsonSerializer.Serialize(eventSchema, EventGridJsonContext.Default.EventGridEventSchema);
145+
return BinaryData.FromString(jsonString);
146+
}).ToArray();
163147

164148
// Get event count for logging (this will materialize the enumerable once)
165-
var eventCount = events.Count();
149+
var eventCount = eventsBinaryData.Length;
166150
_logger.LogInformation("Publishing {EventCount} events to topic '{TopicName}' with operation ID: {OperationId}",
167151
eventCount, topicName, operationId);
168152

169153
try
170154
{
171-
// Send the events using raw HTTP POST to avoid EventGridPublisherClient AOT issues
172-
var content = new StringContent(jsonPayload, Encoding.UTF8, "application/json");
173-
174-
// Get access token for Event Grid using Azure AD authentication
175-
var tokenRequestContext = new TokenRequestContext(new[] { "https://eventgrid.azure.net/.default" });
176-
var tokenResult = await credential.GetTokenAsync(tokenRequestContext, CancellationToken.None);
177-
178-
// Add Authorization header with Bearer token for Azure AD authentication
179-
httpClient.DefaultRequestHeaders.Authorization =
180-
new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", tokenResult.Token);
181-
182-
var response = await httpClient.PostAsync(topic.Data.Endpoint, content);
183-
184-
if (!response.IsSuccessStatusCode)
185-
{
186-
var responseContent = await response.Content.ReadAsStringAsync();
187-
throw new HttpRequestException(
188-
$"Event Grid returned status code {response.StatusCode}. Response: {responseContent}");
189-
}
155+
// Send events using EventGridPublisherClient with BinaryData (AOT-compatible)
156+
await publisherClient.SendEventsAsync(eventsBinaryData);
190157
}
191158
catch (Exception publishEx)
192159
{
@@ -219,25 +186,25 @@ public async Task<EventPublishResult> PublishEventsAsync(
219186
}
220187
}
221188

222-
private static IEnumerable<EventData> ParseAndValidateEventData(string eventData, string eventSchema)
189+
private static IEnumerable<EventGridEventSchema> ParseAndValidateEventData(string eventData, string eventSchema)
223190
{
224191
try
225192
{
226193
// Parse the JSON data
227194
var jsonDocument = JsonDocument.Parse(eventData);
228195

229-
IEnumerable<EventData> events;
196+
IEnumerable<EventGridEventSchema> events;
230197

231198
if (jsonDocument.RootElement.ValueKind == JsonValueKind.Array)
232199
{
233200
// Handle array of events - use lazy evaluation
234201
events = jsonDocument.RootElement.EnumerateArray()
235-
.Select(eventElement => CreateEventDataFromJsonElement(eventElement, eventSchema));
202+
.Select(eventElement => CreateEventGridEventSchemaFromJsonElement(eventElement, eventSchema));
236203
}
237204
else
238205
{
239206
// Handle single event - return single item enumerable
240-
events = new[] { CreateEventDataFromJsonElement(jsonDocument.RootElement, eventSchema) };
207+
events = new[] { CreateEventGridEventSchemaFromJsonElement(jsonDocument.RootElement, eventSchema) };
241208
}
242209

243210
// Force evaluation to validate all events before returning
@@ -255,48 +222,98 @@ private static IEnumerable<EventData> ParseAndValidateEventData(string eventData
255222
}
256223
}
257224

258-
private static EventData CreateEventDataFromJsonElement(JsonElement eventElement, string eventSchema)
225+
private static EventGridEventSchema CreateEventGridEventSchemaFromJsonElement(JsonElement eventElement, string eventSchema)
259226
{
260-
// Extract required properties
261-
var eventType = eventElement.TryGetProperty("eventType", out var eventTypeProp) ? eventTypeProp.GetString() :
227+
string? eventType, subject, dataVersion;
228+
DateTimeOffset eventTime;
229+
230+
// Extract event ID early for logging purposes
231+
var id = eventElement.TryGetProperty("id", out var idProp) ? idProp.GetString() : Guid.NewGuid().ToString();
232+
233+
if (eventSchema.Equals("CloudEvents", StringComparison.OrdinalIgnoreCase))
234+
{
235+
// CloudEvents spec handling (v1.0)
236+
eventType = eventElement.TryGetProperty("type", out var typeProp) ? typeProp.GetString() : "CustomEvent";
237+
238+
// CloudEvents uses "source" field, but we can fall back to "subject" for compatibility
239+
subject = eventElement.TryGetProperty("source", out var sourceProp) ? sourceProp.GetString() :
240+
eventElement.TryGetProperty("subject", out var subjectProp) ? subjectProp.GetString() : "/default/subject";
241+
242+
// CloudEvents uses "specversion" for schema version
243+
dataVersion = eventElement.TryGetProperty("specversion", out var specProp) ? specProp.GetString() : "1.0";
244+
245+
// CloudEvents uses "time" field
246+
eventTime = eventElement.TryGetProperty("time", out var timeProp) && timeProp.TryGetDateTimeOffset(out var timeValue)
247+
? timeValue : DateTimeOffset.UtcNow;
248+
249+
// Handle datacontenttype - CloudEvents v1.0 spec field for content type of data payload
250+
var dataContentType = eventElement.TryGetProperty("datacontenttype", out var dataContentTypeProp)
251+
? dataContentTypeProp.GetString()
252+
: "application/json"; // Default per CloudEvents spec
253+
254+
// Log and validate datacontenttype for debugging and monitoring purposes
255+
256+
if (!string.Equals(dataContentType, "application/json", StringComparison.OrdinalIgnoreCase))
257+
{
258+
// Log when non-JSON content types are used - this helps with debugging
259+
// Note: EventGrid will accept the event regardless of datacontenttype,
260+
// but subscribers should handle non-JSON content types appropriately
261+
// Common non-JSON types: application/xml, text/plain, application/octet-stream
262+
263+
// For now, we'll just validate that it's a recognized MIME type format
264+
if (string.IsNullOrWhiteSpace(dataContentType) || !dataContentType.Contains('/'))
265+
{
266+
throw new ArgumentException($"Invalid datacontenttype '{dataContentType}' in CloudEvent with id '{id}'. Must be a valid MIME type (e.g., 'application/xml', 'text/plain').");
267+
}
268+
}
269+
}
270+
else if (eventSchema.Equals("EventGrid", StringComparison.OrdinalIgnoreCase))
271+
{
272+
// EventGrid spec handling
273+
eventType = eventElement.TryGetProperty("eventType", out var eventTypeProp) ? eventTypeProp.GetString() : "CustomEvent";
274+
subject = eventElement.TryGetProperty("subject", out var subjectProp) ? subjectProp.GetString() : "/default/subject";
275+
dataVersion = eventElement.TryGetProperty("dataVersion", out var dataVersionProp) ? dataVersionProp.GetString() : "1.0";
276+
eventTime = eventElement.TryGetProperty("eventTime", out var timeProp) && timeProp.TryGetDateTimeOffset(out var eventTimeValue)
277+
? eventTimeValue : DateTimeOffset.UtcNow;
278+
}
279+
else // Custom schema
280+
{
281+
// For custom schema, try both CloudEvents and EventGrid field names for flexibility
282+
eventType = eventElement.TryGetProperty("eventType", out var eventTypeProp) ? eventTypeProp.GetString() :
262283
eventElement.TryGetProperty("type", out var typeProp) ? typeProp.GetString() : "CustomEvent";
263284

264-
var subject = eventElement.TryGetProperty("subject", out var subjectProp) ? subjectProp.GetString() : "/default/subject";
285+
subject = eventElement.TryGetProperty("subject", out var subjectProp) ? subjectProp.GetString() :
286+
eventElement.TryGetProperty("source", out var sourceProp) ? sourceProp.GetString() : "/default/subject";
265287

266-
var dataVersion = eventElement.TryGetProperty("dataVersion", out var dataVersionProp) ? dataVersionProp.GetString() : "1.0";
288+
dataVersion = eventElement.TryGetProperty("dataVersion", out var dataVersionProp) ? dataVersionProp.GetString() :
289+
eventElement.TryGetProperty("specversion", out var specProp) ? specProp.GetString() : "1.0";
267290

268-
// Extract data payload as raw JSON string for AOT compatibility
269-
string? data = null;
291+
eventTime = eventElement.TryGetProperty("eventTime", out var eventTimeProp) && eventTimeProp.TryGetDateTimeOffset(out var eventTimeValue) ? eventTimeValue :
292+
eventElement.TryGetProperty("time", out var timeProp) && timeProp.TryGetDateTimeOffset(out var timeValue) ? timeValue : DateTimeOffset.UtcNow;
293+
}
294+
295+
// Extract data payload and parse as JsonNode for AOT compatibility
296+
JsonNode? data = null;
270297
if (eventElement.TryGetProperty("data", out var dataProp))
271298
{
272-
data = dataProp.GetRawText();
299+
data = JsonNode.Parse(dataProp.GetRawText());
273300
}
274301

275-
var id = eventElement.TryGetProperty("id", out var idProp) ? idProp.GetString() : Guid.NewGuid().ToString();
276-
var eventTime = eventElement.TryGetProperty("eventTime", out var timeProp) && timeProp.TryGetDateTimeOffset(out var eventTimeValue)
277-
? eventTimeValue : DateTimeOffset.UtcNow;
302+
// For CloudEvents schema, we've already captured datacontenttype above for validation/logging
303+
// The EventGrid schema doesn't have a direct equivalent, so we don't persist it in the final event
278304

279-
// Create a simple event data structure
280-
return new EventData(
281-
Id: id ?? Guid.NewGuid().ToString(),
282-
Subject: subject ?? "/default/subject",
283-
EventType: eventType ?? "CustomEvent",
284-
DataVersion: dataVersion ?? "1.0",
285-
Data: data,
286-
EventTime: eventTime,
287-
Schema: eventSchema);
305+
// Create EventGridEventSchema directly
306+
return new EventGridEventSchema
307+
{
308+
Id = id ?? Guid.NewGuid().ToString(),
309+
Subject = subject ?? "/default/subject",
310+
EventType = eventType ?? "CustomEvent",
311+
DataVersion = dataVersion ?? "1.0",
312+
Data = data,
313+
EventTime = eventTime
314+
};
288315
}
289316

290-
// Simple record to hold event data for validation
291-
private record EventData(
292-
string Id,
293-
string Subject,
294-
string EventType,
295-
string DataVersion,
296-
string? Data,
297-
DateTimeOffset EventTime,
298-
string Schema);
299-
300317
private async Task GetSubscriptionsForSpecificTopic(
301318
SubscriptionResource subscriptionResource,
302319
string? resourceGroup,

0 commit comments

Comments
 (0)