diff --git a/sdk/eventgrid/Azure.Messaging.EventGrid/src/Customization/EventGridKeyCredentialPolicy.cs b/sdk/eventgrid/Azure.Messaging.EventGrid/src/Customization/EventGridKeyCredentialPolicy.cs
index 418684561b36..843f8b347ad1 100644
--- a/sdk/eventgrid/Azure.Messaging.EventGrid/src/Customization/EventGridKeyCredentialPolicy.cs
+++ b/sdk/eventgrid/Azure.Messaging.EventGrid/src/Customization/EventGridKeyCredentialPolicy.cs
@@ -8,16 +8,13 @@ namespace Azure.Messaging.EventGrid
{
internal class EventGridKeyCredentialPolicy : HttpPipelineSynchronousPolicy
{
- private readonly string _name;
private readonly AzureKeyCredential _credential;
public const string SystemPublisherKey = "AzureSystemPublisher";
- public EventGridKeyCredentialPolicy(AzureKeyCredential credential, string name)
+ public EventGridKeyCredentialPolicy(AzureKeyCredential credential)
{
Argument.AssertNotNull(credential, nameof(credential));
- Argument.AssertNotNullOrEmpty(name, nameof(name));
_credential = credential;
- _name = name;
}
public override void OnSendingRequest(HttpMessage message)
@@ -29,7 +26,7 @@ public override void OnSendingRequest(HttpMessage message)
// in the request in this case.
if (_credential.Key != SystemPublisherKey)
{
- message.Request.Headers.SetValue(_name, _credential.Key);
+ message.Request.Headers.SetValue(Constants.SasKeyName, _credential.Key);
}
}
}
diff --git a/sdk/eventgrid/Azure.Messaging.EventGrid/src/Customization/EventGridPublisherClient.cs b/sdk/eventgrid/Azure.Messaging.EventGrid/src/Customization/EventGridPublisherClient.cs
index 88e5e8045804..29c77b272c05 100644
--- a/sdk/eventgrid/Azure.Messaging.EventGrid/src/Customization/EventGridPublisherClient.cs
+++ b/sdk/eventgrid/Azure.Messaging.EventGrid/src/Customization/EventGridPublisherClient.cs
@@ -9,7 +9,6 @@
using System.Threading.Tasks;
using Azure.Core;
using Azure.Core.Pipeline;
-using Azure.Core.Serialization;
using Azure.Messaging.EventGrid.Models;
namespace Azure.Messaging.EventGrid
@@ -19,15 +18,9 @@ namespace Azure.Messaging.EventGrid
///
public class EventGridPublisherClient
{
- private readonly EventGridRestClient _serviceRestClient;
private readonly ClientDiagnostics _clientDiagnostics;
- private string _hostName => _endpoint.Authority;
- private readonly Uri _endpoint;
- private readonly AzureKeyCredential _key;
+ private readonly RequestUriBuilder _uriBuilder;
private readonly HttpPipeline _pipeline;
- private readonly string _apiVersion;
-
- private static readonly JsonObjectSerializer s_jsonSerializer = new JsonObjectSerializer();
/// Initalizes a new instance of the class for mocking.
protected EventGridPublisherClient()
@@ -50,11 +43,10 @@ public EventGridPublisherClient(Uri endpoint, AzureKeyCredential credential, Eve
{
Argument.AssertNotNull(credential, nameof(credential));
options ??= new EventGridPublisherClientOptions();
- _apiVersion = options.Version.GetVersionString();
- _endpoint = endpoint;
- _key = credential;
- _pipeline = HttpPipelineBuilder.Build(options, new EventGridKeyCredentialPolicy(credential, Constants.SasKeyName));
- _serviceRestClient = new EventGridRestClient(new ClientDiagnostics(options), _pipeline, options.Version.GetVersionString());
+ _uriBuilder = new RequestUriBuilder();
+ _uriBuilder.Reset(endpoint);
+ _uriBuilder.AppendQuery("api-version", options.Version.GetVersionString(), true);
+ _pipeline = HttpPipelineBuilder.Build(options, new EventGridKeyCredentialPolicy(credential));
_clientDiagnostics = new ClientDiagnostics(options);
}
@@ -69,9 +61,10 @@ public EventGridPublisherClient(Uri endpoint, AzureSasCredential credential, Eve
{
Argument.AssertNotNull(credential, nameof(credential));
options ??= new EventGridPublisherClientOptions();
- _endpoint = endpoint;
- HttpPipeline pipeline = HttpPipelineBuilder.Build(options, new EventGridSharedAccessSignatureCredentialPolicy(credential));
- _serviceRestClient = new EventGridRestClient(new ClientDiagnostics(options), pipeline, options.Version.GetVersionString());
+ _uriBuilder = new RequestUriBuilder();
+ _uriBuilder.Reset(endpoint);
+ _uriBuilder.AppendQuery("api-version", options.Version.GetVersionString(), true);
+ _pipeline = HttpPipelineBuilder.Build(options, new EventGridSharedAccessSignatureCredentialPolicy(credential));
_clientDiagnostics = new ClientDiagnostics(options);
}
@@ -123,20 +116,6 @@ private async Task SendCloudNativeCloudEventsInternalAsync(ReadOnlyMem
}
}
- private Request CreateEventRequest(HttpMessage message, string contentType)
- {
- Request request = message.Request;
- request.Method = RequestMethod.Post;
- var uri = new RawRequestUriBuilder();
- uri.AppendRaw("https://", false);
- uri.AppendRaw(_hostName, false);
- uri.AppendPath("/api/events", false);
- uri.AppendQuery("api-version", _apiVersion, true);
- request.Uri = uri;
- request.Headers.Add("Content-Type", contentType);
- return request;
- }
-
/// Publishes a set of EventGridEvents to an Event Grid topic.
/// The event to be published to Event Grid.
/// An optional cancellation token instance to signal the request to cancel the operation.
@@ -177,13 +156,13 @@ private async Task SendEventsInternal(IEnumerable even
// List of events cannot be null
Argument.AssertNotNull(events, nameof(events));
- List eventsWithSerializedPayloads = new List();
+ using HttpMessage message = _pipeline.CreateMessage();
+ Request request = CreateEventRequest(message, "application/json");
+ var content = new Utf8JsonRequestContent();
+ content.JsonWriter.WriteStartArray();
foreach (EventGridEvent egEvent in events)
{
- // Individual events cannot be null
- Argument.AssertNotNull(egEvent, nameof(egEvent));
JsonDocument data = JsonDocument.Parse(egEvent.Data.ToStream());
-
EventGridEventInternal newEGEvent = new EventGridEventInternal(
egEvent.Id,
egEvent.Subject,
@@ -194,24 +173,27 @@ private async Task SendEventsInternal(IEnumerable even
{
Topic = egEvent.Topic
};
-
- eventsWithSerializedPayloads.Add(newEGEvent);
+ content.JsonWriter.WriteObjectValue(newEGEvent);
}
+
+ content.JsonWriter.WriteEndArray();
+ request.Content = content;
+
if (async)
{
- // Publish asynchronously if called via an async path
- return await _serviceRestClient.PublishEventsAsync(
- _hostName,
- eventsWithSerializedPayloads,
- cancellationToken).ConfigureAwait(false);
+ await _pipeline.SendAsync(message, cancellationToken).ConfigureAwait(false);
}
else
{
- return _serviceRestClient.PublishEvents(
- _hostName,
- eventsWithSerializedPayloads,
- cancellationToken);
+ _pipeline.Send(message, cancellationToken);
}
+ return message.Response.Status switch
+ {
+ 200 => message.Response,
+ _ => async ?
+ throw await _clientDiagnostics.CreateRequestFailedExceptionAsync(message.Response).ConfigureAwait(false) :
+ throw _clientDiagnostics.CreateRequestFailedException(message.Response)
+ };
}
catch (Exception e)
{
@@ -313,44 +295,6 @@ public virtual async Task SendEventsAsync(IEnumerable cust
public virtual Response SendEvents(IEnumerable customEvents, CancellationToken cancellationToken = default)
=> PublishCustomEventsInternal(customEvents, false /*async*/, cancellationToken).EnsureCompleted();
- private async Task PublishCustomEventsInternal(IEnumerable