diff --git a/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/samples/README.md b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/samples/README.md
index 67847a43dcd2..d3f6306754cd 100644
--- a/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/samples/README.md
+++ b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/samples/README.md
@@ -13,4 +13,4 @@ description: Samples for the Azure.Messaging.EventGrid.Namespaces client library
Before starting, take a look at the Azure Event Grid [README](https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/README.md) for more information on how to create an Event Grid custom topic or domain using the Azure portal/Azure CLI, and retrieving the designated endpoint and credential.
- [Using Namespace Topics](https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/samples/Sample1_Namespaces.md)
-- [Using the Cloud Native CloudEvent type]
+- [Using the Cloud Native CloudEvent type](https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/samples/Sample2_CNCF.md)
diff --git a/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Azure.Messaging.EventGrid.Namespaces.csproj b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Azure.Messaging.EventGrid.Namespaces.csproj
index 2273ee2adedb..ea3294db8608 100644
--- a/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Azure.Messaging.EventGrid.Namespaces.csproj
+++ b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Azure.Messaging.EventGrid.Namespaces.csproj
@@ -11,14 +11,17 @@
true
-
+
+
-
- Shared\Core\AzureKeyCredentialPolicy.cs
-
+
+
+
+
+
diff --git a/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Customization/EventGridSenderClient.cs b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Customization/EventGridSenderClient.cs
index b1bd243da4c1..24157710f649 100644
--- a/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Customization/EventGridSenderClient.cs
+++ b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Customization/EventGridSenderClient.cs
@@ -18,6 +18,7 @@ namespace Azure.Messaging.EventGrid.Namespaces
public partial class EventGridSenderClient
{
private readonly string _topicName;
+ private readonly bool _isDistributedTracingEnabled;
/// Initializes a new instance of EventGridSenderClient.
/// The host name of the namespace, e.g. namespaceName1.westus-1.eventgrid.azure.net.
@@ -66,6 +67,7 @@ public EventGridSenderClient(Uri endpoint,
_endpoint = endpoint;
_apiVersion = options.Version;
_topicName = topicName;
+ _isDistributedTracingEnabled = options.Diagnostics.IsDistributedTracingEnabled;
}
/// Initializes a new instance of EventGridSenderClient.
@@ -93,6 +95,7 @@ public EventGridSenderClient(Uri endpoint,
_endpoint = endpoint;
_apiVersion = options.Version;
_topicName = topicName;
+ _isDistributedTracingEnabled = options.Diagnostics.IsDistributedTracingEnabled;
}
/// Publish Single Cloud Event to namespace topic. In case of success, the server responds with an HTTP 200 status code with an empty JSON object in response. Otherwise, the server can return various error codes. For example, 401: which indicates authorization failure, 403: which indicates quota exceeded or message is too large, 410: which indicates that specific topic is not found, 400: for bad request, and 500: for internal server error.
@@ -104,7 +107,7 @@ public virtual Response Send(CloudEvent cloudEvent, CancellationToken cancellati
Argument.AssertNotNull(cloudEvent, nameof(cloudEvent));
RequestContext context = FromCancellationToken(cancellationToken);
- return Send(_topicName, RequestContent.Create(cloudEvent), context);
+ return Send(_topicName, new CloudEventRequestContent(cloudEvent, _isDistributedTracingEnabled), context);
}
/// Publish Single Cloud Event to namespace topic. In case of success, the server responds with an HTTP 200 status code with an empty JSON object in response. Otherwise, the server can return various error codes. For example, 401: which indicates authorization failure, 403: which indicates quota exceeded or message is too large, 410: which indicates that specific topic is not found, 400: for bad request, and 500: for internal server error.
@@ -118,7 +121,7 @@ public virtual async Task SendAsync(
Argument.AssertNotNull(cloudEvent, nameof(cloudEvent));
RequestContext context = FromCancellationToken(cancellationToken);
- return await SendAsync(_topicName, RequestContent.Create(cloudEvent), context).ConfigureAwait(false);
+ return await SendAsync(_topicName, new CloudEventRequestContent(cloudEvent, _isDistributedTracingEnabled), context).ConfigureAwait(false);
}
/// Publish Batch Cloud Event to namespace topic. In case of success, the server responds with an HTTP 200 status code with an empty JSON object in response. Otherwise, the server can return various error codes. For example, 401: which indicates authorization failure, 403: which indicates quota exceeded or message is too large, 410: which indicates that specific topic is not found, 400: for bad request, and 500: for internal server error.
@@ -136,7 +139,7 @@ public virtual async Task SendAsync(IEnumerable cloudEvent
scope.Start();
try
{
- return await SendEventsAsync(_topicName, RequestContent.Create(cloudEvents), context).ConfigureAwait(false);
+ return await SendEventsAsync(_topicName, new CloudEventsRequestContent(cloudEvents, _isDistributedTracingEnabled), context).ConfigureAwait(false);
}
catch (Exception e)
{
@@ -160,7 +163,7 @@ public virtual Response Send(IEnumerable cloudEvents, CancellationTo
scope.Start();
try
{
- return SendEvents(_topicName, RequestContent.Create(cloudEvents), context);
+ return SendEvents(_topicName, new CloudEventsRequestContent(cloudEvents, _isDistributedTracingEnabled), context);
}
catch (Exception e)
{
diff --git a/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/tests/EventGridClientTests.cs b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/tests/EventGridClientTests.cs
new file mode 100644
index 000000000000..a8b0848fcab6
--- /dev/null
+++ b/sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/tests/EventGridClientTests.cs
@@ -0,0 +1,542 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO;
+using System.Net.NetworkInformation;
+using System.Text.Json;
+using System.Threading;
+using System.Threading.Tasks;
+using Azure.Core.TestFramework;
+using Azure.Core.Tests;
+using Azure.Messaging.EventGrid.Namespaces;
+using NUnit.Framework;
+
+namespace Azure.Messaging.EventGrid.Tests
+{
+ public class EventGridClientTests : ClientTestBase
+ {
+ private const string TraceParentHeaderName = "traceparent";
+ private const string TraceStateHeaderName = "tracestate";
+
+ public EventGridClientTests(bool isAsync) : base(isAsync)
+ {
+ }
+
+ [Test]
+ [TestCase(false, false)]
+ [TestCase(true, true)]
+ [TestCase(true, false)]
+ [TestCase(false, true)]
+ public async Task SendSetsTraceParentExtension(bool inclTraceparent, bool inclTracestate)
+ {
+ MockTransport mockTransport = CreateMockTransport();
+
+ EventGridSenderClientOptions options = new()
+ {
+ Transport = mockTransport,
+ };
+ EventGridSenderClient senderClient = new(new Uri("http://www.example.com"), "topic", new AzureKeyCredential("fake"), options);
+
+ // simulating some other activity already being started before doing operations with the client
+ var activity = new Activity("ParentEvent");
+ activity.SetIdFormat(ActivityIdFormat.W3C);
+ activity.Start();
+ activity.TraceStateString = "tracestatevalue";
+
+ // create list of cloud events
+ List eventsList = new List();
+ for (int i = 0; i < 10; i++)
+ {
+ CloudEvent cloudEvent = new CloudEvent(
+ "record",
+ "Microsoft.MockPublisher.TestEvent",
+ JsonDocument.Parse("{\"property1\": \"abc\", \"property2\": 123}").RootElement)
+ {
+ Id = "id",
+ Subject = $"Subject-{i}",
+ Time = DateTimeOffset.UtcNow
+ };
+ if (inclTraceparent && i % 2 == 0)
+ {
+ cloudEvent.ExtensionAttributes.Add("traceparent", "traceparentValue");
+ }
+ if (inclTracestate && i % 2 == 0)
+ {
+ cloudEvent.ExtensionAttributes.Add("tracestate", "param:value");
+ }
+ eventsList.Add(cloudEvent);
+ }
+
+ // send
+ if (IsAsync)
+ {
+ await senderClient.SendAsync(eventsList).ConfigureAwait(false);
+ }
+ else
+ {
+ senderClient.Send(eventsList);
+ }
+
+ // stop activity after extracting the events from the request as this is where the cloudEvents would actually
+ // be serialized
+ activity.Stop();
+
+ // validate
+ IEnumerator cloudEnum = eventsList.GetEnumerator();
+ for (int i = 0; i < 10; i++)
+ {
+ cloudEnum.MoveNext();
+ IDictionary cloudEventAttr = cloudEnum.Current.ExtensionAttributes;
+ if (inclTraceparent && inclTracestate && i % 2 == 0)
+ {
+ Assert.AreEqual(
+ "traceparentValue",
+ cloudEventAttr[TraceParentHeaderName]);
+
+ Assert.AreEqual(
+ "param:value",
+ cloudEventAttr[TraceStateHeaderName]);
+ }
+ else if (inclTraceparent && i % 2 == 0)
+ {
+ Assert.AreEqual(
+ "traceparentValue",
+ cloudEventAttr[TraceParentHeaderName]);
+ }
+ else if (inclTracestate && i % 2 == 0)
+ {
+ Assert.AreEqual(
+ "param:value",
+ cloudEventAttr[TraceStateHeaderName]);
+ }
+ else
+ {
+ Assert.IsTrue(mockTransport.SingleRequest.Headers.TryGetValue(TraceParentHeaderName, out string requestHeader));
+
+ Assert.AreEqual(
+ requestHeader,
+ cloudEventAttr[TraceParentHeaderName]);
+ }
+ }
+ }
+
+ [Test]
+ [TestCase(false, false)]
+ [TestCase(true, true)]
+ [TestCase(true, false)]
+ [TestCase(false, true)]
+ public async Task SingleEventSendSetsTraceParentExtension(bool inclTraceparent, bool inclTracestate)
+ {
+ MockTransport mockTransport = CreateMockTransport();
+
+ EventGridSenderClientOptions options = new()
+ {
+ Transport = mockTransport,
+ };
+ EventGridSenderClient senderClient = new(new Uri("http://www.example.com"), "topic", new AzureKeyCredential("fake"), options);
+
+ // simulating some other activity already being started before doing operations with the client
+ var activity = new Activity("ParentEvent");
+ activity.SetIdFormat(ActivityIdFormat.W3C);
+ activity.Start();
+ activity.TraceStateString = "tracestatevalue";
+
+ // create cloud event
+ CloudEvent cloudEvent = new CloudEvent(
+ "record",
+ "Microsoft.MockPublisher.TestEvent",
+ JsonDocument.Parse("{\"property1\": \"abc\", \"property2\": 123}").RootElement)
+ {
+ Id = "id",
+ Subject = $"Subject-1",
+ Time = DateTimeOffset.UtcNow
+ };
+
+ if (inclTraceparent)
+ {
+ cloudEvent.ExtensionAttributes.Add("traceparent", "traceparentValue");
+ }
+ if (inclTracestate)
+ {
+ cloudEvent.ExtensionAttributes.Add("tracestate", "param:value");
+ }
+
+ // send
+ if (IsAsync)
+ {
+ await senderClient.SendAsync(cloudEvent).ConfigureAwait(false);
+ }
+ else
+ {
+ senderClient.Send(cloudEvent);
+ }
+
+ // stop activity after extracting the events from the request as this is where the cloudEvents would actually
+ // be serialized
+ activity.Stop();
+
+ // validate
+ IDictionary cloudEventAttr = cloudEvent.ExtensionAttributes;
+ if (inclTraceparent && inclTracestate)
+ {
+ Assert.AreEqual(
+ "traceparentValue",
+ cloudEventAttr[TraceParentHeaderName]);
+
+ Assert.AreEqual(
+ "param:value",
+ cloudEventAttr[TraceStateHeaderName]);
+ }
+ else if (inclTraceparent)
+ {
+ Assert.AreEqual(
+ "traceparentValue",
+ cloudEventAttr[TraceParentHeaderName]);
+ }
+ else if (inclTracestate)
+ {
+ Assert.AreEqual(
+ "param:value",
+ cloudEventAttr[TraceStateHeaderName]);
+ }
+ else
+ {
+ Assert.IsTrue(mockTransport.SingleRequest.Headers.TryGetValue(TraceParentHeaderName, out string requestHeader));
+
+ Assert.AreEqual(
+ requestHeader,
+ cloudEventAttr[TraceParentHeaderName]);
+ }
+ }
+
+ [Test]
+ public async Task SingleEventSendDoesNotSetTraceParentExtensionWhenTracingIsDisabled()
+ {
+ MockTransport mockTransport = CreateMockTransport();
+
+ EventGridSenderClientOptions options = new()
+ {
+ Transport = mockTransport,
+ Diagnostics =
+ {
+ IsDistributedTracingEnabled = false,
+ }
+ };
+ EventGridSenderClient senderClient = new(new Uri("http://www.example.com"), "topic", new AzureKeyCredential("fake"), options);
+
+ using ClientDiagnosticListener diagnosticListener = new(s => s.StartsWith("Azure."), asyncLocal: true);
+
+ // simulating some other activity already being started before doing operations with the client
+ var activity = new Activity("ParentEvent");
+ activity.SetIdFormat(ActivityIdFormat.W3C);
+ activity.Start();
+
+ CloudEvent cloudEvent = new CloudEvent(
+ "record",
+ "Microsoft.MockPublisher.TestEvent",
+ JsonDocument.Parse("{\"property1\": \"abc\", \"property2\": 123}").RootElement);
+
+ if (IsAsync)
+ {
+ await senderClient.SendAsync(cloudEvent).ConfigureAwait(false);
+ }
+ else
+ {
+ senderClient.Send(cloudEvent);
+ }
+
+ activity.Stop();
+
+ Assert.False(cloudEvent.ExtensionAttributes.ContainsKey("traceparent"));
+ Assert.False(cloudEvent.ExtensionAttributes.ContainsKey("tracestate"));
+ }
+
+ [Test]
+ public async Task SendDoesNotSetTraceParentExtensionWhenTracingIsDisabled()
+ {
+ MockTransport mockTransport = CreateMockTransport();
+
+ EventGridSenderClientOptions options = new()
+ {
+ Transport = mockTransport,
+ Diagnostics =
+ {
+ IsDistributedTracingEnabled = false,
+ }
+ };
+ EventGridSenderClient senderClient = new(new Uri("http://www.example.com"), "topic", new AzureKeyCredential("fake"), options);
+
+ using ClientDiagnosticListener diagnosticListener = new(s => s.StartsWith("Azure."), asyncLocal: true);
+
+ // simulating some other activity already being started before doing operations with the client
+ var activity = new Activity("ParentEvent");
+ activity.SetIdFormat(ActivityIdFormat.W3C);
+ activity.Start();
+
+ List eventsList = new List();
+ for (int i = 0; i < 10; i++)
+ {
+ CloudEvent cloudEvent = new CloudEvent(
+ "record",
+ "Microsoft.MockPublisher.TestEvent",
+ JsonDocument.Parse("{\"property1\": \"abc\", \"property2\": 123}").RootElement);
+
+ eventsList.Add(cloudEvent);
+ }
+
+ if (IsAsync)
+ {
+ await senderClient.SendAsync(eventsList).ConfigureAwait(false);
+ }
+ else
+ {
+ senderClient.Send(eventsList);
+ }
+
+ activity.Stop();
+
+ IEnumerator cloudEnum = eventsList.GetEnumerator();
+ for (int i = 0; i < 10; i++)
+ {
+ cloudEnum.MoveNext();
+ Assert.False(cloudEnum.Current.ExtensionAttributes.ContainsKey("traceparent"));
+ Assert.False(cloudEnum.Current.ExtensionAttributes.ContainsKey("tracestate"));
+ }
+ }
+
+ [Test]
+ [TestCase(false, false)]
+ [TestCase(true, true)]
+ [TestCase(true, false)]
+ [TestCase(false, true)]
+ public async Task SendSetsTraceParentExtensionRetries(bool inclTraceparent, bool inclTracestate)
+ {
+ int requestCt = 0;
+ var mockTransport = new MockTransport((request) =>
+ {
+ var stream = new MemoryStream();
+ request.Content.WriteTo(stream, CancellationToken.None);
+ if (requestCt++ == 0)
+ {
+ return new MockResponse(500);
+ }
+ else
+ {
+ return new MockResponse(200);
+ }
+ });
+
+ EventGridSenderClientOptions options = new()
+ {
+ Transport = mockTransport
+ };
+ EventGridSenderClient senderClient = new(new Uri("http://www.example.com"), "topic", new AzureKeyCredential("fake"), options);
+ using ClientDiagnosticListener diagnosticListener = new(s => s.StartsWith("Azure."), asyncLocal: true);
+
+ // simulating some other activity already being started before doing operations with the client
+ var activity = new Activity("ParentEvent");
+ activity.SetIdFormat(ActivityIdFormat.W3C);
+ activity.Start();
+ activity.TraceStateString = "tracestatevalue";
+
+ // create list of cloud events
+ List eventsList = new List();
+ for (int i = 0; i < 10; i++)
+ {
+ CloudEvent cloudEvent = new CloudEvent(
+ "record",
+ "Microsoft.MockPublisher.TestEvent",
+ JsonDocument.Parse("{\"property1\": \"abc\", \"property2\": 123}").RootElement)
+ {
+ Id = "id",
+ Subject = $"Subject-{i}",
+ Time = DateTimeOffset.UtcNow
+ };
+ if (inclTraceparent && i % 2 == 0)
+ {
+ cloudEvent.ExtensionAttributes.Add("traceparent", "traceparentValue");
+ }
+ if (inclTracestate && i % 2 == 0)
+ {
+ cloudEvent.ExtensionAttributes.Add("tracestate", "param:value");
+ }
+ eventsList.Add(cloudEvent);
+ }
+
+ // send
+ if (IsAsync)
+ {
+ await senderClient.SendAsync(eventsList).ConfigureAwait(false);
+ }
+ else
+ {
+ senderClient.Send(eventsList);
+ }
+
+ // stop activity after extracting the events from the request as this is where the cloudEvents would actually
+ // be serialized
+ activity.Stop();
+
+ // validate
+ IEnumerator cloudEnum = eventsList.GetEnumerator();
+ for (int i = 0; i < 10; i++)
+ {
+ cloudEnum.MoveNext();
+ IDictionary cloudEventAttr = cloudEnum.Current.ExtensionAttributes;
+ if (inclTraceparent && inclTracestate && i % 2 == 0)
+ {
+ Assert.AreEqual(
+ "traceparentValue",
+ cloudEventAttr[TraceParentHeaderName]);
+
+ Assert.AreEqual(
+ "param:value",
+ cloudEventAttr[TraceStateHeaderName]);
+ }
+ else if (inclTraceparent && i % 2 == 0)
+ {
+ Assert.AreEqual(
+ "traceparentValue",
+ cloudEventAttr[TraceParentHeaderName]);
+ }
+ else if (inclTracestate && i % 2 == 0)
+ {
+ Assert.AreEqual(
+ "param:value",
+ cloudEventAttr[TraceStateHeaderName]);
+ }
+ else
+ {
+ Assert.IsTrue(mockTransport.Requests[1].Headers.TryGetValue(TraceParentHeaderName, out string traceParent));
+ Assert.AreEqual(
+ traceParent,
+ cloudEventAttr[TraceParentHeaderName]);
+
+ Assert.IsTrue(mockTransport.Requests[1].Headers.TryGetValue(TraceStateHeaderName, out string traceState));
+ Assert.AreEqual(
+ traceState,
+ cloudEventAttr[TraceStateHeaderName]);
+ }
+ }
+ }
+
+ [Test]
+ [TestCase(false, false)]
+ [TestCase(true, true)]
+ [TestCase(true, false)]
+ [TestCase(false, true)]
+ public async Task SingleEventSendSetsTraceParentExtensionRetries(bool inclTraceparent, bool inclTracestate)
+ {
+ int requestCt = 0;
+ var mockTransport = new MockTransport((request) =>
+ {
+ var stream = new MemoryStream();
+ request.Content.WriteTo(stream, CancellationToken.None);
+ if (requestCt++ == 0)
+ {
+ return new MockResponse(500);
+ }
+ else
+ {
+ return new MockResponse(200);
+ }
+ });
+
+ EventGridSenderClientOptions options = new()
+ {
+ Transport = mockTransport
+ };
+ EventGridSenderClient senderClient = new(new Uri("http://www.example.com"), "topic", new AzureKeyCredential("fake"), options);
+ using ClientDiagnosticListener diagnosticListener = new(s => s.StartsWith("Azure."), asyncLocal: true);
+
+ // simulating some other activity already being started before doing operations with the client
+ var activity = new Activity("ParentEvent");
+ activity.SetIdFormat(ActivityIdFormat.W3C);
+ activity.Start();
+ activity.TraceStateString = "tracestatevalue";
+
+ // create list of cloud events
+ CloudEvent cloudEvent = new CloudEvent(
+ "record",
+ "Microsoft.MockPublisher.TestEvent",
+ JsonDocument.Parse("{\"property1\": \"abc\", \"property2\": 123}").RootElement)
+ {
+ Id = "id",
+ Subject = $"Subject-1",
+ Time = DateTimeOffset.UtcNow
+ };
+ if (inclTraceparent)
+ {
+ cloudEvent.ExtensionAttributes.Add("traceparent", "traceparentValue");
+ }
+ if (inclTracestate)
+ {
+ cloudEvent.ExtensionAttributes.Add("tracestate", "param:value");
+ }
+
+ // send
+ if (IsAsync)
+ {
+ await senderClient.SendAsync(cloudEvent).ConfigureAwait(false);
+ }
+ else
+ {
+ senderClient.Send(cloudEvent);
+ }
+
+ // stop activity after extracting the events from the request as this is where the cloudEvents would actually
+ // be serialized
+ activity.Stop();
+
+ // validate
+ IDictionary cloudEventAttr = cloudEvent.ExtensionAttributes;
+ if (inclTraceparent && inclTracestate)
+ {
+ Assert.AreEqual(
+ "traceparentValue",
+ cloudEventAttr[TraceParentHeaderName]);
+
+ Assert.AreEqual(
+ "param:value",
+ cloudEventAttr[TraceStateHeaderName]);
+ }
+ else if (inclTraceparent)
+ {
+ Assert.AreEqual(
+ "traceparentValue",
+ cloudEventAttr[TraceParentHeaderName]);
+ }
+ else if (inclTracestate)
+ {
+ Assert.AreEqual(
+ "param:value",
+ cloudEventAttr[TraceStateHeaderName]);
+ }
+ else
+ {
+ Assert.IsTrue(mockTransport.Requests[1].Headers.TryGetValue(TraceParentHeaderName, out string traceParent));
+ Assert.AreEqual(
+ traceParent,
+ cloudEventAttr[TraceParentHeaderName]);
+
+ Assert.IsTrue(mockTransport.Requests[1].Headers.TryGetValue(TraceStateHeaderName, out string traceState));
+ Assert.AreEqual(
+ traceState,
+ cloudEventAttr[TraceStateHeaderName]);
+ }
+ }
+
+ private static MockTransport CreateMockTransport()
+ {
+ return new MockTransport((request) =>
+ {
+ var stream = new MemoryStream();
+ request.Content.WriteTo(stream, CancellationToken.None);
+ return new MockResponse(200);
+ });
+ }
+ }
+}
diff --git a/sdk/eventgrid/Azure.Messaging.EventGrid/src/EventGridPublisherClient.cs b/sdk/eventgrid/Azure.Messaging.EventGrid/src/EventGridPublisherClient.cs
index 57f581bbc528..b418dfcdc068 100644
--- a/sdk/eventgrid/Azure.Messaging.EventGrid/src/EventGridPublisherClient.cs
+++ b/sdk/eventgrid/Azure.Messaging.EventGrid/src/EventGridPublisherClient.cs
@@ -276,7 +276,7 @@ private async Task SendCloudEventsInternal(IEnumerable eve
request.Headers.Add("aeg-channel-name", channelName);
}
- CloudEventRequestContent content = new CloudEventRequestContent(events, _clientDiagnostics.IsActivityEnabled);
+ CloudEventsRequestContent content = new CloudEventsRequestContent(events, _clientDiagnostics.IsActivityEnabled);
request.Content = content;
if (async)
diff --git a/sdk/eventgrid/Azure.Messaging.EventGrid/src/Shared/CloudEventRequestContent.cs b/sdk/eventgrid/Azure.Messaging.EventGrid/src/Shared/CloudEventRequestContent.cs
new file mode 100644
index 000000000000..12b0f3cc54c2
--- /dev/null
+++ b/sdk/eventgrid/Azure.Messaging.EventGrid/src/Shared/CloudEventRequestContent.cs
@@ -0,0 +1,84 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO;
+using System.Text.Json;
+using System.Threading;
+using System.Threading.Tasks;
+using Azure.Core;
+using Azure.Core.Pipeline;
+
+namespace Azure.Messaging.EventGrid
+{
+ internal class CloudEventRequestContent : RequestContent
+ {
+ private readonly CloudEvent _cloudEvent;
+ private const string TraceParentHeaderName = "traceparent";
+ private const string TraceStateHeaderName = "tracestate";
+ private readonly bool _isDistributedTracingEnabled;
+ private RequestContent _data;
+
+ public CloudEventRequestContent(CloudEvent cloudEvent, bool isDistributedTracingEnabled)
+ {
+ _cloudEvent = cloudEvent;
+ _isDistributedTracingEnabled = isDistributedTracingEnabled;
+ }
+
+ public override void Dispose()
+ {
+ }
+
+ public override bool TryComputeLength(out long length)
+ {
+ EnsureSerialized();
+ return _data.TryComputeLength(out length);
+ }
+
+ public override void WriteTo(Stream stream, CancellationToken cancellationToken)
+ {
+ EnsureSerialized();
+ _data.WriteTo(stream, cancellationToken);
+ }
+
+ public override async Task WriteToAsync(Stream stream, CancellationToken cancellationToken)
+ {
+ EnsureSerialized();
+ await _data.WriteToAsync(stream, cancellationToken).ConfigureAwait(false);
+ }
+
+ private void EnsureSerialized()
+ {
+ if (_data != null)
+ {
+ return;
+ }
+
+ if (_isDistributedTracingEnabled)
+ {
+ string currentActivityId = null;
+ string traceState = null;
+ Activity currentActivity = Activity.Current;
+ if (currentActivity != null && (currentActivity.IdFormat == ActivityIdFormat.W3C))
+ {
+ currentActivityId = currentActivity.Id;
+ traceState = currentActivity.TraceStateString;
+ }
+
+ if (currentActivityId != null &&
+ !_cloudEvent.ExtensionAttributes.ContainsKey(TraceParentHeaderName) &&
+ !_cloudEvent.ExtensionAttributes.ContainsKey(TraceStateHeaderName))
+ {
+ _cloudEvent.ExtensionAttributes.Add(TraceParentHeaderName, currentActivityId);
+ if (traceState != null)
+ {
+ _cloudEvent.ExtensionAttributes.Add(TraceStateHeaderName, traceState);
+ }
+ }
+ }
+
+ _data = RequestContent.Create(_cloudEvent);
+ }
+ }
+}
diff --git a/sdk/eventgrid/Azure.Messaging.EventGrid/src/Customization/CloudEventRequestContent.cs b/sdk/eventgrid/Azure.Messaging.EventGrid/src/Shared/CloudEventsRequestContent.cs
similarity index 76%
rename from sdk/eventgrid/Azure.Messaging.EventGrid/src/Customization/CloudEventRequestContent.cs
rename to sdk/eventgrid/Azure.Messaging.EventGrid/src/Shared/CloudEventsRequestContent.cs
index 5a3ef44de7ae..00a5e74d13c0 100644
--- a/sdk/eventgrid/Azure.Messaging.EventGrid/src/Customization/CloudEventRequestContent.cs
+++ b/sdk/eventgrid/Azure.Messaging.EventGrid/src/Shared/CloudEventsRequestContent.cs
@@ -12,15 +12,15 @@
namespace Azure.Messaging.EventGrid
{
- internal class CloudEventRequestContent : RequestContent
+ internal class CloudEventsRequestContent : RequestContent
{
private readonly IEnumerable _cloudEvents;
private const string TraceParentHeaderName = "traceparent";
private const string TraceStateHeaderName = "tracestate";
private readonly bool _isDistributedTracingEnabled;
- private byte[] _data;
+ private RequestContent _data;
- public CloudEventRequestContent(IEnumerable cloudEvents, bool isDistributedTracingEnabled)
+ public CloudEventsRequestContent(IEnumerable cloudEvents, bool isDistributedTracingEnabled)
{
_cloudEvents = cloudEvents;
_isDistributedTracingEnabled = isDistributedTracingEnabled;
@@ -33,20 +33,19 @@ public override void Dispose()
public override bool TryComputeLength(out long length)
{
EnsureSerialized();
- length = _data.Length;
- return true;
+ return _data.TryComputeLength(out length);
}
public override void WriteTo(Stream stream, CancellationToken cancellationToken)
{
EnsureSerialized();
- stream.Write(_data, 0, _data.Length);
+ _data.WriteTo(stream, cancellationToken);
}
public override async Task WriteToAsync(Stream stream, CancellationToken cancellationToken)
{
EnsureSerialized();
- await stream.WriteAsync(_data, 0, _data.Length, cancellationToken).ConfigureAwait(false);
+ await _data.WriteToAsync(stream, cancellationToken).ConfigureAwait(false);
}
private void EnsureSerialized()
@@ -70,8 +69,8 @@ private void EnsureSerialized()
foreach (CloudEvent cloudEvent in _cloudEvents)
{
if (currentActivityId != null &&
- !cloudEvent.ExtensionAttributes.ContainsKey(TraceParentHeaderName) &&
- !cloudEvent.ExtensionAttributes.ContainsKey(TraceStateHeaderName))
+ !cloudEvent.ExtensionAttributes.ContainsKey(TraceParentHeaderName) &&
+ !cloudEvent.ExtensionAttributes.ContainsKey(TraceStateHeaderName))
{
cloudEvent.ExtensionAttributes.Add(TraceParentHeaderName, currentActivityId);
if (traceState != null)
@@ -81,7 +80,7 @@ private void EnsureSerialized()
}
}
}
- _data = JsonSerializer.SerializeToUtf8Bytes(_cloudEvents, typeof(IEnumerable));
+ _data = RequestContent.Create(_cloudEvents);
}
}
}
diff --git a/sdk/eventgrid/Directory.Build.props b/sdk/eventgrid/Directory.Build.props
new file mode 100644
index 000000000000..d802ba0f887a
--- /dev/null
+++ b/sdk/eventgrid/Directory.Build.props
@@ -0,0 +1,11 @@
+
+
+
+
+
+
+
+ $(MSBuildThisFileDirectory)Azure.Messaging.EventGrid\src\Shared\
+
+
+
\ No newline at end of file