diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageBatch.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageBatch.cs
new file mode 100644
index 000000000000..5eeebf76f179
--- /dev/null
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageBatch.cs
@@ -0,0 +1,162 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.Linq;
+using Azure.Core;
+using Azure.Messaging.ServiceBus.Core;
+using Microsoft.Azure.Amqp;
+
+namespace Azure.Messaging.ServiceBus.Amqp
+{
+ ///
+ /// A set of messages with known size constraints, based on messages to be sent
+ /// using an AMQP-based transport.
+ ///
+ ///
+ internal class AmqpMessageBatch : TransportMessageBatch
+ {
+ /// The amount of bytes to reserve as overhead for a small message.
+ private const byte OverheadBytesSmallMessage = 5;
+
+ /// The amount of bytes to reserve as overhead for a large message.
+ private const byte OverheadBytesLargeMessage = 8;
+
+ /// The maximum number of bytes that a message may be to be considered small.
+ private const byte MaximumBytesSmallMessage = 255;
+
+ /// A flag that indicates whether or not the instance has been disposed.
+ private bool _disposed = false;
+
+ /// The size of the batch, in bytes, as it will be sent via the AMQP transport.
+ private long _sizeBytes = 0;
+
+ ///
+ /// The maximum size allowed for the batch, in bytes. This includes the messages in the batch as
+ /// well as any overhead for the batch itself when sent to the Queue/Topic.
+ ///
+ ///
+ public override long MaximumSizeInBytes { get; }
+
+ ///
+ /// The size of the batch, in bytes, as it will be sent to the Queue/Topic
+ /// service.
+ ///
+ ///
+ public override long SizeInBytes => _sizeBytes;
+
+ ///
+ /// The count of messages contained in the batch.
+ ///
+ ///
+ public override int Count => BatchMessages.Count;
+
+ ///
+ /// The set of options to apply to the batch.
+ ///
+ ///
+ private CreateBatchOptions Options { get; }
+
+ ///
+ /// The set of messages that have been added to the batch.
+ ///
+ ///
+ private List BatchMessages { get; } = new List();
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ /// The set of options to apply to the batch.
+ ///
+ public AmqpMessageBatch(CreateBatchOptions options)
+ {
+ Argument.AssertNotNull(options, nameof(options));
+ Argument.AssertNotNull(options.MaximumSizeInBytes, nameof(options.MaximumSizeInBytes));
+
+ Options = options;
+ MaximumSizeInBytes = options.MaximumSizeInBytes.Value;
+
+ // Initialize the size by reserving space for the batch envelope.
+
+ using AmqpMessage envelope = AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(Enumerable.Empty());
+ _sizeBytes = envelope.SerializedMessageSize;
+ }
+
+ ///
+ /// Attempts to add a message to the batch, ensuring that the size
+ /// of the batch does not exceed its maximum.
+ ///
+ ///
+ /// The message to attempt to add to the batch.
+ ///
+ /// true if the message was added; otherwise, false.
+ ///
+ public override bool TryAdd(ServiceBusMessage message)
+ {
+ Argument.AssertNotNull(message, nameof(message));
+ Argument.AssertNotDisposed(_disposed, nameof(ServiceBusMessageBatch));
+
+ AmqpMessage amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(message);
+
+ try
+ {
+ // Calculate the size for the message, based on the AMQP message size and accounting for a
+ // bit of reserved overhead size.
+
+ var size = _sizeBytes
+ + amqpMessage.SerializedMessageSize
+ + (amqpMessage.SerializedMessageSize <= MaximumBytesSmallMessage
+ ? OverheadBytesSmallMessage
+ : OverheadBytesLargeMessage);
+
+ if (size > MaximumSizeInBytes)
+ {
+ return false;
+ }
+
+ _sizeBytes = size;
+ BatchMessages.Add(message);
+
+ return true;
+ }
+ finally
+ {
+ amqpMessage?.Dispose();
+ }
+ }
+
+ ///
+ /// Represents the batch as an enumerable set of transport-specific
+ /// representations of a message.
+ ///
+ ///
+ /// The transport-specific message representation being requested.
+ ///
+ /// The set of messages as an enumerable of the requested type.
+ ///
+ public override IEnumerable AsEnumerable()
+ {
+ if (typeof(T) != typeof(ServiceBusMessage))
+ {
+ throw new FormatException(string.Format(CultureInfo.CurrentCulture, Resources1.UnsupportedTransportEventType, typeof(T).Name));
+ }
+
+ return (IEnumerable)BatchMessages;
+ }
+
+ ///
+ /// Performs the task needed to clean up resources used by the .
+ ///
+ ///
+ public override void Dispose()
+ {
+ _disposed = true;
+
+ BatchMessages.Clear();
+ _sizeBytes = 0;
+ }
+ }
+}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs
index c9233360faf0..6e0e54da24d6 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs
@@ -6,9 +6,10 @@ namespace Azure.Messaging.ServiceBus.Amqp
using System;
using System.Collections;
using System.Collections.Generic;
- using System.Data;
using System.IO;
+ using System.Linq;
using System.Runtime.Serialization;
+ using Azure.Core;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Encoding;
using Microsoft.Azure.Amqp.Framing;
@@ -32,70 +33,122 @@ internal static class AmqpMessageConverter
private const string DateTimeOffsetName = AmqpConstants.Vendor + ":datetime-offset";
private const int GuidSize = 16;
- public static AmqpMessage BatchSBMessagesAsAmqpMessage(IEnumerable sbMessages)
+ /// The size, in bytes, to use as a buffer for stream operations.
+ private const int StreamBufferSizeInBytes = 512;
+
+ public static AmqpMessage BatchSBMessagesAsAmqpMessage(IEnumerable source)
{
- if (sbMessages == null)
- {
- throw Fx.Exception.ArgumentNull(nameof(sbMessages));
- }
+ Argument.AssertNotNull(source, nameof(source));
+ return BuildAmqpBatchFromMessage(source);
+ }
- AmqpMessage amqpMessage;
+ ///
+ /// Builds a batch from a set of
+ /// optionally propagating the custom properties.
+ ///
+ ///
+ /// The set of messages to use as the body of the batch message.
+ ///
+ /// The batch containing the source messages.
+ ///
+ private static AmqpMessage BuildAmqpBatchFromMessage(IEnumerable source)
+ {
AmqpMessage firstAmqpMessage = null;
SBMessage firstMessage = null;
- List dataList = null;
- var messageCount = 0;
- foreach (var sbMessage in sbMessages)
- {
- messageCount++;
- amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(sbMessage);
- if (firstAmqpMessage == null)
+ return BuildAmqpBatchFromMessages(
+ source.Select(sbMessage =>
{
- firstAmqpMessage = amqpMessage;
- firstMessage = sbMessage;
- continue;
- }
+ if (firstAmqpMessage == null)
+ {
+ firstAmqpMessage = SBMessageToAmqpMessage(sbMessage);
+ firstMessage = sbMessage;
+ return firstAmqpMessage;
+ }
+ else
+ {
+ return SBMessageToAmqpMessage(sbMessage);
+ }
+ }), firstMessage);
+ }
- if (dataList == null)
- {
- dataList = new List { ToData(firstAmqpMessage) };
- }
+ ///
+ /// Builds a batch from a set of .
+ ///
+ ///
+ /// The set of messages to use as the body of the batch message.
+ ///
+ ///
+ /// The batch containing the source messages.
+ ///
+ private static AmqpMessage BuildAmqpBatchFromMessages(
+ IEnumerable source,
+ SBMessage firstMessage = null)
+ {
+ AmqpMessage batchEnvelope;
- dataList.Add(ToData(amqpMessage));
- }
+ var batchMessages = source.ToList();
- if (messageCount == 1 && firstAmqpMessage != null)
+ if (batchMessages.Count == 1)
{
- firstAmqpMessage.Batchable = true;
- return firstAmqpMessage;
+ batchEnvelope = batchMessages[0];
}
+ else
+ {
+ batchEnvelope = AmqpMessage.Create(batchMessages.Select(message =>
+ {
+ message.Batchable = true;
+ using var messageStream = message.ToStream();
+ return new Data { Value = ReadStreamToArraySegment(messageStream) };
+ }));
- amqpMessage = AmqpMessage.Create(dataList);
- amqpMessage.MessageFormat = AmqpConstants.AmqpBatchedMessageFormat;
+ batchEnvelope.MessageFormat = AmqpConstants.AmqpBatchedMessageFormat;
+ }
- if (firstMessage.MessageId != null)
+ if (firstMessage?.MessageId != null)
{
- amqpMessage.Properties.MessageId = firstMessage.MessageId;
+ batchEnvelope.Properties.MessageId = firstMessage.MessageId;
}
- if (firstMessage.SessionId != null)
+ if (firstMessage?.SessionId != null)
{
- amqpMessage.Properties.GroupId = firstMessage.SessionId;
+ batchEnvelope.Properties.GroupId = firstMessage.SessionId;
}
- if (firstMessage.PartitionKey != null)
+ if (firstMessage?.PartitionKey != null)
{
- amqpMessage.MessageAnnotations.Map[AmqpMessageConverter.PartitionKeyName] =
+ batchEnvelope.MessageAnnotations.Map[AmqpMessageConverter.PartitionKeyName] =
firstMessage.PartitionKey;
}
- if (firstMessage.ViaPartitionKey != null)
+ if (firstMessage?.ViaPartitionKey != null)
{
- amqpMessage.MessageAnnotations.Map[AmqpMessageConverter.ViaPartitionKeyName] =
+ batchEnvelope.MessageAnnotations.Map[AmqpMessageConverter.ViaPartitionKeyName] =
firstMessage.ViaPartitionKey;
}
- amqpMessage.Batchable = true;
- return amqpMessage;
+ batchEnvelope.Batchable = true;
+ return batchEnvelope;
+ }
+
+ ///
+ /// Converts a stream to an representation.
+ ///
+ ///
+ /// The stream to read and capture in memory.
+ ///
+ /// The containing the stream data.
+ ///
+ private static ArraySegment ReadStreamToArraySegment(Stream stream)
+ {
+ if (stream == null)
+ {
+ return new ArraySegment();
+ }
+
+ using var memStream = new MemoryStream(StreamBufferSizeInBytes);
+ stream.CopyTo(memStream, StreamBufferSizeInBytes);
+
+ return new ArraySegment(memStream.ToArray());
}
public static AmqpMessage SBMessageToAmqpMessage(SBMessage sbMessage)
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs
index fe6a72a54945..80cc999f97e4 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs
@@ -124,168 +124,250 @@ public AmqpSender(
}
///
- /// Sends a set of events to the associated Service Bus entity using a batched approach. If the size of events exceed the
- /// maximum size of a single batch, an exception will be triggered and the send will fail.
+ /// Creates a size-constraint batch to which may be added using a try-based pattern. If a message would
+ /// exceed the maximum allowable size of the batch, the batch will not allow adding the message and signal that scenario using its
+ /// return value.
+ ///
+ /// Because messages that would violate the size constraint cannot be added, publishing a batch will not trigger an exception when
+ /// attempting to send the message to the Queue/Topic.
///
///
- /// The set of event data to send.
+ /// The set of options to consider when creating this batch.
/// An optional instance to signal the request to cancel the operation.
///
- public override async Task SendAsync(IEnumerable messages,
- CancellationToken cancellationToken)
+ /// An with the requested .
+ ///
+ public override async ValueTask CreateBatchAsync(
+ CreateBatchOptions options,
+ CancellationToken cancellationToken)
+ {
+ TransportMessageBatch messageBatch = null;
+ Task createBatchTask = _retryPolicy.RunOperation(async (timeout) =>
+ {
+ messageBatch = await CreateBatchInternalAsync(
+ options,
+ timeout).ConfigureAwait(false);
+ },
+ _entityName,
+ _connectionScope,
+ cancellationToken);
+ await createBatchTask.ConfigureAwait(false);
+ return messageBatch;
+ }
+
+ internal async ValueTask CreateBatchInternalAsync(
+ CreateBatchOptions options,
+ TimeSpan timeout)
{
- Argument.AssertNotNull(messages, nameof(messages));
+ Argument.AssertNotNull(options, nameof(options));
+
+ // Ensure that maximum message size has been determined; this depends on the underlying
+ // AMQP link, so if not set, requesting the link will ensure that it is populated.
+
+ if (!MaximumMessageSize.HasValue)
+ {
+ await _sendLink.GetOrCreateAsync(timeout).ConfigureAwait(false);
+ }
+
+ // Ensure that there was a maximum size populated; if none was provided,
+ // default to the maximum size allowed by the link.
+
+ options.MaximumSizeInBytes ??= MaximumMessageSize;
+
+ Argument.AssertInRange(options.MaximumSizeInBytes.Value, ServiceBusSender.MinimumBatchSizeLimit, MaximumMessageSize.Value, nameof(options.MaximumSizeInBytes));
+ return new AmqpMessageBatch(options);
+ }
+
+ ///
+ /// Sends a set of messages to the associated Queue/Topic using a batched approach.
+ ///
+ ///
+ /// The set of messages to send.
+ /// An optional instance to signal the request to cancel the operation.
+ ///
+ /// A task to be resolved on when the operation has completed.
+ ///
+ public override async Task SendBatchAsync(
+ ServiceBusMessageBatch messageBatch,
+ CancellationToken cancellationToken)
+ {
+ Argument.AssertNotNull(messageBatch, nameof(messageBatch));
Argument.AssertNotClosed(_closed, nameof(AmqpSender));
- AmqpMessage messageFactory() => AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(messages);
- await SendAsync(messageFactory, cancellationToken).ConfigureAwait(false);
+ await _retryPolicy.RunOperation(async (timeout) =>
+ await SendBatchInternalAsync(
+ messageBatch,
+ timeout,
+ cancellationToken).ConfigureAwait(false),
+ _entityName,
+ _connectionScope,
+ cancellationToken).ConfigureAwait(false);
}
///
- /// Closes the connection to the transport producer instance.
+ /// Sends a set of messages to the associated Queue/Topic using a batched approach.
///
///
+ ///
+ ///
/// An optional instance to signal the request to cancel the operation.
///
- public override async Task CloseAsync(CancellationToken cancellationToken)
+ internal virtual async Task SendBatchInternalAsync(
+ ServiceBusMessageBatch messageBatch,
+ TimeSpan timeout,
+ CancellationToken cancellationToken)
{
- if (_closed)
+ var stopWatch = Stopwatch.StartNew();
+
+ AmqpMessage messageFactory() => AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(messageBatch.AsEnumerable());
+
+ using (AmqpMessage batchMessage = messageFactory())
{
- return;
- }
+ //ServiceBusEventSource.Log.SendStart(Entityname, messageHash);
- _closed = true;
+ string messageHash = batchMessage.GetHashCode().ToString();
- var clientId = GetHashCode().ToString();
- var clientType = GetType();
+ SendingAmqpLink link = await _sendLink.GetOrCreateAsync(UseMinimum(_connectionScope.SessionTimeout, timeout)).ConfigureAwait(false);
- try
- {
- ServiceBusEventSource.Log.ClientCloseStart(clientType, _entityName, clientId);
+ // Validate that the message is not too large to send. This is done after the link is created to ensure
+ // that the maximum message size is known, as it is dictated by the service using the link.
+
+ if (batchMessage.SerializedMessageSize > MaximumMessageSize)
+ {
+ throw new ServiceBusException(string.Format(Resources1.MessageSizeExceeded, messageHash, batchMessage.SerializedMessageSize, MaximumMessageSize, _entityName), ServiceBusException.FailureReason.MessageSizeExceeded);
+ }
+
+ // Attempt to send the message batch.
+
+ var deliveryTag = new ArraySegment(BitConverter.GetBytes(Interlocked.Increment(ref _deliveryCount)));
+ var outcome = await link.SendMessageAsync(batchMessage, deliveryTag, AmqpConstants.NullBinary, timeout.CalculateRemaining(stopWatch.Elapsed)).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested();
- if (_sendLink?.TryGetOpenedObject(out var _) == true)
+ if (outcome.DescriptorCode != Accepted.Code)
{
- cancellationToken.ThrowIfCancellationRequested();
- await _sendLink.CloseAsync().ConfigureAwait(false);
+ throw AmqpError.CreateExceptionForError((outcome as Rejected)?.Error, _entityName);
}
- _sendLink?.Dispose();
- }
- catch (Exception ex)
- {
- _closed = false;
- ServiceBusEventSource.Log.ClientCloseError(clientType, _entityName, clientId, ex.Message);
+ //ServiceBusEventSource.Log.SendStop(Entityname, messageHash);
- throw;
- }
- finally
- {
- ServiceBusEventSource.Log.ClientCloseComplete(clientType, _entityName, clientId);
+ cancellationToken.ThrowIfCancellationRequested();
+ stopWatch.Stop();
}
}
///
- /// Sends an AMQP message that contains a batch of events to the associated Service Bus entity. If the size of events exceed the
- /// maximum size of a single batch, an exception will be triggered and the send will fail.
+ /// Sends a message to the associated Service Bus entity.
///
///
- /// A factory which can be used to produce an AMQP message containing the batch of events to be sent.
+ /// A message to send.
/// An optional instance to signal the request to cancel the operation.
///
- protected virtual async Task SendAsync(Func messageFactory,
- CancellationToken cancellationToken)
+ public override async Task SendAsync(
+ ServiceBusMessage message,
+ CancellationToken cancellationToken)
{
- var failedAttemptCount = 0;
- var stopWatch = Stopwatch.StartNew();
+ Argument.AssertNotNull(message, nameof(message));
+ Argument.AssertNotClosed(_closed, nameof(AmqpSender));
- SendingAmqpLink link;
+ await _retryPolicy.RunOperation(async (timeout) =>
+ await SendInternalAsync(
+ message,
+ timeout,
+ cancellationToken).ConfigureAwait(false),
+ _entityName,
+ _connectionScope,
+ cancellationToken).ConfigureAwait(false);
+ }
- try
+ ///
+ /// Sends a message to the associated Service Bus entity.
+ ///
+ ///
+ ///
+ ///
+ /// An optional instance to signal the request to cancel the operation.
+ ///
+ internal virtual async Task SendInternalAsync(
+ ServiceBusMessage message,
+ TimeSpan timeout,
+ CancellationToken cancellationToken)
+ {
+ var stopWatch = Stopwatch.StartNew();
+ using (AmqpMessage amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(message))
{
- var tryTimeout = _retryPolicy.CalculateTryTimeout(0);
- while (!cancellationToken.IsCancellationRequested)
+ //ServiceBusEventSource.Log.SendStart(Entityname, messageHash);
+
+ string messageHash = amqpMessage.GetHashCode().ToString();
+
+ SendingAmqpLink link = await _sendLink.GetOrCreateAsync(UseMinimum(_connectionScope.SessionTimeout, timeout)).ConfigureAwait(false);
+
+ // Validate that the message is not too large to send. This is done after the link is created to ensure
+ // that the maximum message size is known, as it is dictated by the service using the link.
+
+ if (amqpMessage.SerializedMessageSize > MaximumMessageSize)
{
- try
- {
- using AmqpMessage batchMessage = messageFactory();
- string messageHash = batchMessage.GetHashCode().ToString();
+ throw new ServiceBusException(string.Format(Resources1.MessageSizeExceeded, messageHash, amqpMessage.SerializedMessageSize, MaximumMessageSize), ServiceBusException.FailureReason.MessageSizeExceeded, _entityName);
+ }
- //ServiceBusEventSource.Log.EventPublishStart(EventHubName, logPartition, messageHash);
+ // Attempt to send the message batch.
- link = await _sendLink.GetOrCreateAsync(UseMinimum(_connectionScope.SessionTimeout, tryTimeout)).ConfigureAwait(false);
- cancellationToken.ThrowIfCancellationRequested();
+ var deliveryTag = new ArraySegment(BitConverter.GetBytes(Interlocked.Increment(ref _deliveryCount)));
+ var outcome = await link.SendMessageAsync(amqpMessage, deliveryTag, AmqpConstants.NullBinary, timeout.CalculateRemaining(stopWatch.Elapsed)).ConfigureAwait(false);
+ cancellationToken.ThrowIfCancellationRequested();
- // Validate that the batch of messages is not too large to send. This is done after the link is created to ensure
- // that the maximum message size is known, as it is dictated by the service using the link.
+ if (outcome.DescriptorCode != Accepted.Code)
+ {
+ throw AmqpError.CreateExceptionForError((outcome as Rejected)?.Error, _entityName);
+ }
- if (batchMessage.SerializedMessageSize > MaximumMessageSize)
- {
- throw new ServiceBusException(
- string.Format(Resources1.MessageSizeExceeded, messageHash, batchMessage.SerializedMessageSize, MaximumMessageSize),
- ServiceBusException.FailureReason.MessageSizeExceeded,
- _entityName);
- }
+ //ServiceBusEventSource.Log.SendStop(Entityname, messageHash);
- // Attempt to send the message batch.
+ cancellationToken.ThrowIfCancellationRequested();
+ stopWatch.Stop();
+ }
+ }
- var deliveryTag = new ArraySegment(BitConverter.GetBytes(Interlocked.Increment(ref _deliveryCount)));
- var outcome = await link.SendMessageAsync(batchMessage, deliveryTag, AmqpConstants.NullBinary, tryTimeout.CalculateRemaining(stopWatch.Elapsed)).ConfigureAwait(false);
- cancellationToken.ThrowIfCancellationRequested();
+ ///
+ /// Closes the connection to the transport sender instance.
+ ///
+ ///
+ /// An optional instance to signal the request to cancel the operation.
+ ///
+ public override async Task CloseAsync(CancellationToken cancellationToken)
+ {
+ if (_closed)
+ {
+ return;
+ }
- if (outcome.DescriptorCode != Accepted.Code)
- {
- throw AmqpError.CreateExceptionForError((outcome as Rejected)?.Error, _entityName);
- }
+ _closed = true;
- // The send operation should be considered successful; return to
- // exit the retry loop.
+ var clientId = GetHashCode().ToString();
+ var clientType = GetType();
- return;
- }
- catch (Exception ex)
- {
- Exception activeEx = ex.TranslateServiceException(_entityName);
-
- // Determine if there should be a retry for the next attempt; if so enforce the delay but do not quit the loop.
- // Otherwise, bubble the exception.
-
- ++failedAttemptCount;
- TimeSpan? retryDelay = _retryPolicy.CalculateRetryDelay(activeEx, failedAttemptCount);
-
- if (retryDelay.HasValue && !_connectionScope.IsDisposed && !cancellationToken.IsCancellationRequested)
- {
- //ServiceBusEventSource.Log.EventPublishError(EventHubName, messageHash, activeEx.Message);
- await Task.Delay(retryDelay.Value, cancellationToken).ConfigureAwait(false);
-
- tryTimeout = _retryPolicy.CalculateTryTimeout(failedAttemptCount);
- stopWatch.Reset();
- }
- else if (ex is AmqpException)
- {
- throw activeEx;
- }
- else
- {
- throw;
- }
- }
- }
+ try
+ {
+ ServiceBusEventSource.Log.ClientCloseStart(clientType, _entityName, clientId);
+ cancellationToken.ThrowIfCancellationRequested();
- // If no value has been returned nor exception thrown by this point,
- // then cancellation has been requested.
+ if (_sendLink?.TryGetOpenedObject(out var _) == true)
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+ await _sendLink.CloseAsync().ConfigureAwait(false);
+ }
- throw new TaskCanceledException();
+ _sendLink?.Dispose();
}
- catch (Exception)
+ catch (Exception ex)
{
- //ServiceBusEventSource.Log.EventPublishError(EventHubName, logPartition, messageHash, ex.Message);
+ _closed = false;
+ ServiceBusEventSource.Log.ClientCloseError(clientType, _entityName, clientId, ex.Message);
+
throw;
}
finally
{
- stopWatch.Stop();
- //ServiceBusEventSource.Log.EventPublishComplete(EventHubName, logPartition, messageHash);
+ ServiceBusEventSource.Log.ClientCloseComplete(clientType, _entityName, clientId);
}
}
@@ -293,18 +375,16 @@ protected virtual async Task SendAsync(Func messageFactory,
///
///
///
- ///
///
///
public override async Task ScheduleMessageAsync(
ServiceBusMessage message,
- ServiceBusRetryPolicy retryPolicy,
CancellationToken cancellationToken = default)
{
long sequenceNumber = 0;
- Task scheduleTask = retryPolicy.RunOperation(async (timeout) =>
+ Task scheduleTask = _retryPolicy.RunOperation(async (timeout) =>
{
- sequenceNumber = await ScheduleMessageInternal(
+ sequenceNumber = await ScheduleMessageInternalAsync(
message,
timeout,
cancellationToken).ConfigureAwait(false);
@@ -323,7 +403,7 @@ public override async Task ScheduleMessageAsync(
///
///
///
- internal async Task ScheduleMessageInternal(
+ internal async Task ScheduleMessageInternalAsync(
ServiceBusMessage message,
TimeSpan timeout,
CancellationToken cancellationToken = default)
@@ -408,19 +488,16 @@ internal async Task ScheduleMessageInternal(
///
///
///
- ///
///
///
public override async Task CancelScheduledMessageAsync(
long sequenceNumber,
- ServiceBusRetryPolicy retryPolicy,
CancellationToken cancellationToken = default)
{
- Task cancelMessageTask = retryPolicy.RunOperation(async (timeout) =>
+ Task cancelMessageTask = _retryPolicy.RunOperation(async (timeout) =>
{
- await CancelScheduledMessageInternal(
+ await CancelScheduledMessageInternalAsync(
sequenceNumber,
- retryPolicy,
timeout,
cancellationToken).ConfigureAwait(false);
},
@@ -434,13 +511,11 @@ await CancelScheduledMessageInternal(
///
///
///
- ///
///
///
///
- internal async Task CancelScheduledMessageInternal(
+ internal async Task CancelScheduledMessageInternalAsync(
long sequenceNumber,
- ServiceBusRetryPolicy retryPolicy,
TimeSpan timeout,
CancellationToken cancellationToken = default)
{
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportMessageBatch.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportMessageBatch.cs
new file mode 100644
index 000000000000..a451bbbe3d22
--- /dev/null
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportMessageBatch.cs
@@ -0,0 +1,65 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+using System;
+using System.Collections.Generic;
+
+namespace Azure.Messaging.ServiceBus.Core
+{
+ ///
+ /// Provides an abstraction for generalizing a batch of messages so that a dedicated instance may provide operations
+ /// for a specific transport, such as AMQP or JMS. It is intended that the public employ
+ /// a transport batch via containment and delegate operations to it rather than understanding protocol-specific details
+ /// for different transports.
+ ///
+ ///
+ internal abstract class TransportMessageBatch : IDisposable
+ {
+ ///
+ /// The maximum size allowed for the batch, in bytes. This includes the messages in the batch as
+ /// well as any overhead for the batch itself when sent to the Queue/Topic.
+ ///
+ ///
+ public abstract long MaximumSizeInBytes { get; }
+
+ ///
+ /// The size of the batch, in bytes, as it will be sent to the Queue/Topic
+ ///
+ ///
+ public abstract long SizeInBytes { get; }
+
+ ///
+ /// The count of messages contained in the batch.
+ ///
+ ///
+ public abstract int Count { get; }
+
+ ///
+ /// Attempts to add a message to the batch, ensuring that the size
+ /// of the batch does not exceed its maximum.
+ ///
+ ///
+ /// The message to attempt to add to the batch.
+ ///
+ /// true if the message was added; otherwise, false.
+ ///
+ public abstract bool TryAdd(ServiceBusMessage message);
+
+ ///
+ /// Represents the batch as an enumerable set of transport-specific
+ /// representations of a message.
+ ///
+ ///
+ /// The transport-specific message representation being requested.
+ ///
+ /// The set of messages as an enumerable of the requested type.
+ ///
+ public abstract IEnumerable AsEnumerable();
+
+ ///
+ /// Performs the task needed to clean up resources used by the .
+ ///
+ ///
+ public abstract void Dispose();
+ }
+}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportSender.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportSender.cs
index 3cff6ab6db0a..acac87540f5f 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportSender.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportSender.cs
@@ -28,39 +28,66 @@ internal abstract class TransportSender
public virtual bool IsClosed { get; }
///
- /// Sends a set of events to the associated Service Bus entity using a batched approach. If the size of events exceed the
- /// maximum size of a single batch, an exception will be triggered and the send will fail.
+ /// Creates a size-constraint batch to which may be added using a try-based pattern. If a message would
+ /// exceed the maximum allowable size of the batch, the batch will not allow adding the message and signal that scenario using its
+ /// return value.
+ ///
+ /// Because messages that would violate the size constraint cannot be added, publishing a batch will not trigger an exception when
+ /// attempting to send the message to the Queue/Topic.
+ ///
+ ///
+ /// The set of options to consider when creating this batch.
+ /// An optional instance to signal the request to cancel the operation.
+ ///
+ /// An with the requested .
+ ///
+ ///
+ ///
+ public abstract ValueTask CreateBatchAsync(
+ CreateBatchOptions options,
+ CancellationToken cancellationToken);
+ ///
+ /// Sends a message to the associated Service Bus entity.
///
///
- /// The set of event data to send.
+ /// A message to send.
/// An optional instance to signal the request to cancel the operation.
///
public abstract Task SendAsync(
- IEnumerable messages,
+ ServiceBusMessage message,
+ CancellationToken cancellationToken);
+
+ ///
+ /// Sends a set of messages to the associated Queue/Topic using a batched approach.
+ ///
+ ///
+ /// The set of messages to send.
+ /// An optional instance to signal the request to cancel the operation.
+ ///
+ /// A task to be resolved on when the operation has completed.
+ ///
+ public abstract Task SendBatchAsync(
+ ServiceBusMessageBatch messageBatch,
CancellationToken cancellationToken);
///
///
///
///
- ///
///
///
public abstract Task ScheduleMessageAsync(
ServiceBusMessage message,
- ServiceBusRetryPolicy retryPolicy,
CancellationToken cancellationToken = default);
///
///
///
///
- ///
///
///
public abstract Task CancelScheduledMessageAsync(
long sequenceNumber,
- ServiceBusRetryPolicy retryPolicy,
CancellationToken cancellationToken = default);
///
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/CreateBatchOptions.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/CreateBatchOptions.cs
new file mode 100644
index 000000000000..1350e1c58bef
--- /dev/null
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/CreateBatchOptions.cs
@@ -0,0 +1,85 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+using System.ComponentModel;
+using Azure.Core;
+
+namespace Azure.Messaging.ServiceBus
+{
+
+ ///
+ /// The set of options that can be specified to influence the way in which an service bus message batch
+ /// behaves and is sent to the Queue/Topic.
+ ///
+ ///
+ public class CreateBatchOptions
+ {
+ /// The requested maximum size to allow for the batch, in bytes.
+ private long? _maximumSizeInBytes = null;
+
+ ///
+ /// The maximum size to allow for a single batch of messages, in bytes.
+ ///
+ ///
+ ///
+ /// The desired limit, in bytes, for the size of the associated service bus message batch. If null,
+ /// the maximum size allowed by the active transport will be used.
+ ///
+ ///
+ public long? MaximumSizeInBytes
+ {
+ get => _maximumSizeInBytes;
+
+ set
+ {
+ if (value.HasValue)
+ {
+ Argument.AssertAtLeast(value.Value, ServiceBusSender.MinimumBatchSizeLimit, nameof(MaximumSizeInBytes));
+ }
+
+ _maximumSizeInBytes = value;
+ }
+ }
+
+ ///
+ /// Determines whether the specified is equal to this instance.
+ ///
+ ///
+ /// The to compare with this instance.
+ ///
+ /// true if the specified is equal to this instance; otherwise, false.
+ ///
+ [EditorBrowsable(EditorBrowsableState.Never)]
+ public override bool Equals(object obj) => base.Equals(obj);
+
+ ///
+ /// Returns a hash code for this instance.
+ ///
+ ///
+ /// A hash code for this instance, suitable for use in hashing algorithms and data structures like a hash table.
+ ///
+ [EditorBrowsable(EditorBrowsableState.Never)]
+ public override int GetHashCode() => base.GetHashCode();
+
+ ///
+ /// Converts the instance to string representation.
+ ///
+ ///
+ /// A that represents this instance.
+ ///
+ [EditorBrowsable(EditorBrowsableState.Never)]
+ public override string ToString() => base.ToString();
+
+ ///
+ /// Creates a new copy of the current , cloning its attributes into a new instance.
+ ///
+ ///
+ /// A new copy of .
+ ///
+ internal CreateBatchOptions Clone() =>
+ new CreateBatchOptions
+ {
+ _maximumSizeInBytes = MaximumSizeInBytes
+ };
+ }
+}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusMessageBatch.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusMessageBatch.cs
index db836a35e4d3..32959b5fb5a8 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusMessageBatch.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusMessageBatch.cs
@@ -2,124 +2,95 @@
// Licensed under the MIT License.
using System;
-using System.Collections;
using System.Collections.Generic;
using Azure.Core;
+using Azure.Messaging.ServiceBus.Core;
namespace Azure.Messaging.ServiceBus
{
///
/// A set of with size constraints known up-front,
- /// intended to be sent to the Event Hubs service as a single batch.
+ /// intended to be sent to the Queue/Topic as a single batch.
///
///
- public sealed class ServiceBusMessageBatch : IDisposable, IEnumerable
+ public sealed class ServiceBusMessageBatch : IDisposable
{
///
- /// The maximum size allowed for the batch, in bytes. This includes the events in the batch as
- /// well as any overhead for the batch itself when sent to the Event Hubs service.
+ /// The maximum size allowed for the batch, in bytes. This includes the messages in the batch as
+ /// well as any overhead for the batch itself when sent to the Queue/Topic.
///
///
- public long MaximumSizeInBytes { get; }
+ public long MaximumSizeInBytes => InnerBatch.MaximumSizeInBytes;
///
- /// The size of the batch, in bytes, as it will be sent to the Event Hubs
- /// service.
+ /// The size of the batch, in bytes, as it will be sent to the Queue/Topic.
///
///
- public long SizeInBytes { get; }
+ public long SizeInBytes => InnerBatch.SizeInBytes;
///
- /// The count of events contained in the batch.
+ /// The count of messages contained in the batch.
///
///
- public int Count { get; }
+ public int Count => InnerBatch.Count;
- /////
- ///// The set of options that should be used when publishing the batch.
- /////
- /////
- //internal SendEventOptions SendOptions { get; }
-
- /////
- ///// The transport-specific batch responsible for performing the batch operations
- ///// in a manner compatible with the associated .
- /////
- /////
- //private TransportEventBatch InnerBatch { get; }
+ ///
+ /// The transport-specific batch responsible for performing the batch operations
+ /// in a manner compatible with the associated .
+ ///
+ ///
+ private TransportMessageBatch InnerBatch { get; }
- /////
- ///// Initializes a new instance of the class.
- /////
- /////
- ///// The transport-specific batch responsible for performing the batch operations.
- ///// The set of options that should be used when publishing the batch.
- /////
- /////
- ///// As an internal type, this class performs only basic sanity checks against its arguments. It
- ///// is assumed that callers are trusted and have performed deep validation.
- /////
- ///// Any parameters passed are assumed to be owned by this instance and safe to mutate or dispose;
- ///// creation of clones or otherwise protecting the parameters is assumed to be the purview of the
- ///// caller.
- /////
- /////
- //internal EventDataBatch(TransportEventBatch transportBatch,
- // SendEventOptions sendOptions)
- //{
- // Argument.AssertNotNull(transportBatch, nameof(transportBatch));
- // Argument.AssertNotNull(sendOptions, nameof(sendOptions));
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ /// The transport-specific batch responsible for performing the batch operations.
+ ///
+ ///
+ /// As an internal type, this class performs only basic sanity checks against its arguments. It
+ /// is assumed that callers are trusted and have performed deep validation.
+ ///
+ /// Any parameters passed are assumed to be owned by this instance and safe to mutate or dispose;
+ /// creation of clones or otherwise protecting the parameters is assumed to be the purview of the
+ /// caller.
+ ///
+ ///
+ internal ServiceBusMessageBatch(TransportMessageBatch transportBatch)
+ {
+ Argument.AssertNotNull(transportBatch, nameof(transportBatch));
- // InnerBatch = transportBatch;
- // SendOptions = sendOptions;
- //}
+ InnerBatch = transportBatch;
+ }
///
- /// Attempts to add an event to the batch, ensuring that the size
+ /// Attempts to add a message to the batch, ensuring that the size
/// of the batch does not exceed its maximum.
///
///
- /// The event to attempt to add to the batch.
+ /// Message to attempt to add to the batch.
///
- /// true if the event was added; otherwise, false.
+ /// true if the message was added; otherwise, false.
///
public bool TryAdd(ServiceBusMessage message)
{
- return true;
- //bool instrumented = EventDataInstrumentation.InstrumentEvent(eventData);
- //bool added = InnerBatch.TryAdd(eventData);
-
- //if (!added && instrumented)
- //{
- // EventDataInstrumentation.ResetEvent(eventData);
- //}
-
- //return added;
+ return InnerBatch.TryAdd(message);
}
///
/// Performs the task needed to clean up resources used by the .
///
///
- public void Dispose() { }
+ public void Dispose() => InnerBatch.Dispose();
///
- ///
+ /// Represents the batch as an enumerable set of specific representations of a message.
///
- ///
- public IEnumerator GetEnumerator()
- {
- throw new NotImplementedException();
- }
-
- /////
- ///// Represents the batch as an enumerable set of specific representations of an event.
- /////
- /////
- ///// The specific event representation being requested.
- /////
- ///// The set of events as an enumerable of the requested type.
- /////
- //internal IEnumerable AsEnumerable() => InnerBatch.AsEnumerable();
+ ///
+ /// The specific message representation being requested.
+ ///
+ /// The set of messages as an enumerable of the requested type.
+ ///
+ internal IEnumerable AsEnumerable() => InnerBatch.AsEnumerable();
}
}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSender.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSender.cs
index 7e35ede9d348..89d98e25c366 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSender.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSender.cs
@@ -23,6 +23,9 @@ namespace Azure.Messaging.ServiceBus
///
public class ServiceBusSender : IAsyncDisposable
{
+ /// The minimum allowable size, in bytes, for a batch to be sent.
+ internal const int MinimumBatchSizeLimit = 24;
+
///
/// The fully qualified Service Bus namespace that the producer is associated with. This is likely
/// to be similar to {yournamespace}.servicebus.windows.net.
@@ -120,54 +123,82 @@ protected ServiceBusSender()
}
///
- /// Sends a set of events to the associated Service Bus entity using a batched approach. If the size of events exceed the
- /// maximum size of a single batch, an exception will be triggered and the send will fail.
+ /// Sends a message to the associated entity of Service Bus.
///
///
- /// The set of event data to send.
+ /// A messsage to send.
/// An optional instance to signal the request to cancel the operation.
///
/// A task to be resolved on when the operation has completed.
///
- ///
- ///
public virtual async Task SendAsync(
ServiceBusMessage message,
CancellationToken cancellationToken = default)
{
Argument.AssertNotNull(message, nameof(message));
- await SendBatchAsync(new ServiceBusMessage[]{message}, cancellationToken).ConfigureAwait(false);
+ await _innerSender.SendAsync(message, cancellationToken).ConfigureAwait(false);
+ }
+
+ ///
+ /// Creates a size-constraint batch to which may be added using a try-based pattern. If a message would
+ /// exceed the maximum allowable size of the batch, the batch will not allow adding the message and signal that scenario using its
+ /// return value.
+ ///
+ /// Because messages that would violate the size constraint cannot be added, publishing a batch will not trigger an exception when
+ /// attempting to send the messages to the Queue/Topic.
+ ///
+ ///
+ /// An optional instance to signal the request to cancel the operation.
+ ///
+ /// An with the default batch options.
+ ///
+ ///
+ ///
+ public virtual ValueTask CreateBatchAsync(CancellationToken cancellationToken = default) => CreateBatchAsync(null, cancellationToken);
+
+ ///
+ /// Creates a size-constraint batch to which may be added using a try-based pattern. If a message would
+ /// exceed the maximum allowable size of the batch, the batch will not allow adding the message and signal that scenario using its
+ /// return value.
+ ///
+ /// Because messages that would violate the size constraint cannot be added, publishing a batch will not trigger an exception when
+ /// attempting to send the messages to the Queue/Topic.
+ ///
+ ///
+ /// The set of options to consider when creating this batch.
+ /// An optional instance to signal the request to cancel the operation.
+ ///
+ /// An with the requested .
+ ///
+ ///
+ ///
+ public virtual async ValueTask CreateBatchAsync(
+ CreateBatchOptions options,
+ CancellationToken cancellationToken = default)
+ {
+ options = options?.Clone() ?? new CreateBatchOptions();
+
+ TransportMessageBatch transportBatch = await _innerSender.CreateBatchAsync(options, cancellationToken).ConfigureAwait(false);
+ return new ServiceBusMessageBatch(transportBatch);
}
///
- /// Sends a set of events to the associated Service Bus entity using a batched approach. If the size of events exceed the
+ /// Sends a set of messages to the associated Service Bus entity using a batched approach. If the size of messages exceed the
/// maximum size of a single batch, an exception will be triggered and the send will fail.
///
///
- /// The set of event data to send.
+ /// The set of messages to send. A batch may be created using .
/// An optional instance to signal the request to cancel the operation.
///
/// A task to be resolved on when the operation has completed.
///
- ///
- ///
- public virtual async Task SendBatchAsync(IEnumerable messages, CancellationToken cancellationToken = default) =>
- await SendRangeInternal(messages, cancellationToken).ConfigureAwait(false);
-
- /////
- ///// Sends a set of events to the associated Service Bus entity using a batched approach. If the size of events exceed the
- ///// maximum size of a single batch, an exception will be triggered and the send will fail.
- /////
- /////
- ///// The set of event data to send.
- ///// An optional instance to signal the request to cancel the operation.
- /////
- ///// A task to be resolved on when the operation has completed.
- /////
- /////
- /////
- //public virtual async Task SendBatchAsync(ServiceBusMessageBatch messages, CancellationToken cancellationToken = default) =>
- // await SendRangeInternal(new ServiceBusMessage[] { }, cancellationToken).ConfigureAwait(false);
+ public virtual async Task SendBatchAsync(
+ ServiceBusMessageBatch messageBatch,
+ CancellationToken cancellationToken = default)
+ {
+ Argument.AssertNotNull(messageBatch, nameof(messageBatch));
+ await _innerSender.SendBatchAsync(messageBatch, cancellationToken).ConfigureAwait(false);
+ }
///
/// Schedules a message to appear on Service Bus at a later time.
@@ -184,7 +215,7 @@ public virtual async Task ScheduleMessageAsync(
//this.ThrowIfClosed();
Argument.AssertNotNull(message, nameof(message));
message.ScheduledEnqueueTimeUtc = scheduleEnqueueTimeUtc.UtcDateTime;
- return await _innerSender.ScheduleMessageAsync(message, _retryPolicy, cancellationToken).ConfigureAwait(false);
+ return await _innerSender.ScheduleMessageAsync(message, cancellationToken).ConfigureAwait(false);
}
///
@@ -195,40 +226,7 @@ public virtual async Task ScheduleMessageAsync(
public virtual async Task CancelScheduledMessageAsync(long sequenceNumber, CancellationToken cancellationToken = default)
{
//this.ThrowIfClosed();
- await _innerSender.CancelScheduledMessageAsync(sequenceNumber, _retryPolicy, cancellationToken).ConfigureAwait(false);
- }
-
- ///
- /// Sends a set of events to the associated Service Bus entity using a batched approach. If the size of events exceed the
- /// maximum size of a single batch, an exception will be triggered and the send will fail.
- ///
- ///
- /// The set of event data to send.
- /// An optional instance to signal the request to cancel the operation.
- ///
- /// A task to be resolved on when the operation has completed.
- ///
-
- ///
- internal async Task SendRangeInternal(
- IEnumerable messages,
- CancellationToken cancellationToken)
- {
- Argument.AssertNotNull(messages, nameof(messages));
-
- using DiagnosticScope scope = CreateDiagnosticScope();
- messages = messages.ToList();
- InstrumentMessages(messages);
-
- try
- {
- await _innerSender.SendAsync(messages, cancellationToken).ConfigureAwait(false);
- }
- catch (Exception ex)
- {
- scope.Failed(ex);
- throw;
- }
+ await _innerSender.CancelScheduledMessageAsync(sequenceNumber, cancellationToken).ConfigureAwait(false);
}
///
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ServiceBusLiveTestBase.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/ServiceBusLiveTestBase.cs
similarity index 100%
rename from sdk/servicebus/Azure.Messaging.ServiceBus/tests/ServiceBusLiveTestBase.cs
rename to sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/ServiceBusLiveTestBase.cs
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ServiceBusTestBase.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/ServiceBusTestBase.cs
old mode 100755
new mode 100644
similarity index 83%
rename from sdk/servicebus/Azure.Messaging.ServiceBus/tests/ServiceBusTestBase.cs
rename to sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/ServiceBusTestBase.cs
index 37eeb417a10b..47192a362edb
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ServiceBusTestBase.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/ServiceBusTestBase.cs
@@ -21,6 +21,16 @@ protected IEnumerable GetMessages(int count, string sessionId
return messages;
}
+ protected ServiceBusMessageBatch AddMessages(ServiceBusMessageBatch batch, int count, string sessionId = null, string partitionKey = null)
+ {
+ for (int i = 0; i < count; i++)
+ {
+ Assert.That(() => batch.TryAdd(GetMessage(sessionId, partitionKey)), Is.True, "A message was rejected by the batch; all messages should be accepted.");
+ }
+
+ return batch;
+ }
+
protected Task ExceptionHandler(ProcessErrorEventArgs eventArgs)
{
Assert.Fail(eventArgs.Exception.ToString());
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs
similarity index 91%
rename from sdk/servicebus/Azure.Messaging.ServiceBus/tests/ProcessorLiveTests.cs
rename to sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs
index 3139dbc39409..f67840fd090b 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ProcessorLiveTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs
@@ -1,11 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
-using System;
-using System.Collections.Generic;
using System.Linq;
-using System.Linq.Expressions;
-using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
@@ -30,7 +26,11 @@ public async Task Receive_Event(int numThreads)
// use double the number of threads so we can make sure we test that we don't
// retrieve more messages than expected when there are more messages available
- await sender.SendBatchAsync(GetMessages(numThreads * 2));
+ using ServiceBusMessageBatch batch = await sender.CreateBatchAsync();
+ ServiceBusMessageBatch messageBatch = AddMessages(batch, numThreads * 2);
+
+ await sender.SendBatchAsync(messageBatch);
+
await using var processor = client.GetProcessor(scope.QueueName);
int messageCt = 0;
@@ -83,7 +83,10 @@ public async Task Receive_StopProcessing(int numThreads)
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
ServiceBusSender sender = client.GetSender(scope.QueueName);
int numMessages = 50;
- await sender.SendBatchAsync(GetMessages(numMessages));
+ using ServiceBusMessageBatch batch = await sender.CreateBatchAsync();
+ ServiceBusMessageBatch messageBatch = AddMessages(batch, numMessages);
+
+ await sender.SendBatchAsync(messageBatch);
await using var processor = client.GetProcessor(scope.QueueName);
int messageProcessedCt = 0;
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs
similarity index 81%
rename from sdk/servicebus/Azure.Messaging.ServiceBus/tests/ReceiverLiveTests.cs
rename to sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs
index 3c1a5502e895..bd1b5089be92 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ReceiverLiveTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs
@@ -3,10 +3,8 @@
using System;
using System.Collections.Generic;
-using System.Diagnostics;
using System.Linq;
using System.Text;
-using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
@@ -20,11 +18,13 @@ public async Task Peek()
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
- ServiceBusSender sender = client.GetSender(scope.QueueName);
var messageCt = 10;
- IEnumerable sentMessages = GetMessages(messageCt);
- await sender.SendBatchAsync(sentMessages);
+ ServiceBusSender sender = client.GetSender(scope.QueueName);
+ using ServiceBusMessageBatch batch = await sender.CreateBatchAsync();
+ IEnumerable sentMessages = AddMessages(batch, messageCt).AsEnumerable();
+
+ await sender.SendBatchAsync(batch);
await using var receiver = client.GetReceiver(scope.QueueName);
@@ -51,10 +51,13 @@ public async Task ReceiveMessagesInPeekLockMode()
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
- ServiceBusSender sender = client.GetSender(scope.QueueName);
var messageCount = 10;
- IEnumerable messages = GetMessages(messageCount);
- await sender.SendBatchAsync(messages);
+
+ ServiceBusSender sender = client.GetSender(scope.QueueName);
+ using ServiceBusMessageBatch batch = await sender.CreateBatchAsync();
+ IEnumerable messages = AddMessages(batch, messageCount).AsEnumerable();
+
+ await sender.SendBatchAsync(batch);
var receiver = client.GetReceiver(scope.QueueName);
var receivedMessageCount = 0;
@@ -84,10 +87,13 @@ public async Task CompleteMessages()
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
- ServiceBusSender sender = client.GetSender(scope.QueueName);
var messageCount = 10;
- IEnumerable messages = GetMessages(messageCount);
- await sender.SendBatchAsync(messages);
+
+ ServiceBusSender sender = client.GetSender(scope.QueueName);
+ using ServiceBusMessageBatch batch = await sender.CreateBatchAsync();
+ IEnumerable messages = AddMessages(batch, messageCount).AsEnumerable();
+
+ await sender.SendBatchAsync(batch);
var receiver = client.GetReceiver(scope.QueueName);
var receivedMessageCount = 0;
@@ -102,8 +108,8 @@ public async Task CompleteMessages()
}
Assert.AreEqual(messageCount, receivedMessageCount);
- var message = receiver.PeekAsync();
- Assert.IsNull(message.Result);
+ var peekedMessage = receiver.PeekAsync();
+ Assert.IsNull(peekedMessage.Result);
}
}
@@ -113,10 +119,13 @@ public async Task AbandonMessages()
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
- ServiceBusSender sender = client.GetSender(scope.QueueName);
var messageCount = 10;
- IEnumerable messages = GetMessages(messageCount);
- await sender.SendBatchAsync(messages);
+
+ ServiceBusSender sender = client.GetSender(scope.QueueName);
+ using ServiceBusMessageBatch batch = await sender.CreateBatchAsync();
+ IEnumerable messages = AddMessages(batch, messageCount).AsEnumerable();
+
+ await sender.SendBatchAsync(batch);
var receiver = client.GetReceiver(scope.QueueName);
var receivedMessageCount = 0;
@@ -150,10 +159,13 @@ public async Task DeadLetterMessages()
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
- ServiceBusSender sender = client.GetSender(scope.QueueName);
var messageCount = 10;
- IEnumerable messages = GetMessages(messageCount);
- await sender.SendBatchAsync(messages);
+
+ ServiceBusSender sender = client.GetSender(scope.QueueName);
+ using ServiceBusMessageBatch batch = await sender.CreateBatchAsync();
+ IEnumerable messages = AddMessages(batch, messageCount).AsEnumerable();
+
+ await sender.SendBatchAsync(batch);
var receiver = client.GetReceiver(scope.QueueName);
var receivedMessageCount = 0;
@@ -168,8 +180,8 @@ public async Task DeadLetterMessages()
}
Assert.AreEqual(messageCount, receivedMessageCount);
- var message = receiver.PeekAsync();
- Assert.IsNull(message.Result);
+ var peekedMessage = receiver.PeekAsync();
+ Assert.IsNull(peekedMessage.Result);
messageEnum.Reset();
receivedMessageCount = 0;
@@ -196,10 +208,13 @@ public async Task DeferMessages()
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
- ServiceBusSender sender = client.GetSender(scope.QueueName);
var messageCount = 10;
- IEnumerable messages = GetMessages(messageCount);
- await sender.SendBatchAsync(messages);
+
+ ServiceBusSender sender = client.GetSender(scope.QueueName);
+ using ServiceBusMessageBatch batch = await sender.CreateBatchAsync();
+ IEnumerable messages = AddMessages(batch, messageCount).AsEnumerable();
+
+ await sender.SendBatchAsync(batch);
var receiver = client.GetReceiver(scope.QueueName);
var receivedMessageCount = 0;
@@ -233,10 +248,13 @@ public async Task ReceiveMessagesInReceiveAndDeleteMode()
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
- ServiceBusSender sender = client.GetSender(scope.QueueName);
var messageCount = 10;
- IEnumerable messages = GetMessages(messageCount);
- await sender.SendBatchAsync(messages);
+
+ ServiceBusSender sender = client.GetSender(scope.QueueName);
+ using ServiceBusMessageBatch batch = await sender.CreateBatchAsync();
+ IEnumerable messages = AddMessages(batch, messageCount).AsEnumerable();
+
+ await sender.SendBatchAsync(batch);
var clientOptions = new ServiceBusReceiverOptions()
{
@@ -254,8 +272,8 @@ public async Task ReceiveMessagesInReceiveAndDeleteMode()
}
Assert.AreEqual(messageCount, receivedMessageCount);
- var message = receiver.PeekAsync();
- Assert.IsNull(message.Result);
+ var peekedMessage = receiver.PeekAsync();
+ Assert.IsNull(peekedMessage.Result);
}
}
@@ -290,30 +308,27 @@ public async Task RenewMessageLock()
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
ServiceBusSender sender = client.GetSender(scope.QueueName);
var messageCount = 1;
- IEnumerable messages = GetMessages(messageCount);
- await sender.SendBatchAsync(messages);
+ ServiceBusMessage message = GetMessage();
+ await sender.SendAsync(message);
var receiver = client.GetReceiver(scope.QueueName);
ServiceBusReceivedMessage[] receivedMessages = (await receiver.ReceiveBatchAsync(messageCount)).ToArray();
- var message = receivedMessages.First();
- var firstLockedUntilUtcTime = message.LockedUntilUtc;
+ var receivedMessage = receivedMessages.First();
+ var firstLockedUntilUtcTime = receivedMessage.LockedUntilUtc;
// Sleeping for 10 seconds...
await Task.Delay(10000);
- await receiver.RenewLockAsync(message);
+ await receiver.RenewLockAsync(receivedMessage);
- Assert.True(message.LockedUntilUtc >= firstLockedUntilUtcTime + TimeSpan.FromSeconds(10));
+ Assert.True(receivedMessage.LockedUntilUtc >= firstLockedUntilUtcTime + TimeSpan.FromSeconds(10));
// Complete Messages
- await receiver.CompleteAsync(message);
-
- var messageEnum = messages.GetEnumerator();
- messageEnum.MoveNext();
+ await receiver.CompleteAsync(receivedMessage);
Assert.AreEqual(messageCount, receivedMessages.Length);
- Assert.AreEqual(messageEnum.Current.MessageId, message.MessageId);
+ Assert.AreEqual(message.MessageId, receivedMessage.MessageId);
var peekedMessage = receiver.PeekAsync();
Assert.IsNull(peekedMessage.Result);
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SessionReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs
similarity index 89%
rename from sdk/servicebus/Azure.Messaging.ServiceBus/tests/SessionReceiverLiveTests.cs
rename to sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs
index df05e1b10ee6..23205245f740 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SessionReceiverLiveTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs
@@ -5,11 +5,9 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
-using System.Runtime.InteropServices.ComTypes;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
-using Azure.Core;
using NUnit.Framework;
using NUnit.Framework.Internal;
@@ -33,8 +31,10 @@ public async Task Peek_Session(long? sequenceNumber, string partitionKey)
var sessionId = Guid.NewGuid().ToString();
// send the messages
- IEnumerable sentMessages = GetMessages(messageCt, sessionId, partitionKey);
- await sender.SendBatchAsync(sentMessages);
+ using ServiceBusMessageBatch batch = await sender.CreateBatchAsync();
+ IEnumerable sentMessages = AddMessages(batch, messageCt, sessionId, partitionKey).AsEnumerable();
+
+ await sender.SendBatchAsync(batch);
Dictionary sentMessageIdToMsg = new Dictionary();
foreach (ServiceBusMessage message in sentMessages)
{
@@ -81,8 +81,10 @@ public async Task Lock_Same_Session_Should_Throw()
int messageCt = 10;
var sessionId = Guid.NewGuid().ToString();
// send the messages
- IEnumerable sentMessages = GetMessages(messageCt, sessionId);
- await sender.SendBatchAsync(sentMessages);
+ using ServiceBusMessageBatch batch = await sender.CreateBatchAsync();
+ ServiceBusMessageBatch messageBatch = AddMessages(batch, messageCt, sessionId);
+
+ await sender.SendBatchAsync(messageBatch);
var options = new ServiceBusReceiverOptions
{
RetryOptions = new ServiceBusRetryOptions
@@ -119,8 +121,10 @@ public async Task PeekRange_IncrementsSequenceNumber(int messageCt, int peekCt)
var sessionId = Guid.NewGuid().ToString();
// send the messages
- IEnumerable sentMessages = GetMessages(messageCt, sessionId);
- await sender.SendBatchAsync(sentMessages);
+ using ServiceBusMessageBatch batch = await sender.CreateBatchAsync();
+ ServiceBusMessageBatch messagebatch = AddMessages(batch, messageCt, sessionId);
+
+ await sender.SendBatchAsync(messagebatch);
ServiceBusReceiver receiver = await client.GetSessionReceiverAsync(scope.QueueName);
long seq = 0;
@@ -151,8 +155,10 @@ public async Task Peek_IncrementsSequenceNmber(int messageCt)
ServiceBusSender sender = client.GetSender(scope.QueueName);
var sessionId = Guid.NewGuid().ToString();
// send the messages
- IEnumerable sentMessages = GetMessages(messageCt, sessionId);
- await sender.SendBatchAsync(sentMessages);
+ using ServiceBusMessageBatch batch = await sender.CreateBatchAsync();
+ ServiceBusMessageBatch messagebatch = AddMessages(batch, messageCt, sessionId);
+
+ await sender.SendBatchAsync(messagebatch);
ServiceBusReceiver receiver = await client.GetSessionReceiverAsync(scope.QueueName);
@@ -180,11 +186,12 @@ public async Task RoundRobinSessions()
var messageCt = 10;
HashSet sessions = new HashSet() { "1", "2", "3" };
-
// send the messages
foreach (string session in sessions)
{
- await sender.SendBatchAsync(GetMessages(messageCt, session));
+ using ServiceBusMessageBatch batch = await sender.CreateBatchAsync();
+ ServiceBusMessageBatch messageBatch = AddMessages(batch, messageCt, session);
+ await sender.SendBatchAsync(messageBatch);
}
// create receiver not scoped to a specific session
@@ -196,7 +203,7 @@ public async Task RoundRobinSessions()
fromSequenceNumber: 1,
maxMessages: 10))
{
- var sessionId = receiver.SessionManager.SessionId;
+ var sessionId = receiver.SessionManager.SessionId;
Assert.AreEqual(sessionId, peekedMessage.SessionId);
}
@@ -216,8 +223,10 @@ public async Task ReceiveMessagesInPeekLockMode()
var messageCount = 10;
var sessionId = "sessionId1";
- IEnumerable messages = GetMessages(messageCount, sessionId);
- await sender.SendBatchAsync(messages);
+ using ServiceBusMessageBatch batch = await sender.CreateBatchAsync();
+ IEnumerable messages = AddMessages(batch, messageCount, sessionId).AsEnumerable();
+
+ await sender.SendBatchAsync(batch);
ServiceBusReceiver receiver = await client.GetSessionReceiverAsync(scope.QueueName);
@@ -254,8 +263,10 @@ public async Task ReceiveMessagesInReceiveAndDeleteMode()
var messageCount = 10;
var sessionId = "sessionId1";
- IEnumerable messages = GetMessages(messageCount, sessionId);
- await sender.SendBatchAsync(messages);
+ using ServiceBusMessageBatch batch = await sender.CreateBatchAsync();
+ IEnumerable messages = AddMessages(batch, messageCount, sessionId).AsEnumerable();
+
+ await sender.SendBatchAsync(batch);
var clientOptions = new ServiceBusReceiverOptions()
{
@@ -279,8 +290,8 @@ public async Task ReceiveMessagesInReceiveAndDeleteMode()
}
Assert.AreEqual(messageCount, receivedMessageCount);
- var message = receiver.PeekAsync();
- Assert.IsNull(message.Result);
+ var peekedMessage = receiver.PeekAsync();
+ Assert.IsNull(peekedMessage.Result);
}
}
@@ -296,8 +307,10 @@ public async Task CompleteMessages(bool useSpecificSession)
var messageCount = 10;
var sessionId = "sessionId1";
- IEnumerable messages = GetMessages(messageCount, sessionId);
- await sender.SendBatchAsync(messages);
+ using ServiceBusMessageBatch batch = await sender.CreateBatchAsync();
+ IEnumerable messages = AddMessages(batch, messageCount, sessionId).AsEnumerable();
+
+ await sender.SendBatchAsync(batch);
ServiceBusReceiver receiver = await client.GetSessionReceiverAsync(
scope.QueueName,
@@ -315,8 +328,8 @@ public async Task CompleteMessages(bool useSpecificSession)
}
Assert.AreEqual(messageCount, receivedMessageCount);
- var message = receiver.PeekAsync();
- Assert.IsNull(message.Result);
+ var peekedMessage = receiver.PeekAsync();
+ Assert.IsNull(peekedMessage.Result);
}
}
@@ -332,8 +345,10 @@ public async Task AbandonMessages(bool useSpecificSession)
var messageCount = 10;
var sessionId = "sessionId1";
- IEnumerable messages = GetMessages(messageCount, sessionId);
- await sender.SendBatchAsync(messages);
+ using ServiceBusMessageBatch batch = await sender.CreateBatchAsync();
+ IEnumerable messages = AddMessages(batch, messageCount, sessionId).AsEnumerable();
+
+ await sender.SendBatchAsync(batch);
ServiceBusReceiver receiver = await client.GetSessionReceiverAsync(
scope.QueueName,
@@ -376,8 +391,10 @@ public async Task DeadLetterMessages(bool useSpecificSession)
ServiceBusSender sender = client.GetSender(scope.QueueName);
var messageCount = 10;
var sessionId = "sessionId1";
- IEnumerable messages = GetMessages(messageCount, sessionId);
- await sender.SendBatchAsync(messages);
+ using ServiceBusMessageBatch batch = await sender.CreateBatchAsync();
+ IEnumerable messages = AddMessages(batch, messageCount, sessionId).AsEnumerable();
+
+ await sender.SendBatchAsync(batch);
var receiver = await client.GetSessionReceiverAsync(
scope.QueueName,
@@ -395,8 +412,8 @@ public async Task DeadLetterMessages(bool useSpecificSession)
}
Assert.AreEqual(messageCount, receivedMessageCount);
- var message = receiver.PeekAsync();
- Assert.IsNull(message.Result);
+ var peekedMessage = receiver.PeekAsync();
+ Assert.IsNull(peekedMessage.Result);
// TODO: System.InvalidOperationException : Cannot create a MessageSession for a sub-queue.
@@ -432,8 +449,10 @@ public async Task DeferMessages(bool useSpecificSession)
var messageCount = 10;
var sessionId = "sessionId1";
- IEnumerable messages = GetMessages(messageCount, sessionId);
- await sender.SendBatchAsync(messages);
+ using ServiceBusMessageBatch batch = await sender.CreateBatchAsync();
+ IEnumerable messages = AddMessages(batch, messageCount, sessionId).AsEnumerable();
+
+ await sender.SendBatchAsync(batch);
var receiver = await client.GetSessionReceiverAsync(
scope.QueueName,
@@ -473,11 +492,11 @@ public async Task RenewSessionLock(bool isSessionSpecified)
ServiceBusSender sender = client.GetSender(scope.QueueName);
var messageCount = 1;
var sessionId1 = "sessionId1";
- IEnumerable messages = GetMessages(messageCount, sessionId1);
+ ServiceBusMessage message = GetMessage(sessionId1);
// send another session message before the one we are interested in to make sure that when isSessionSpecified=true, it is being respected
- await sender.SendBatchAsync(GetMessages(messageCount, "sessionId2"));
- await sender.SendBatchAsync(messages);
+ await sender.SendAsync(GetMessage("sessionId2"));
+ await sender.SendAsync(message);
ServiceBusReceiver receiver = await client.GetSessionReceiverAsync(scope.QueueName, sessionId: isSessionSpecified ? sessionId1 : null);
if (isSessionSpecified)
@@ -486,7 +505,7 @@ public async Task RenewSessionLock(bool isSessionSpecified)
}
ServiceBusReceivedMessage[] receivedMessages = (await receiver.ReceiveBatchAsync(messageCount)).ToArray();
- var message = receivedMessages.First();
+ var receivedMessage = receivedMessages.First();
var firstLockedUntilUtcTime = receiver.SessionManager.LockedUntilUtc;
// Sleeping for 10 seconds...
@@ -497,15 +516,12 @@ public async Task RenewSessionLock(bool isSessionSpecified)
Assert.True(receiver.SessionManager.LockedUntilUtc >= firstLockedUntilUtcTime + TimeSpan.FromSeconds(10));
// Complete Messages
- await receiver.CompleteAsync(message);
-
- var messageEnum = messages.GetEnumerator();
- messageEnum.MoveNext();
+ await receiver.CompleteAsync(receivedMessage);
Assert.AreEqual(messageCount, receivedMessages.Length);
if (isSessionSpecified)
{
- Assert.AreEqual(messageEnum.Current.MessageId, message.MessageId);
+ Assert.AreEqual(message.MessageId, receivedMessage.MessageId);
}
var peekedMessage = receiver.PeekAsync();
@@ -632,7 +648,7 @@ async Task ProcessMessage(ProcessMessageEventArgs args)
var message = args.Message;
await receiver.CompleteAsync(message);
sessions.TryRemove(message.SessionId, out bool _);
- Assert.AreEqual(message.SessionId, receiver.SessionManager.SessionId);
+ Assert.AreEqual(message.SessionId, receiver.SessionManager.SessionId);
Assert.IsNotNull(receiver.SessionManager.LockedUntilUtc);
}
finally
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Sender/SenderLiveTests.cs
similarity index 62%
rename from sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderLiveTests.cs
rename to sdk/servicebus/Azure.Messaging.ServiceBus/tests/Sender/SenderLiveTests.cs
index 0eb161a32b27..10ce18bbce3e 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderLiveTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Sender/SenderLiveTests.cs
@@ -2,7 +2,9 @@
// Licensed under the MIT License.
using System;
+using System.Collections.Generic;
using System.Net;
+using System.Text;
using System.Threading.Tasks;
using Azure.Identity;
using Azure.Messaging.ServiceBus;
@@ -20,7 +22,7 @@ public async Task Send_ConnString()
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
await using var sender = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString).GetSender(scope.QueueName);
- await sender.SendBatchAsync(GetMessages(10));
+ await sender.SendAsync(GetMessage());
}
}
@@ -88,6 +90,85 @@ public async Task Send_Topic_Session()
}
}
+ [Test]
+ public async Task SenderCanSendAMessageBatch()
+ {
+ await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
+ {
+ await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
+ ServiceBusSender sender = client.GetSender(scope.QueueName);
+ using ServiceBusMessageBatch batch = await sender.CreateBatchAsync();
+ ServiceBusMessageBatch messageBatch = AddMessages(batch, 3);
+
+ await sender.SendBatchAsync(messageBatch);
+ }
+ }
+
+ [Test]
+ public async Task SenderCanSendAnEmptyBodyMessageBatch()
+ {
+ await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
+ {
+ await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
+ ServiceBusSender sender = client.GetSender(scope.QueueName);
+ using ServiceBusMessageBatch batch = await sender.CreateBatchAsync();
+ batch.TryAdd(new ServiceBusMessage(Array.Empty()));
+
+ await sender.SendBatchAsync(batch);
+ }
+ }
+
+ [Test]
+ public async Task SenderCanSendLargeMessageBatch()
+ {
+ await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
+ {
+ await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
+ ServiceBusSender sender = client.GetSender(scope.QueueName);
+ using ServiceBusMessageBatch batch = await sender.CreateBatchAsync();
+
+ // Actual limit is 262144 bytes for a single message.
+ batch.TryAdd(new ServiceBusMessage(new byte[100000 / 3]));
+ batch.TryAdd(new ServiceBusMessage(new byte[100000 / 3]));
+ batch.TryAdd(new ServiceBusMessage(new byte[100000 / 3]));
+
+ await sender.SendBatchAsync(batch);
+ }
+ }
+
+ [Test]
+ public async Task SenderCannotSendLargerThanMaximumSize()
+ {
+ await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
+ {
+ await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
+ ServiceBusSender sender = client.GetSender(scope.QueueName);
+ using ServiceBusMessageBatch batch = await sender.CreateBatchAsync();
+
+ // Actual limit is 262144 bytes for a single message.
+ ServiceBusMessage message = new ServiceBusMessage(new byte[300000]);
+
+ Assert.That(async () => await sender.SendAsync(message), Throws.InstanceOf().And.Property(nameof(ServiceBusException.Reason)).EqualTo(ServiceBusException.FailureReason.MessageSizeExceeded));
+ }
+ }
+
+ [Test]
+ public async Task TryAddReturnsFalseIfSizeExceed()
+ {
+ await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
+ {
+ await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
+ ServiceBusSender sender = client.GetSender(scope.QueueName);
+ using ServiceBusMessageBatch batch = await sender.CreateBatchAsync();
+
+ // Actual limit is 262144 bytes for a single message.
+ Assert.That(() => batch.TryAdd(new ServiceBusMessage(new byte[200000])), Is.True, "A message was rejected by the batch; all messages should be accepted.");
+ Assert.That(() => batch.TryAdd(new ServiceBusMessage(new byte[200000])), Is.False, "A message was rejected by the batch; message size exceed.");
+
+ await sender.SendBatchAsync(batch);
+ }
+ }
+
[Test]
public async Task ClientProperties()
{
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Sender/SenderTests.cs
similarity index 80%
rename from sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderTests.cs
rename to sdk/servicebus/Azure.Messaging.ServiceBus/tests/Sender/SenderTests.cs
index f8525bc89543..3a620b007897 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Sender/SenderTests.cs
@@ -26,22 +26,22 @@ public void Send_NullShouldThrow()
Assert.ThrowsAsync(async () => await mock.Object.SendAsync(null));
}
- [Test]
- public async Task Send_DelegatesToSendRange()
- {
- var mock = new Mock()
- {
- CallBase = true
- };
- mock
- .Setup(m => m.SendBatchAsync(
- It.Is>(value => value.Count() == 1),
- It.IsAny()))
- .Returns(Task.CompletedTask)
- .Verifiable("The single send should delegate to the batch send.");
+ //[Test]
+ //public async Task Send_DelegatesToSendRange()
+ //{
+ // var mock = new Mock()
+ // {
+ // CallBase = true
+ // };
+ // mock
+ // .Setup(m => m.SendBatchAsync(
+ // It.Is>(value => value.Count() == 1),
+ // It.IsAny()))
+ // .Returns(Task.CompletedTask)
+ // .Verifiable("The single send should delegate to the batch send.");
- await mock.Object.SendAsync(new ServiceBusMessage());
- }
+ // await mock.Object.SendAsync(new ServiceBusMessage());
+ //}
[Test]
public void SendRange_NullShouldThrow()