From 300db7d8f0d4061e7d1f15a48453713aa8bdba83 Mon Sep 17 00:00:00 2001 From: Madalyn Redding Date: Fri, 11 Apr 2025 15:41:01 -0700 Subject: [PATCH 1/9] p1 --- .../samples/README.md | 2 +- .../tests/EventGridClientTests.cs | 12 ++++++++++++ .../tests/CloudEventTests.cs | 15 ++++++--------- 3 files changed, 19 insertions(+), 10 deletions(-) create mode 100644 sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/tests/EventGridClientTests.cs diff --git a/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/samples/README.md b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/samples/README.md index 67847a43dcd2..d3f6306754cd 100644 --- a/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/samples/README.md +++ b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/samples/README.md @@ -13,4 +13,4 @@ description: Samples for the Azure.Messaging.EventGrid.Namespaces client library Before starting, take a look at the Azure Event Grid [README](https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/README.md) for more information on how to create an Event Grid custom topic or domain using the Azure portal/Azure CLI, and retrieving the designated endpoint and credential. - [Using Namespace Topics](https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/samples/Sample1_Namespaces.md) -- [Using the Cloud Native CloudEvent type] +- [Using the Cloud Native CloudEvent type](https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/samples/Sample2_CNCF.md) diff --git a/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/tests/EventGridClientTests.cs b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/tests/EventGridClientTests.cs new file mode 100644 index 000000000000..a6b40f347595 --- /dev/null +++ b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/tests/EventGridClientTests.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Azure.Messaging.EventGrid.Namespaces.Tests +{ + internal class EventGridClientTests + { + } +} diff --git a/sdk/eventgrid/Microsoft.Azure.Messaging.EventGrid.CloudNativeCloudEvents/tests/CloudEventTests.cs b/sdk/eventgrid/Microsoft.Azure.Messaging.EventGrid.CloudNativeCloudEvents/tests/CloudEventTests.cs index b10a0df4db48..d240287d5322 100644 --- a/sdk/eventgrid/Microsoft.Azure.Messaging.EventGrid.CloudNativeCloudEvents/tests/CloudEventTests.cs +++ b/sdk/eventgrid/Microsoft.Azure.Messaging.EventGrid.CloudNativeCloudEvents/tests/CloudEventTests.cs @@ -1,23 +1,20 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. +using Azure; +using Azure.Core; +using Azure.Core.TestFramework; +using Azure.Messaging.EventGrid; +using CloudNative.CloudEvents.SystemTextJson; +using NUnit.Framework; using System; using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Linq; -using System.Net.Mime; using System.Text.Json; using System.Threading; using System.Threading.Tasks; -using Azure; -using Azure.Core; -using Azure.Core.Pipeline; -using Azure.Core.TestFramework; -using Azure.Messaging.EventGrid; -using CloudNative.CloudEvents; -using CloudNative.CloudEvents.SystemTextJson; -using NUnit.Framework; using CloudEvent = CloudNative.CloudEvents.CloudEvent; namespace Microsoft.Azure.Messaging.EventGrid.CloudNativeCloudEvents.Tests From d317202bf9308937b91fcfcc91823afdc55061e5 Mon Sep 17 00:00:00 2001 From: Madalyn Redding Date: Thu, 17 Apr 2025 12:51:11 -0700 Subject: [PATCH 2/9] update --- .../tests/CloudEventTests.cs | 267 ++++++++++++++++++ .../tests/EventGridClientTests.cs | 12 - 2 files changed, 267 insertions(+), 12 deletions(-) create mode 100644 sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/tests/CloudEventTests.cs delete mode 100644 sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/tests/EventGridClientTests.cs diff --git a/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/tests/CloudEventTests.cs b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/tests/CloudEventTests.cs new file mode 100644 index 000000000000..77a0f6f5be50 --- /dev/null +++ b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/tests/CloudEventTests.cs @@ -0,0 +1,267 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Azure.Core.TestFramework; +using Azure.Core.Tests; +using Azure.Messaging.EventGrid.Namespaces; +using NUnit.Framework; + +namespace Azure.Messaging.EventGrid.Tests +{ + public class CloudEventTests + { + private const string TraceParentHeaderName = "traceparent"; + private const string TraceStateHeaderName = "tracestate"; + + [Test] + [TestCase(false, false)] + [TestCase(true, true)] + [TestCase(true, false)] + [TestCase(false, true)] + public async Task SetsTraceParentExtension(bool inclTraceparent, bool inclTracestate) + { + MockTransport mockTransport = CreateMockTransport(); + + EventGridSenderClientOptions options = new() + { + Transport = mockTransport, + }; + EventGridSenderClient senderClient = new(new Uri("http://www.example.com"), "topic", new AzureKeyCredential("fake"), options); + + // simulating some other activity already being started before doing operations with the client + var activity = new Activity("ParentEvent"); + activity.SetIdFormat(ActivityIdFormat.W3C); + activity.Start(); + activity.TraceStateString = "tracestatevalue"; + + // create list of cloud events + List eventsList = new List(); + for (int i = 0; i < 10; i++) + { + CloudEvent cloudEvent = new CloudEvent( + "record", + "Microsoft.MockPublisher.TestEvent", + JsonDocument.Parse("{\"property1\": \"abc\", \"property2\": 123}").RootElement) + { + Id = "id", + Subject = $"Subject-{i}", + Time = DateTimeOffset.UtcNow + }; + if (inclTraceparent && i % 2 == 0) + { + cloudEvent.ExtensionAttributes.Add("traceparent", "traceparentValue"); + } + if (inclTracestate && i % 2 == 0) + { + cloudEvent.ExtensionAttributes.Add("tracestate", "param:value"); + } + eventsList.Add(cloudEvent); + } + + // send + await senderClient.SendAsync(eventsList).ConfigureAwait(false); + + // stop activity after extracting the events from the request as this is where the cloudEvents would actually + // be serialized + activity.Stop(); + + // validate + IEnumerator cloudEnum = eventsList.GetEnumerator(); + for (int i = 0; i < 10; i++) + { + cloudEnum.MoveNext(); + IDictionary cloudEventAttr = cloudEnum.Current.ExtensionAttributes; + if (inclTraceparent && inclTracestate && i % 2 == 0) + { + Assert.AreEqual( + "traceparentValue", + cloudEventAttr[TraceParentHeaderName]); + + Assert.AreEqual( + "param:value", + cloudEventAttr[TraceStateHeaderName]); + } + else if (inclTraceparent && i % 2 == 0) + { + Assert.AreEqual( + "traceparentValue", + cloudEventAttr[TraceParentHeaderName]); + } + else if (inclTracestate && i % 2 == 0) + { + Assert.AreEqual( + "param:value", + cloudEventAttr[TraceStateHeaderName]); + } + else + { + Assert.IsTrue(mockTransport.SingleRequest.Headers.TryGetValue(TraceParentHeaderName, out string requestHeader)); + + Assert.AreEqual( + requestHeader, + cloudEventAttr[TraceParentHeaderName]); + } + } + } + + [Test] + public async Task DoesNotSetTraceParentExtensionWhenTracingIsDisabled() + { + MockTransport mockTransport = CreateMockTransport(); + + EventGridSenderClientOptions options = new() + { + Transport = mockTransport, + Diagnostics = + { + IsDistributedTracingEnabled = false, + } + }; + EventGridSenderClient senderClient = new(new Uri("http://www.example.com"), "topic", new AzureKeyCredential("fake"), options); + + using ClientDiagnosticListener diagnosticListener = new(s => s.StartsWith("Azure."), asyncLocal: true); + + // simulating some other activity already being started before doing operations with the client + var activity = new Activity("ParentEvent"); + activity.SetIdFormat(ActivityIdFormat.W3C); + activity.Start(); + + CloudEvent cloudEvent = new CloudEvent( + "record", + "Microsoft.MockPublisher.TestEvent", + JsonDocument.Parse("{\"property1\": \"abc\", \"property2\": 123}").RootElement); + + await senderClient.SendAsync(cloudEvent).ConfigureAwait(false); + activity.Stop(); + + Assert.False(cloudEvent.ExtensionAttributes.ContainsKey("traceparent")); + Assert.False(cloudEvent.ExtensionAttributes.ContainsKey("tracestate")); + } + + [Test] + [TestCase(false, false)] + [TestCase(true, true)] + [TestCase(true, false)] + [TestCase(false, true)] + public async Task SetsTraceParentExtensionRetries(bool inclTraceparent, bool inclTracestate) + { + int requestCt = 0; + var mockTransport = new MockTransport((request) => + { + var stream = new MemoryStream(); + request.Content.WriteTo(stream, CancellationToken.None); + if (requestCt++ == 0) + { + return new MockResponse(500); + } + else + { + return new MockResponse(200); + } + }); + + EventGridSenderClientOptions options = new() + { + Transport = mockTransport + }; + EventGridSenderClient senderClient = new(new Uri("http://www.example.com"), "topic", new AzureKeyCredential("fake"), options); + using ClientDiagnosticListener diagnosticListener = new(s => s.StartsWith("Azure."), asyncLocal: true); + + // simulating some other activity already being started before doing operations with the client + var activity = new Activity("ParentEvent"); + activity.SetIdFormat(ActivityIdFormat.W3C); + activity.Start(); + activity.TraceStateString = "tracestatevalue"; + + // create list of cloud events + List eventsList = new List(); + for (int i = 0; i < 10; i++) + { + CloudEvent cloudEvent = new CloudEvent( + "record", + "Microsoft.MockPublisher.TestEvent", + JsonDocument.Parse("{\"property1\": \"abc\", \"property2\": 123}").RootElement) + { + Id = "id", + Subject = $"Subject-{i}", + Time = DateTimeOffset.UtcNow + }; + if (inclTraceparent && i % 2 == 0) + { + cloudEvent.ExtensionAttributes.Add("traceparent", "traceparentValue"); + } + if (inclTracestate && i % 2 == 0) + { + cloudEvent.ExtensionAttributes.Add("tracestate", "param:value"); + } + eventsList.Add(cloudEvent); + } + + // send + await senderClient.SendAsync(eventsList).ConfigureAwait(false); + + // stop activity after extracting the events from the request as this is where the cloudEvents would actually + // be serialized + activity.Stop(); + + // validate + IEnumerator cloudEnum = eventsList.GetEnumerator(); + for (int i = 0; i < 10; i++) + { + cloudEnum.MoveNext(); + IDictionary cloudEventAttr = cloudEnum.Current.ExtensionAttributes; + if (inclTraceparent && inclTracestate && i % 2 == 0) + { + Assert.AreEqual( + "traceparentValue", + cloudEventAttr[TraceParentHeaderName]); + + Assert.AreEqual( + "param:value", + cloudEventAttr[TraceStateHeaderName]); + } + else if (inclTraceparent && i % 2 == 0) + { + Assert.AreEqual( + "traceparentValue", + cloudEventAttr[TraceParentHeaderName]); + } + else if (inclTracestate && i % 2 == 0) + { + Assert.AreEqual( + "param:value", + cloudEventAttr[TraceStateHeaderName]); + } + else + { + Assert.IsTrue(mockTransport.Requests[1].Headers.TryGetValue(TraceParentHeaderName, out string traceParent)); + Assert.AreEqual( + traceParent, + cloudEventAttr[TraceParentHeaderName]); + + Assert.IsTrue(mockTransport.Requests[1].Headers.TryGetValue(TraceStateHeaderName, out string traceState)); + Assert.AreEqual( + traceState, + cloudEventAttr[TraceStateHeaderName]); + } + } + } + + private static MockTransport CreateMockTransport() + { + return new MockTransport((request) => + { + var stream = new MemoryStream(); + request.Content.WriteTo(stream, CancellationToken.None); + return new MockResponse(200); + }); + } + } +} diff --git a/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/tests/EventGridClientTests.cs b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/tests/EventGridClientTests.cs deleted file mode 100644 index a6b40f347595..000000000000 --- a/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/tests/EventGridClientTests.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace Azure.Messaging.EventGrid.Namespaces.Tests -{ - internal class EventGridClientTests - { - } -} From 3bbd2971495c3944f666cb255f656903ed1f4c54 Mon Sep 17 00:00:00 2001 From: Madalyn Redding Date: Fri, 18 Apr 2025 09:52:27 -0700 Subject: [PATCH 3/9] implementation --- .../Customization/CloudEventRequestContent.cs | 91 +++++++++++++++++++ .../Customization/EventGridSenderClient.cs | 11 ++- 2 files changed, 98 insertions(+), 4 deletions(-) create mode 100644 sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Customization/CloudEventRequestContent.cs diff --git a/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Customization/CloudEventRequestContent.cs b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Customization/CloudEventRequestContent.cs new file mode 100644 index 000000000000..f4696d9e29a3 --- /dev/null +++ b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Customization/CloudEventRequestContent.cs @@ -0,0 +1,91 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using Azure.Core; + +namespace Azure.Messaging.EventGrid.Namespaces +{ + internal class CloudEventRequestContent : RequestContent + { + private readonly IEnumerable _cloudEvents; + private const string TraceParentHeaderName = "traceparent"; + private const string TraceStateHeaderName = "tracestate"; + private readonly bool _isDistributedTracingEnabled; + private RequestContent _serializedContent; + + public CloudEventRequestContent(IEnumerable cloudEvents, bool isDistributedTracingEnabled) + { + _cloudEvents = cloudEvents; + _isDistributedTracingEnabled = isDistributedTracingEnabled; + } + + public CloudEventRequestContent(CloudEvent cloudEvent, bool isDistributedTracingEnabled) + { + _cloudEvents = [ cloudEvent ]; + _isDistributedTracingEnabled = isDistributedTracingEnabled; + } + + public override void Dispose() + { + } + + public override bool TryComputeLength(out long length) + { + EnsureSerialized(); + return _serializedContent.TryComputeLength(out length); + } + + public override void WriteTo(Stream stream, CancellationToken cancellationToken) + { + EnsureSerialized(); + _serializedContent.WriteTo(stream, cancellationToken); + } + + public override async Task WriteToAsync(Stream stream, CancellationToken cancellationToken) + { + EnsureSerialized(); + await _serializedContent.WriteToAsync(stream, cancellationToken).ConfigureAwait(false); + } + + private void EnsureSerialized() + { + if (_serializedContent != null) + { + return; + } + + if (_isDistributedTracingEnabled) + { + string currentActivityId = null; + string traceState = null; + Activity currentActivity = Activity.Current; + if (currentActivity != null && (currentActivity.IdFormat == ActivityIdFormat.W3C)) + { + currentActivityId = currentActivity.Id; + traceState = currentActivity.TraceStateString; + } + + foreach (CloudEvent cloudEvent in _cloudEvents) + { + if (currentActivityId != null && + !cloudEvent.ExtensionAttributes.ContainsKey(TraceParentHeaderName) && + !cloudEvent.ExtensionAttributes.ContainsKey(TraceStateHeaderName)) + { + cloudEvent.ExtensionAttributes.Add(TraceParentHeaderName, currentActivityId); + if (traceState != null) + { + cloudEvent.ExtensionAttributes.Add(TraceStateHeaderName, traceState); + } + } + } + } + + _serializedContent = RequestContent.Create(_cloudEvents); + } + } +} diff --git a/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Customization/EventGridSenderClient.cs b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Customization/EventGridSenderClient.cs index b1bd243da4c1..686e893d475c 100644 --- a/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Customization/EventGridSenderClient.cs +++ b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Customization/EventGridSenderClient.cs @@ -18,6 +18,7 @@ namespace Azure.Messaging.EventGrid.Namespaces public partial class EventGridSenderClient { private readonly string _topicName; + private readonly bool _isDistributedTracingEnabled; /// Initializes a new instance of EventGridSenderClient. /// The host name of the namespace, e.g. namespaceName1.westus-1.eventgrid.azure.net. @@ -66,6 +67,7 @@ public EventGridSenderClient(Uri endpoint, _endpoint = endpoint; _apiVersion = options.Version; _topicName = topicName; + _isDistributedTracingEnabled = options.Diagnostics.IsDistributedTracingEnabled; } /// Initializes a new instance of EventGridSenderClient. @@ -93,6 +95,7 @@ public EventGridSenderClient(Uri endpoint, _endpoint = endpoint; _apiVersion = options.Version; _topicName = topicName; + _isDistributedTracingEnabled = options.Diagnostics.IsDistributedTracingEnabled; } /// Publish Single Cloud Event to namespace topic. In case of success, the server responds with an HTTP 200 status code with an empty JSON object in response. Otherwise, the server can return various error codes. For example, 401: which indicates authorization failure, 403: which indicates quota exceeded or message is too large, 410: which indicates that specific topic is not found, 400: for bad request, and 500: for internal server error. @@ -104,7 +107,7 @@ public virtual Response Send(CloudEvent cloudEvent, CancellationToken cancellati Argument.AssertNotNull(cloudEvent, nameof(cloudEvent)); RequestContext context = FromCancellationToken(cancellationToken); - return Send(_topicName, RequestContent.Create(cloudEvent), context); + return Send(_topicName, new CloudEventRequestContent(cloudEvent, _isDistributedTracingEnabled), context); } /// Publish Single Cloud Event to namespace topic. In case of success, the server responds with an HTTP 200 status code with an empty JSON object in response. Otherwise, the server can return various error codes. For example, 401: which indicates authorization failure, 403: which indicates quota exceeded or message is too large, 410: which indicates that specific topic is not found, 400: for bad request, and 500: for internal server error. @@ -118,7 +121,7 @@ public virtual async Task SendAsync( Argument.AssertNotNull(cloudEvent, nameof(cloudEvent)); RequestContext context = FromCancellationToken(cancellationToken); - return await SendAsync(_topicName, RequestContent.Create(cloudEvent), context).ConfigureAwait(false); + return await SendAsync(_topicName, new CloudEventRequestContent(cloudEvent, _isDistributedTracingEnabled), context).ConfigureAwait(false); } /// Publish Batch Cloud Event to namespace topic. In case of success, the server responds with an HTTP 200 status code with an empty JSON object in response. Otherwise, the server can return various error codes. For example, 401: which indicates authorization failure, 403: which indicates quota exceeded or message is too large, 410: which indicates that specific topic is not found, 400: for bad request, and 500: for internal server error. @@ -136,7 +139,7 @@ public virtual async Task SendAsync(IEnumerable cloudEvent scope.Start(); try { - return await SendEventsAsync(_topicName, RequestContent.Create(cloudEvents), context).ConfigureAwait(false); + return await SendEventsAsync(_topicName, new CloudEventRequestContent(cloudEvents, _isDistributedTracingEnabled), context).ConfigureAwait(false); } catch (Exception e) { @@ -160,7 +163,7 @@ public virtual Response Send(IEnumerable cloudEvents, CancellationTo scope.Start(); try { - return SendEvents(_topicName, RequestContent.Create(cloudEvents), context); + return SendEvents(_topicName, new CloudEventRequestContent(cloudEvents, _isDistributedTracingEnabled), context); } catch (Exception e) { From 3a354f8d1a50a830613e200bea893112a1df7c89 Mon Sep 17 00:00:00 2001 From: Madalyn Redding Date: Fri, 18 Apr 2025 13:40:34 -0700 Subject: [PATCH 4/9] second pass --- ...zure.Messaging.EventGrid.Namespaces.csproj | 13 ++- .../Customization/CloudEventRequestContent.cs | 91 ------------------- .../CloudEventRequestContent.cs | 0 sdk/eventgrid/Directory.Build.props | 11 +++ 4 files changed, 19 insertions(+), 96 deletions(-) delete mode 100644 sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Customization/CloudEventRequestContent.cs rename sdk/eventgrid/Azure.Messaging.EventGrid/src/{Customization => Shared}/CloudEventRequestContent.cs (100%) create mode 100644 sdk/eventgrid/Directory.Build.props diff --git a/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Azure.Messaging.EventGrid.Namespaces.csproj b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Azure.Messaging.EventGrid.Namespaces.csproj index 2273ee2adedb..90e31819217f 100644 --- a/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Azure.Messaging.EventGrid.Namespaces.csproj +++ b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Azure.Messaging.EventGrid.Namespaces.csproj @@ -9,16 +9,19 @@ $(RequiredTargetFrameworks) true true - + - + + - - Shared\Core\AzureKeyCredentialPolicy.cs - + + + + + diff --git a/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Customization/CloudEventRequestContent.cs b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Customization/CloudEventRequestContent.cs deleted file mode 100644 index f4696d9e29a3..000000000000 --- a/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Customization/CloudEventRequestContent.cs +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -using System.Collections.Generic; -using System.Diagnostics; -using System.IO; -using System.Threading; -using System.Threading.Tasks; -using Azure.Core; - -namespace Azure.Messaging.EventGrid.Namespaces -{ - internal class CloudEventRequestContent : RequestContent - { - private readonly IEnumerable _cloudEvents; - private const string TraceParentHeaderName = "traceparent"; - private const string TraceStateHeaderName = "tracestate"; - private readonly bool _isDistributedTracingEnabled; - private RequestContent _serializedContent; - - public CloudEventRequestContent(IEnumerable cloudEvents, bool isDistributedTracingEnabled) - { - _cloudEvents = cloudEvents; - _isDistributedTracingEnabled = isDistributedTracingEnabled; - } - - public CloudEventRequestContent(CloudEvent cloudEvent, bool isDistributedTracingEnabled) - { - _cloudEvents = [ cloudEvent ]; - _isDistributedTracingEnabled = isDistributedTracingEnabled; - } - - public override void Dispose() - { - } - - public override bool TryComputeLength(out long length) - { - EnsureSerialized(); - return _serializedContent.TryComputeLength(out length); - } - - public override void WriteTo(Stream stream, CancellationToken cancellationToken) - { - EnsureSerialized(); - _serializedContent.WriteTo(stream, cancellationToken); - } - - public override async Task WriteToAsync(Stream stream, CancellationToken cancellationToken) - { - EnsureSerialized(); - await _serializedContent.WriteToAsync(stream, cancellationToken).ConfigureAwait(false); - } - - private void EnsureSerialized() - { - if (_serializedContent != null) - { - return; - } - - if (_isDistributedTracingEnabled) - { - string currentActivityId = null; - string traceState = null; - Activity currentActivity = Activity.Current; - if (currentActivity != null && (currentActivity.IdFormat == ActivityIdFormat.W3C)) - { - currentActivityId = currentActivity.Id; - traceState = currentActivity.TraceStateString; - } - - foreach (CloudEvent cloudEvent in _cloudEvents) - { - if (currentActivityId != null && - !cloudEvent.ExtensionAttributes.ContainsKey(TraceParentHeaderName) && - !cloudEvent.ExtensionAttributes.ContainsKey(TraceStateHeaderName)) - { - cloudEvent.ExtensionAttributes.Add(TraceParentHeaderName, currentActivityId); - if (traceState != null) - { - cloudEvent.ExtensionAttributes.Add(TraceStateHeaderName, traceState); - } - } - } - } - - _serializedContent = RequestContent.Create(_cloudEvents); - } - } -} diff --git a/sdk/eventgrid/Azure.Messaging.EventGrid/src/Customization/CloudEventRequestContent.cs b/sdk/eventgrid/Azure.Messaging.EventGrid/src/Shared/CloudEventRequestContent.cs similarity index 100% rename from sdk/eventgrid/Azure.Messaging.EventGrid/src/Customization/CloudEventRequestContent.cs rename to sdk/eventgrid/Azure.Messaging.EventGrid/src/Shared/CloudEventRequestContent.cs diff --git a/sdk/eventgrid/Directory.Build.props b/sdk/eventgrid/Directory.Build.props new file mode 100644 index 000000000000..d802ba0f887a --- /dev/null +++ b/sdk/eventgrid/Directory.Build.props @@ -0,0 +1,11 @@ + + + + + + + + $(MSBuildThisFileDirectory)Azure.Messaging.EventGrid\src\Shared\ + + + \ No newline at end of file From 71cd1069009a60561d13f301f1ca1ab4800ffc83 Mon Sep 17 00:00:00 2001 From: Madalyn Redding Date: Fri, 18 Apr 2025 13:49:03 -0700 Subject: [PATCH 5/9] tweaks to cloudeventrequestcontent --- .../Azure.Messaging.EventGrid.Namespaces.csproj | 3 +-- .../src/Shared/CloudEventRequestContent.cs | 17 +++++++++++------ 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Azure.Messaging.EventGrid.Namespaces.csproj b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Azure.Messaging.EventGrid.Namespaces.csproj index 90e31819217f..bc1ca53d94f9 100644 --- a/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Azure.Messaging.EventGrid.Namespaces.csproj +++ b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Azure.Messaging.EventGrid.Namespaces.csproj @@ -9,14 +9,13 @@ $(RequiredTargetFrameworks) true true - + - diff --git a/sdk/eventgrid/Azure.Messaging.EventGrid/src/Shared/CloudEventRequestContent.cs b/sdk/eventgrid/Azure.Messaging.EventGrid/src/Shared/CloudEventRequestContent.cs index 5a3ef44de7ae..37d00838b543 100644 --- a/sdk/eventgrid/Azure.Messaging.EventGrid/src/Shared/CloudEventRequestContent.cs +++ b/sdk/eventgrid/Azure.Messaging.EventGrid/src/Shared/CloudEventRequestContent.cs @@ -18,7 +18,13 @@ internal class CloudEventRequestContent : RequestContent private const string TraceParentHeaderName = "traceparent"; private const string TraceStateHeaderName = "tracestate"; private readonly bool _isDistributedTracingEnabled; - private byte[] _data; + private RequestContent _data; + + public CloudEventRequestContent(CloudEvent cloudEvent, bool isDistributedTracingEnabled) + { + _cloudEvents = [ cloudEvent ]; + _isDistributedTracingEnabled = isDistributedTracingEnabled; + } public CloudEventRequestContent(IEnumerable cloudEvents, bool isDistributedTracingEnabled) { @@ -33,20 +39,19 @@ public override void Dispose() public override bool TryComputeLength(out long length) { EnsureSerialized(); - length = _data.Length; - return true; + return _data.TryComputeLength(out length); } public override void WriteTo(Stream stream, CancellationToken cancellationToken) { EnsureSerialized(); - stream.Write(_data, 0, _data.Length); + _data.WriteTo(stream, cancellationToken); } public override async Task WriteToAsync(Stream stream, CancellationToken cancellationToken) { EnsureSerialized(); - await stream.WriteAsync(_data, 0, _data.Length, cancellationToken).ConfigureAwait(false); + await _data.WriteToAsync(stream, cancellationToken).ConfigureAwait(false); } private void EnsureSerialized() @@ -81,7 +86,7 @@ private void EnsureSerialized() } } } - _data = JsonSerializer.SerializeToUtf8Bytes(_cloudEvents, typeof(IEnumerable)); + _data = RequestContent.Create(_cloudEvents); } } } From 2fd7559a1dc1753c5f1b901663f98dbca4b475d2 Mon Sep 17 00:00:00 2001 From: Madalyn Redding Date: Mon, 21 Apr 2025 15:34:01 -0700 Subject: [PATCH 6/9] reformat tests p1 --- ...dEventTests.cs => EventGridClientTests.cs} | 230 +++++++++++++----- .../tests/CloudEventTests.cs | 15 +- 2 files changed, 180 insertions(+), 65 deletions(-) rename sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/tests/{CloudEventTests.cs => EventGridClientTests.cs} (67%) diff --git a/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/tests/CloudEventTests.cs b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/tests/EventGridClientTests.cs similarity index 67% rename from sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/tests/CloudEventTests.cs rename to sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/tests/EventGridClientTests.cs index 77a0f6f5be50..fc1bd453799b 100644 --- a/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/tests/CloudEventTests.cs +++ b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/tests/EventGridClientTests.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.IO; +using System.Net.NetworkInformation; using System.Text.Json; using System.Threading; using System.Threading.Tasks; @@ -15,7 +16,7 @@ namespace Azure.Messaging.EventGrid.Tests { - public class CloudEventTests + public class EventGridClientTests { private const string TraceParentHeaderName = "traceparent"; private const string TraceStateHeaderName = "tracestate"; @@ -25,7 +26,7 @@ public class CloudEventTests [TestCase(true, true)] [TestCase(true, false)] [TestCase(false, true)] - public async Task SetsTraceParentExtension(bool inclTraceparent, bool inclTracestate) + public async Task SendAsyncSetsTraceParentExtension(bool inclTraceparent, bool inclTracestate) { MockTransport mockTransport = CreateMockTransport(); @@ -41,74 +42,115 @@ public async Task SetsTraceParentExtension(bool inclTraceparent, bool inclTraces activity.Start(); activity.TraceStateString = "tracestatevalue"; - // create list of cloud events - List eventsList = new List(); - for (int i = 0; i < 10; i++) + List cloudEvents = CreateCloudEventsToSend(inclTraceparent, inclTracestate); + + // send + await senderClient.SendAsync(cloudEvents).ConfigureAwait(false); + + // stop activity after extracting the events from the request as this is where the cloudEvents would actually + // be serialized + activity.Stop(); + + ValidateCloudEvents(mockTransport, inclTraceparent, inclTracestate, cloudEvents); + } + + [Test] + [TestCase(false, false)] + [TestCase(true, true)] + [TestCase(true, false)] + [TestCase(false, true)] + public void SendSetsTraceParentExtension(bool inclTraceparent, bool inclTracestate) + { + MockTransport mockTransport = CreateMockTransport(); + + EventGridSenderClientOptions options = new() { - CloudEvent cloudEvent = new CloudEvent( - "record", - "Microsoft.MockPublisher.TestEvent", - JsonDocument.Parse("{\"property1\": \"abc\", \"property2\": 123}").RootElement) - { - Id = "id", - Subject = $"Subject-{i}", - Time = DateTimeOffset.UtcNow - }; - if (inclTraceparent && i % 2 == 0) - { - cloudEvent.ExtensionAttributes.Add("traceparent", "traceparentValue"); - } - if (inclTracestate && i % 2 == 0) - { - cloudEvent.ExtensionAttributes.Add("tracestate", "param:value"); - } - eventsList.Add(cloudEvent); - } + Transport = mockTransport, + }; + EventGridSenderClient senderClient = new(new Uri("http://www.example.com"), "topic", new AzureKeyCredential("fake"), options); + + // simulating some other activity already being started before doing operations with the client + var activity = new Activity("ParentEvent"); + activity.SetIdFormat(ActivityIdFormat.W3C); + activity.Start(); + activity.TraceStateString = "tracestatevalue"; + + List cloudEvents = CreateCloudEventsToSend(inclTraceparent, inclTracestate); // send - await senderClient.SendAsync(eventsList).ConfigureAwait(false); + senderClient.Send(cloudEvents); // stop activity after extracting the events from the request as this is where the cloudEvents would actually // be serialized activity.Stop(); - // validate - IEnumerator cloudEnum = eventsList.GetEnumerator(); - for (int i = 0; i < 10; i++) + ValidateCloudEvents(mockTransport, inclTraceparent, inclTracestate, cloudEvents); + } + + [Test] + [TestCase(false, false)] + [TestCase(true, true)] + [TestCase(true, false)] + [TestCase(false, true)] + public async Task SingleEventSendAsyncSetsTraceParentExtension(bool inclTraceparent, bool inclTracestate) + { + MockTransport mockTransport = CreateMockTransport(); + + EventGridSenderClientOptions options = new() { - cloudEnum.MoveNext(); - IDictionary cloudEventAttr = cloudEnum.Current.ExtensionAttributes; - if (inclTraceparent && inclTracestate && i % 2 == 0) - { - Assert.AreEqual( - "traceparentValue", - cloudEventAttr[TraceParentHeaderName]); + Transport = mockTransport, + }; + EventGridSenderClient senderClient = new(new Uri("http://www.example.com"), "topic", new AzureKeyCredential("fake"), options); - Assert.AreEqual( - "param:value", - cloudEventAttr[TraceStateHeaderName]); - } - else if (inclTraceparent && i % 2 == 0) - { - Assert.AreEqual( - "traceparentValue", - cloudEventAttr[TraceParentHeaderName]); - } - else if (inclTracestate && i % 2 == 0) - { - Assert.AreEqual( - "param:value", - cloudEventAttr[TraceStateHeaderName]); - } - else - { - Assert.IsTrue(mockTransport.SingleRequest.Headers.TryGetValue(TraceParentHeaderName, out string requestHeader)); + // simulating some other activity already being started before doing operations with the client + var activity = new Activity("ParentEvent"); + activity.SetIdFormat(ActivityIdFormat.W3C); + activity.Start(); + activity.TraceStateString = "tracestatevalue"; - Assert.AreEqual( - requestHeader, - cloudEventAttr[TraceParentHeaderName]); - } - } + List cloudEvents = CreateCloudEventsToSend(inclTraceparent, inclTracestate); + + // send + await senderClient.SendAsync(cloudEvents[0]); + + // stop activity after extracting the events from the request as this is where the cloudEvents would actually + // be serialized + activity.Stop(); + + ValidateCloudEvents(mockTransport, inclTraceparent, inclTracestate, cloudEvents); + } + + [Test] + [TestCase(false, false)] + [TestCase(true, true)] + [TestCase(true, false)] + [TestCase(false, true)] + public void SingleEventSendSetsTraceParentExtension(bool inclTraceparent, bool inclTracestate) + { + MockTransport mockTransport = CreateMockTransport(); + + EventGridSenderClientOptions options = new() + { + Transport = mockTransport, + }; + EventGridSenderClient senderClient = new(new Uri("http://www.example.com"), "topic", new AzureKeyCredential("fake"), options); + + // simulating some other activity already being started before doing operations with the client + var activity = new Activity("ParentEvent"); + activity.SetIdFormat(ActivityIdFormat.W3C); + activity.Start(); + activity.TraceStateString = "tracestatevalue"; + + List cloudEvents = CreateCloudEventsToSend(inclTraceparent, inclTracestate); + + // send + senderClient.Send(cloudEvents[0]); + + // stop activity after extracting the events from the request as this is where the cloudEvents would actually + // be serialized + activity.Stop(); + + ValidateCloudEvents(mockTransport, inclTraceparent, inclTracestate, cloudEvents); } [Test] @@ -254,6 +296,76 @@ public async Task SetsTraceParentExtensionRetries(bool inclTraceparent, bool inc } } + private static List CreateCloudEventsToSend(bool inclTraceparent, bool inclTracestate) + { + // create list of cloud events + List eventsList = new List(); + for (int i = 0; i < 10; i++) + { + CloudEvent cloudEvent = new CloudEvent( + "record", + "Microsoft.MockPublisher.TestEvent", + JsonDocument.Parse("{\"property1\": \"abc\", \"property2\": 123}").RootElement) + { + Id = "id", + Subject = $"Subject-{i}", + Time = DateTimeOffset.UtcNow + }; + if (inclTraceparent && i % 2 == 0) + { + cloudEvent.ExtensionAttributes.Add("traceparent", "traceparentValue"); + } + if (inclTracestate && i % 2 == 0) + { + cloudEvent.ExtensionAttributes.Add("tracestate", "param:value"); + } + eventsList.Add(cloudEvent); + } + + return eventsList; + } + + private void ValidateCloudEvents(MockTransport mockTransport, bool inclTraceparent, bool inclTracestate, List eventsList) + { + // validate + IEnumerator cloudEnum = eventsList.GetEnumerator(); + for (int i = 0; i < 10; i++) + { + cloudEnum.MoveNext(); + IDictionary cloudEventAttr = cloudEnum.Current.ExtensionAttributes; + if (inclTraceparent && inclTracestate && i % 2 == 0) + { + Assert.AreEqual( + "traceparentValue", + cloudEventAttr[TraceParentHeaderName]); + + Assert.AreEqual( + "param:value", + cloudEventAttr[TraceStateHeaderName]); + } + else if (inclTraceparent && i % 2 == 0) + { + Assert.AreEqual( + "traceparentValue", + cloudEventAttr[TraceParentHeaderName]); + } + else if (inclTracestate && i % 2 == 0) + { + Assert.AreEqual( + "param:value", + cloudEventAttr[TraceStateHeaderName]); + } + else + { + Assert.IsTrue(mockTransport.SingleRequest.Headers.TryGetValue(TraceParentHeaderName, out string requestHeader)); + + Assert.AreEqual( + requestHeader, + cloudEventAttr[TraceParentHeaderName]); + } + } + } + private static MockTransport CreateMockTransport() { return new MockTransport((request) => diff --git a/sdk/eventgrid/Microsoft.Azure.Messaging.EventGrid.CloudNativeCloudEvents/tests/CloudEventTests.cs b/sdk/eventgrid/Microsoft.Azure.Messaging.EventGrid.CloudNativeCloudEvents/tests/CloudEventTests.cs index d240287d5322..b10a0df4db48 100644 --- a/sdk/eventgrid/Microsoft.Azure.Messaging.EventGrid.CloudNativeCloudEvents/tests/CloudEventTests.cs +++ b/sdk/eventgrid/Microsoft.Azure.Messaging.EventGrid.CloudNativeCloudEvents/tests/CloudEventTests.cs @@ -1,20 +1,23 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -using Azure; -using Azure.Core; -using Azure.Core.TestFramework; -using Azure.Messaging.EventGrid; -using CloudNative.CloudEvents.SystemTextJson; -using NUnit.Framework; using System; using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Linq; +using System.Net.Mime; using System.Text.Json; using System.Threading; using System.Threading.Tasks; +using Azure; +using Azure.Core; +using Azure.Core.Pipeline; +using Azure.Core.TestFramework; +using Azure.Messaging.EventGrid; +using CloudNative.CloudEvents; +using CloudNative.CloudEvents.SystemTextJson; +using NUnit.Framework; using CloudEvent = CloudNative.CloudEvents.CloudEvent; namespace Microsoft.Azure.Messaging.EventGrid.CloudNativeCloudEvents.Tests From 3b936a82cbbd3132fa9c1e9fff7dfa3bb0b89d2e Mon Sep 17 00:00:00 2001 From: Madalyn Redding Date: Tue, 22 Apr 2025 10:52:32 -0700 Subject: [PATCH 7/9] tests --- .../tests/EventGridClientTests.cs | 401 ++++++++++++------ 1 file changed, 282 insertions(+), 119 deletions(-) diff --git a/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/tests/EventGridClientTests.cs b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/tests/EventGridClientTests.cs index fc1bd453799b..a8b0848fcab6 100644 --- a/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/tests/EventGridClientTests.cs +++ b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/tests/EventGridClientTests.cs @@ -16,42 +16,13 @@ namespace Azure.Messaging.EventGrid.Tests { - public class EventGridClientTests + public class EventGridClientTests : ClientTestBase { private const string TraceParentHeaderName = "traceparent"; private const string TraceStateHeaderName = "tracestate"; - [Test] - [TestCase(false, false)] - [TestCase(true, true)] - [TestCase(true, false)] - [TestCase(false, true)] - public async Task SendAsyncSetsTraceParentExtension(bool inclTraceparent, bool inclTracestate) + public EventGridClientTests(bool isAsync) : base(isAsync) { - MockTransport mockTransport = CreateMockTransport(); - - EventGridSenderClientOptions options = new() - { - Transport = mockTransport, - }; - EventGridSenderClient senderClient = new(new Uri("http://www.example.com"), "topic", new AzureKeyCredential("fake"), options); - - // simulating some other activity already being started before doing operations with the client - var activity = new Activity("ParentEvent"); - activity.SetIdFormat(ActivityIdFormat.W3C); - activity.Start(); - activity.TraceStateString = "tracestatevalue"; - - List cloudEvents = CreateCloudEventsToSend(inclTraceparent, inclTracestate); - - // send - await senderClient.SendAsync(cloudEvents).ConfigureAwait(false); - - // stop activity after extracting the events from the request as this is where the cloudEvents would actually - // be serialized - activity.Stop(); - - ValidateCloudEvents(mockTransport, inclTraceparent, inclTracestate, cloudEvents); } [Test] @@ -59,7 +30,7 @@ public async Task SendAsyncSetsTraceParentExtension(bool inclTraceparent, bool i [TestCase(true, true)] [TestCase(true, false)] [TestCase(false, true)] - public void SendSetsTraceParentExtension(bool inclTraceparent, bool inclTracestate) + public async Task SendSetsTraceParentExtension(bool inclTraceparent, bool inclTracestate) { MockTransport mockTransport = CreateMockTransport(); @@ -75,16 +46,81 @@ public void SendSetsTraceParentExtension(bool inclTraceparent, bool inclTracesta activity.Start(); activity.TraceStateString = "tracestatevalue"; - List cloudEvents = CreateCloudEventsToSend(inclTraceparent, inclTracestate); + // create list of cloud events + List eventsList = new List(); + for (int i = 0; i < 10; i++) + { + CloudEvent cloudEvent = new CloudEvent( + "record", + "Microsoft.MockPublisher.TestEvent", + JsonDocument.Parse("{\"property1\": \"abc\", \"property2\": 123}").RootElement) + { + Id = "id", + Subject = $"Subject-{i}", + Time = DateTimeOffset.UtcNow + }; + if (inclTraceparent && i % 2 == 0) + { + cloudEvent.ExtensionAttributes.Add("traceparent", "traceparentValue"); + } + if (inclTracestate && i % 2 == 0) + { + cloudEvent.ExtensionAttributes.Add("tracestate", "param:value"); + } + eventsList.Add(cloudEvent); + } // send - senderClient.Send(cloudEvents); + if (IsAsync) + { + await senderClient.SendAsync(eventsList).ConfigureAwait(false); + } + else + { + senderClient.Send(eventsList); + } // stop activity after extracting the events from the request as this is where the cloudEvents would actually // be serialized activity.Stop(); - ValidateCloudEvents(mockTransport, inclTraceparent, inclTracestate, cloudEvents); + // validate + IEnumerator cloudEnum = eventsList.GetEnumerator(); + for (int i = 0; i < 10; i++) + { + cloudEnum.MoveNext(); + IDictionary cloudEventAttr = cloudEnum.Current.ExtensionAttributes; + if (inclTraceparent && inclTracestate && i % 2 == 0) + { + Assert.AreEqual( + "traceparentValue", + cloudEventAttr[TraceParentHeaderName]); + + Assert.AreEqual( + "param:value", + cloudEventAttr[TraceStateHeaderName]); + } + else if (inclTraceparent && i % 2 == 0) + { + Assert.AreEqual( + "traceparentValue", + cloudEventAttr[TraceParentHeaderName]); + } + else if (inclTracestate && i % 2 == 0) + { + Assert.AreEqual( + "param:value", + cloudEventAttr[TraceStateHeaderName]); + } + else + { + Assert.IsTrue(mockTransport.SingleRequest.Headers.TryGetValue(TraceParentHeaderName, out string requestHeader)); + + Assert.AreEqual( + requestHeader, + cloudEventAttr[TraceParentHeaderName]); + } + } } [Test] @@ -92,7 +128,7 @@ public void SendSetsTraceParentExtension(bool inclTraceparent, bool inclTracesta [TestCase(true, true)] [TestCase(true, false)] [TestCase(false, true)] - public async Task SingleEventSendAsyncSetsTraceParentExtension(bool inclTraceparent, bool inclTracestate) + public async Task SingleEventSendSetsTraceParentExtension(bool inclTraceparent, bool inclTracestate) { MockTransport mockTransport = CreateMockTransport(); @@ -108,53 +144,118 @@ public async Task SingleEventSendAsyncSetsTraceParentExtension(bool inclTracepar activity.Start(); activity.TraceStateString = "tracestatevalue"; - List cloudEvents = CreateCloudEventsToSend(inclTraceparent, inclTracestate); + // create cloud event + CloudEvent cloudEvent = new CloudEvent( + "record", + "Microsoft.MockPublisher.TestEvent", + JsonDocument.Parse("{\"property1\": \"abc\", \"property2\": 123}").RootElement) + { + Id = "id", + Subject = $"Subject-1", + Time = DateTimeOffset.UtcNow + }; + + if (inclTraceparent) + { + cloudEvent.ExtensionAttributes.Add("traceparent", "traceparentValue"); + } + if (inclTracestate) + { + cloudEvent.ExtensionAttributes.Add("tracestate", "param:value"); + } // send - await senderClient.SendAsync(cloudEvents[0]); + if (IsAsync) + { + await senderClient.SendAsync(cloudEvent).ConfigureAwait(false); + } + else + { + senderClient.Send(cloudEvent); + } - // stop activity after extracting the events from the request as this is where the cloudEvents would actually - // be serialized - activity.Stop(); + // stop activity after extracting the events from the request as this is where the cloudEvents would actually + // be serialized + activity.Stop(); - ValidateCloudEvents(mockTransport, inclTraceparent, inclTracestate, cloudEvents); + // validate + IDictionary cloudEventAttr = cloudEvent.ExtensionAttributes; + if (inclTraceparent && inclTracestate) + { + Assert.AreEqual( + "traceparentValue", + cloudEventAttr[TraceParentHeaderName]); + + Assert.AreEqual( + "param:value", + cloudEventAttr[TraceStateHeaderName]); + } + else if (inclTraceparent) + { + Assert.AreEqual( + "traceparentValue", + cloudEventAttr[TraceParentHeaderName]); + } + else if (inclTracestate) + { + Assert.AreEqual( + "param:value", + cloudEventAttr[TraceStateHeaderName]); + } + else + { + Assert.IsTrue(mockTransport.SingleRequest.Headers.TryGetValue(TraceParentHeaderName, out string requestHeader)); + + Assert.AreEqual( + requestHeader, + cloudEventAttr[TraceParentHeaderName]); + } } [Test] - [TestCase(false, false)] - [TestCase(true, true)] - [TestCase(true, false)] - [TestCase(false, true)] - public void SingleEventSendSetsTraceParentExtension(bool inclTraceparent, bool inclTracestate) + public async Task SingleEventSendDoesNotSetTraceParentExtensionWhenTracingIsDisabled() { MockTransport mockTransport = CreateMockTransport(); EventGridSenderClientOptions options = new() { Transport = mockTransport, + Diagnostics = + { + IsDistributedTracingEnabled = false, + } }; EventGridSenderClient senderClient = new(new Uri("http://www.example.com"), "topic", new AzureKeyCredential("fake"), options); + using ClientDiagnosticListener diagnosticListener = new(s => s.StartsWith("Azure."), asyncLocal: true); + // simulating some other activity already being started before doing operations with the client var activity = new Activity("ParentEvent"); activity.SetIdFormat(ActivityIdFormat.W3C); activity.Start(); - activity.TraceStateString = "tracestatevalue"; - List cloudEvents = CreateCloudEventsToSend(inclTraceparent, inclTracestate); + CloudEvent cloudEvent = new CloudEvent( + "record", + "Microsoft.MockPublisher.TestEvent", + JsonDocument.Parse("{\"property1\": \"abc\", \"property2\": 123}").RootElement); - // send - senderClient.Send(cloudEvents[0]); + if (IsAsync) + { + await senderClient.SendAsync(cloudEvent).ConfigureAwait(false); + } + else + { + senderClient.Send(cloudEvent); + } - // stop activity after extracting the events from the request as this is where the cloudEvents would actually - // be serialized activity.Stop(); - ValidateCloudEvents(mockTransport, inclTraceparent, inclTracestate, cloudEvents); + Assert.False(cloudEvent.ExtensionAttributes.ContainsKey("traceparent")); + Assert.False(cloudEvent.ExtensionAttributes.ContainsKey("tracestate")); } [Test] - public async Task DoesNotSetTraceParentExtensionWhenTracingIsDisabled() + public async Task SendDoesNotSetTraceParentExtensionWhenTracingIsDisabled() { MockTransport mockTransport = CreateMockTransport(); @@ -175,16 +276,35 @@ public async Task DoesNotSetTraceParentExtensionWhenTracingIsDisabled() activity.SetIdFormat(ActivityIdFormat.W3C); activity.Start(); - CloudEvent cloudEvent = new CloudEvent( - "record", - "Microsoft.MockPublisher.TestEvent", - JsonDocument.Parse("{\"property1\": \"abc\", \"property2\": 123}").RootElement); + List eventsList = new List(); + for (int i = 0; i < 10; i++) + { + CloudEvent cloudEvent = new CloudEvent( + "record", + "Microsoft.MockPublisher.TestEvent", + JsonDocument.Parse("{\"property1\": \"abc\", \"property2\": 123}").RootElement); + + eventsList.Add(cloudEvent); + } + + if (IsAsync) + { + await senderClient.SendAsync(eventsList).ConfigureAwait(false); + } + else + { + senderClient.Send(eventsList); + } - await senderClient.SendAsync(cloudEvent).ConfigureAwait(false); activity.Stop(); - Assert.False(cloudEvent.ExtensionAttributes.ContainsKey("traceparent")); - Assert.False(cloudEvent.ExtensionAttributes.ContainsKey("tracestate")); + IEnumerator cloudEnum = eventsList.GetEnumerator(); + for (int i = 0; i < 10; i++) + { + cloudEnum.MoveNext(); + Assert.False(cloudEnum.Current.ExtensionAttributes.ContainsKey("traceparent")); + Assert.False(cloudEnum.Current.ExtensionAttributes.ContainsKey("tracestate")); + } } [Test] @@ -192,7 +312,7 @@ public async Task DoesNotSetTraceParentExtensionWhenTracingIsDisabled() [TestCase(true, true)] [TestCase(true, false)] [TestCase(false, true)] - public async Task SetsTraceParentExtensionRetries(bool inclTraceparent, bool inclTracestate) + public async Task SendSetsTraceParentExtensionRetries(bool inclTraceparent, bool inclTracestate) { int requestCt = 0; var mockTransport = new MockTransport((request) => @@ -247,7 +367,14 @@ public async Task SetsTraceParentExtensionRetries(bool inclTraceparent, bool inc } // send - await senderClient.SendAsync(eventsList).ConfigureAwait(false); + if (IsAsync) + { + await senderClient.SendAsync(eventsList).ConfigureAwait(false); + } + else + { + senderClient.Send(eventsList); + } // stop activity after extracting the events from the request as this is where the cloudEvents would actually // be serialized @@ -296,73 +423,109 @@ public async Task SetsTraceParentExtensionRetries(bool inclTraceparent, bool inc } } - private static List CreateCloudEventsToSend(bool inclTraceparent, bool inclTracestate) + [Test] + [TestCase(false, false)] + [TestCase(true, true)] + [TestCase(true, false)] + [TestCase(false, true)] + public async Task SingleEventSendSetsTraceParentExtensionRetries(bool inclTraceparent, bool inclTracestate) { - // create list of cloud events - List eventsList = new List(); - for (int i = 0; i < 10; i++) + int requestCt = 0; + var mockTransport = new MockTransport((request) => { - CloudEvent cloudEvent = new CloudEvent( - "record", - "Microsoft.MockPublisher.TestEvent", - JsonDocument.Parse("{\"property1\": \"abc\", \"property2\": 123}").RootElement) - { - Id = "id", - Subject = $"Subject-{i}", - Time = DateTimeOffset.UtcNow - }; - if (inclTraceparent && i % 2 == 0) + var stream = new MemoryStream(); + request.Content.WriteTo(stream, CancellationToken.None); + if (requestCt++ == 0) { - cloudEvent.ExtensionAttributes.Add("traceparent", "traceparentValue"); + return new MockResponse(500); } - if (inclTracestate && i % 2 == 0) + else { - cloudEvent.ExtensionAttributes.Add("tracestate", "param:value"); + return new MockResponse(200); } - eventsList.Add(cloudEvent); + }); + + EventGridSenderClientOptions options = new() + { + Transport = mockTransport + }; + EventGridSenderClient senderClient = new(new Uri("http://www.example.com"), "topic", new AzureKeyCredential("fake"), options); + using ClientDiagnosticListener diagnosticListener = new(s => s.StartsWith("Azure."), asyncLocal: true); + + // simulating some other activity already being started before doing operations with the client + var activity = new Activity("ParentEvent"); + activity.SetIdFormat(ActivityIdFormat.W3C); + activity.Start(); + activity.TraceStateString = "tracestatevalue"; + + // create list of cloud events + CloudEvent cloudEvent = new CloudEvent( + "record", + "Microsoft.MockPublisher.TestEvent", + JsonDocument.Parse("{\"property1\": \"abc\", \"property2\": 123}").RootElement) + { + Id = "id", + Subject = $"Subject-1", + Time = DateTimeOffset.UtcNow + }; + if (inclTraceparent) + { + cloudEvent.ExtensionAttributes.Add("traceparent", "traceparentValue"); + } + if (inclTracestate) + { + cloudEvent.ExtensionAttributes.Add("tracestate", "param:value"); } - return eventsList; - } + // send + if (IsAsync) + { + await senderClient.SendAsync(cloudEvent).ConfigureAwait(false); + } + else + { + senderClient.Send(cloudEvent); + } + + // stop activity after extracting the events from the request as this is where the cloudEvents would actually + // be serialized + activity.Stop(); - private void ValidateCloudEvents(MockTransport mockTransport, bool inclTraceparent, bool inclTracestate, List eventsList) - { // validate - IEnumerator cloudEnum = eventsList.GetEnumerator(); - for (int i = 0; i < 10; i++) + IDictionary cloudEventAttr = cloudEvent.ExtensionAttributes; + if (inclTraceparent && inclTracestate) { - cloudEnum.MoveNext(); - IDictionary cloudEventAttr = cloudEnum.Current.ExtensionAttributes; - if (inclTraceparent && inclTracestate && i % 2 == 0) - { - Assert.AreEqual( - "traceparentValue", - cloudEventAttr[TraceParentHeaderName]); - - Assert.AreEqual( - "param:value", - cloudEventAttr[TraceStateHeaderName]); - } - else if (inclTraceparent && i % 2 == 0) - { - Assert.AreEqual( - "traceparentValue", - cloudEventAttr[TraceParentHeaderName]); - } - else if (inclTracestate && i % 2 == 0) - { - Assert.AreEqual( - "param:value", - cloudEventAttr[TraceStateHeaderName]); - } - else - { - Assert.IsTrue(mockTransport.SingleRequest.Headers.TryGetValue(TraceParentHeaderName, out string requestHeader)); + Assert.AreEqual( + "traceparentValue", + cloudEventAttr[TraceParentHeaderName]); - Assert.AreEqual( - requestHeader, - cloudEventAttr[TraceParentHeaderName]); - } + Assert.AreEqual( + "param:value", + cloudEventAttr[TraceStateHeaderName]); + } + else if (inclTraceparent) + { + Assert.AreEqual( + "traceparentValue", + cloudEventAttr[TraceParentHeaderName]); + } + else if (inclTracestate) + { + Assert.AreEqual( + "param:value", + cloudEventAttr[TraceStateHeaderName]); + } + else + { + Assert.IsTrue(mockTransport.Requests[1].Headers.TryGetValue(TraceParentHeaderName, out string traceParent)); + Assert.AreEqual( + traceParent, + cloudEventAttr[TraceParentHeaderName]); + + Assert.IsTrue(mockTransport.Requests[1].Headers.TryGetValue(TraceStateHeaderName, out string traceState)); + Assert.AreEqual( + traceState, + cloudEventAttr[TraceStateHeaderName]); } } From 6f8f3b36e46c6ef6578439467a4d31c90a0f3d88 Mon Sep 17 00:00:00 2001 From: Madalyn Redding Date: Tue, 22 Apr 2025 12:52:25 -0700 Subject: [PATCH 8/9] fix request content --- .../src/Shared/CloudEventRequestContent.cs | 39 ++++++++++++++----- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/sdk/eventgrid/Azure.Messaging.EventGrid/src/Shared/CloudEventRequestContent.cs b/sdk/eventgrid/Azure.Messaging.EventGrid/src/Shared/CloudEventRequestContent.cs index 37d00838b543..a31ec4d9e013 100644 --- a/sdk/eventgrid/Azure.Messaging.EventGrid/src/Shared/CloudEventRequestContent.cs +++ b/sdk/eventgrid/Azure.Messaging.EventGrid/src/Shared/CloudEventRequestContent.cs @@ -15,6 +15,7 @@ namespace Azure.Messaging.EventGrid internal class CloudEventRequestContent : RequestContent { private readonly IEnumerable _cloudEvents; + private readonly CloudEvent _cloudEvent; private const string TraceParentHeaderName = "traceparent"; private const string TraceStateHeaderName = "tracestate"; private readonly bool _isDistributedTracingEnabled; @@ -22,7 +23,7 @@ internal class CloudEventRequestContent : RequestContent public CloudEventRequestContent(CloudEvent cloudEvent, bool isDistributedTracingEnabled) { - _cloudEvents = [ cloudEvent ]; + _cloudEvent = cloudEvent; _isDistributedTracingEnabled = isDistributedTracingEnabled; } @@ -72,21 +73,39 @@ private void EnsureSerialized() traceState = currentActivity.TraceStateString; } - foreach (CloudEvent cloudEvent in _cloudEvents) + if (_cloudEvent != null) { - if (currentActivityId != null && - !cloudEvent.ExtensionAttributes.ContainsKey(TraceParentHeaderName) && - !cloudEvent.ExtensionAttributes.ContainsKey(TraceStateHeaderName)) + AddExtensionAttributes(_cloudEvent, currentActivityId, traceState); + } + else + { + foreach (CloudEvent cloudEvent in _cloudEvents) { - cloudEvent.ExtensionAttributes.Add(TraceParentHeaderName, currentActivityId); - if (traceState != null) - { - cloudEvent.ExtensionAttributes.Add(TraceStateHeaderName, traceState); - } + AddExtensionAttributes(cloudEvent, currentActivityId, traceState); } } } + + if (_cloudEvent != null) + { + _data = RequestContent.Create(_cloudEvent); + return; + } _data = RequestContent.Create(_cloudEvents); } + + private void AddExtensionAttributes(CloudEvent cloudEvent, string currentActivityId, string traceState) + { + if (currentActivityId != null && + !cloudEvent.ExtensionAttributes.ContainsKey(TraceParentHeaderName) && + !cloudEvent.ExtensionAttributes.ContainsKey(TraceStateHeaderName)) + { + cloudEvent.ExtensionAttributes.Add(TraceParentHeaderName, currentActivityId); + if (traceState != null) + { + cloudEvent.ExtensionAttributes.Add(TraceStateHeaderName, traceState); + } + } + } } } From c84007960d6d857f60f6777681fd159832c73883 Mon Sep 17 00:00:00 2001 From: Madalyn Redding Date: Tue, 22 Apr 2025 12:59:09 -0700 Subject: [PATCH 9/9] split types --- ...zure.Messaging.EventGrid.Namespaces.csproj | 1 + .../Customization/EventGridSenderClient.cs | 4 +- .../src/EventGridPublisherClient.cs | 2 +- .../src/Shared/CloudEventRequestContent.cs | 41 ++------- .../src/Shared/CloudEventsRequestContent.cs | 86 +++++++++++++++++++ 5 files changed, 97 insertions(+), 37 deletions(-) create mode 100644 sdk/eventgrid/Azure.Messaging.EventGrid/src/Shared/CloudEventsRequestContent.cs diff --git a/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Azure.Messaging.EventGrid.Namespaces.csproj b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Azure.Messaging.EventGrid.Namespaces.csproj index bc1ca53d94f9..ea3294db8608 100644 --- a/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Azure.Messaging.EventGrid.Namespaces.csproj +++ b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Azure.Messaging.EventGrid.Namespaces.csproj @@ -18,6 +18,7 @@ + diff --git a/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Customization/EventGridSenderClient.cs b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Customization/EventGridSenderClient.cs index 686e893d475c..24157710f649 100644 --- a/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Customization/EventGridSenderClient.cs +++ b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Customization/EventGridSenderClient.cs @@ -139,7 +139,7 @@ public virtual async Task SendAsync(IEnumerable cloudEvent scope.Start(); try { - return await SendEventsAsync(_topicName, new CloudEventRequestContent(cloudEvents, _isDistributedTracingEnabled), context).ConfigureAwait(false); + return await SendEventsAsync(_topicName, new CloudEventsRequestContent(cloudEvents, _isDistributedTracingEnabled), context).ConfigureAwait(false); } catch (Exception e) { @@ -163,7 +163,7 @@ public virtual Response Send(IEnumerable cloudEvents, CancellationTo scope.Start(); try { - return SendEvents(_topicName, new CloudEventRequestContent(cloudEvents, _isDistributedTracingEnabled), context); + return SendEvents(_topicName, new CloudEventsRequestContent(cloudEvents, _isDistributedTracingEnabled), context); } catch (Exception e) { diff --git a/sdk/eventgrid/Azure.Messaging.EventGrid/src/EventGridPublisherClient.cs b/sdk/eventgrid/Azure.Messaging.EventGrid/src/EventGridPublisherClient.cs index 57f581bbc528..b418dfcdc068 100644 --- a/sdk/eventgrid/Azure.Messaging.EventGrid/src/EventGridPublisherClient.cs +++ b/sdk/eventgrid/Azure.Messaging.EventGrid/src/EventGridPublisherClient.cs @@ -276,7 +276,7 @@ private async Task SendCloudEventsInternal(IEnumerable eve request.Headers.Add("aeg-channel-name", channelName); } - CloudEventRequestContent content = new CloudEventRequestContent(events, _clientDiagnostics.IsActivityEnabled); + CloudEventsRequestContent content = new CloudEventsRequestContent(events, _clientDiagnostics.IsActivityEnabled); request.Content = content; if (async) diff --git a/sdk/eventgrid/Azure.Messaging.EventGrid/src/Shared/CloudEventRequestContent.cs b/sdk/eventgrid/Azure.Messaging.EventGrid/src/Shared/CloudEventRequestContent.cs index a31ec4d9e013..12b0f3cc54c2 100644 --- a/sdk/eventgrid/Azure.Messaging.EventGrid/src/Shared/CloudEventRequestContent.cs +++ b/sdk/eventgrid/Azure.Messaging.EventGrid/src/Shared/CloudEventRequestContent.cs @@ -14,7 +14,6 @@ namespace Azure.Messaging.EventGrid { internal class CloudEventRequestContent : RequestContent { - private readonly IEnumerable _cloudEvents; private readonly CloudEvent _cloudEvent; private const string TraceParentHeaderName = "traceparent"; private const string TraceStateHeaderName = "tracestate"; @@ -27,12 +26,6 @@ public CloudEventRequestContent(CloudEvent cloudEvent, bool isDistributedTracing _isDistributedTracingEnabled = isDistributedTracingEnabled; } - public CloudEventRequestContent(IEnumerable cloudEvents, bool isDistributedTracingEnabled) - { - _cloudEvents = cloudEvents; - _isDistributedTracingEnabled = isDistributedTracingEnabled; - } - public override void Dispose() { } @@ -73,39 +66,19 @@ private void EnsureSerialized() traceState = currentActivity.TraceStateString; } - if (_cloudEvent != null) + if (currentActivityId != null && + !_cloudEvent.ExtensionAttributes.ContainsKey(TraceParentHeaderName) && + !_cloudEvent.ExtensionAttributes.ContainsKey(TraceStateHeaderName)) { - AddExtensionAttributes(_cloudEvent, currentActivityId, traceState); - } - else - { - foreach (CloudEvent cloudEvent in _cloudEvents) + _cloudEvent.ExtensionAttributes.Add(TraceParentHeaderName, currentActivityId); + if (traceState != null) { - AddExtensionAttributes(cloudEvent, currentActivityId, traceState); + _cloudEvent.ExtensionAttributes.Add(TraceStateHeaderName, traceState); } } } - if (_cloudEvent != null) - { - _data = RequestContent.Create(_cloudEvent); - return; - } - _data = RequestContent.Create(_cloudEvents); - } - - private void AddExtensionAttributes(CloudEvent cloudEvent, string currentActivityId, string traceState) - { - if (currentActivityId != null && - !cloudEvent.ExtensionAttributes.ContainsKey(TraceParentHeaderName) && - !cloudEvent.ExtensionAttributes.ContainsKey(TraceStateHeaderName)) - { - cloudEvent.ExtensionAttributes.Add(TraceParentHeaderName, currentActivityId); - if (traceState != null) - { - cloudEvent.ExtensionAttributes.Add(TraceStateHeaderName, traceState); - } - } + _data = RequestContent.Create(_cloudEvent); } } } diff --git a/sdk/eventgrid/Azure.Messaging.EventGrid/src/Shared/CloudEventsRequestContent.cs b/sdk/eventgrid/Azure.Messaging.EventGrid/src/Shared/CloudEventsRequestContent.cs new file mode 100644 index 000000000000..00a5e74d13c0 --- /dev/null +++ b/sdk/eventgrid/Azure.Messaging.EventGrid/src/Shared/CloudEventsRequestContent.cs @@ -0,0 +1,86 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Azure.Core; +using Azure.Core.Pipeline; + +namespace Azure.Messaging.EventGrid +{ + internal class CloudEventsRequestContent : RequestContent + { + private readonly IEnumerable _cloudEvents; + private const string TraceParentHeaderName = "traceparent"; + private const string TraceStateHeaderName = "tracestate"; + private readonly bool _isDistributedTracingEnabled; + private RequestContent _data; + + public CloudEventsRequestContent(IEnumerable cloudEvents, bool isDistributedTracingEnabled) + { + _cloudEvents = cloudEvents; + _isDistributedTracingEnabled = isDistributedTracingEnabled; + } + + public override void Dispose() + { + } + + public override bool TryComputeLength(out long length) + { + EnsureSerialized(); + return _data.TryComputeLength(out length); + } + + public override void WriteTo(Stream stream, CancellationToken cancellationToken) + { + EnsureSerialized(); + _data.WriteTo(stream, cancellationToken); + } + + public override async Task WriteToAsync(Stream stream, CancellationToken cancellationToken) + { + EnsureSerialized(); + await _data.WriteToAsync(stream, cancellationToken).ConfigureAwait(false); + } + + private void EnsureSerialized() + { + if (_data != null) + { + return; + } + + if (_isDistributedTracingEnabled) + { + string currentActivityId = null; + string traceState = null; + Activity currentActivity = Activity.Current; + if (currentActivity != null && (currentActivity.IdFormat == ActivityIdFormat.W3C)) + { + currentActivityId = currentActivity.Id; + traceState = currentActivity.TraceStateString; + } + + foreach (CloudEvent cloudEvent in _cloudEvents) + { + if (currentActivityId != null && + !cloudEvent.ExtensionAttributes.ContainsKey(TraceParentHeaderName) && + !cloudEvent.ExtensionAttributes.ContainsKey(TraceStateHeaderName)) + { + cloudEvent.ExtensionAttributes.Add(TraceParentHeaderName, currentActivityId); + if (traceState != null) + { + cloudEvent.ExtensionAttributes.Add(TraceStateHeaderName, traceState); + } + } + } + } + _data = RequestContent.Create(_cloudEvents); + } + } +}