From e3b2ef6fdd9cc7059afc33546a64b6757e0784e2 Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Tue, 3 Mar 2026 16:45:52 -0800 Subject: [PATCH 01/12] Cap ServiceBusMessageBatch size at 1 MB to match broker enforcement The Service Bus broker enforces a 1 MB batch size limit regardless of the max-message-size advertised on the AMQP link. Premium partitioned namespaces advertise 100 MB on the link, causing tryAddMessage() to accept batches the broker will reject. Cap batch creation in ServiceBusSenderAsyncClient.createMessageBatch() at 1 MB (MAX_BATCH_SIZE_BYTES). This is the single enforcement point: both sendMessages(iterable) and scheduleMessages(iterable) call createMessageBatch internally. Single-message paths (sendMessage, scheduleMessage) are NOT capped since the 1 MB limit is batch-specific and individual messages on Premium can validly exceed 1 MB up to the per-entity limit. When a user requests a batch size exceeding 1 MB via CreateMessageBatchOptions, throw ServiceBusException. Tracking: azure-service-bus#708 ICM: 51000000793879 --- .../ServiceBusSenderAsyncClient.java | 18 +- .../ServiceBusSenderAsyncClientTest.java | 329 ++++++++++++++++++ 2 files changed, 343 insertions(+), 4 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java index 58db3b129e65..35abdc92caa5 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java @@ -222,6 +222,16 @@ public final class ServiceBusSenderAsyncClient implements AutoCloseable { * The default maximum allowable size, in bytes, for a batch to be sent. */ static final int MAX_MESSAGE_LENGTH_BYTES = 256 * 1024; + // Temporary workaround: Service Bus enforces a maximum batch payload size of 1 MB that is not + // communicated via the AMQP link's max-message-size property. The link reports the per-message + // limit (up to 100 MB for Premium partitioned), but the broker rejects batch sends above 1 MB. + // This cap is applied only in createMessageBatch(), which is the single enforcement point for + // batch size limits. The sendMessages(iterable) and scheduleMessages(iterable) paths use + // createMessageBatch() internally and are therefore also capped. Single-message paths + // (sendMessage, scheduleMessage) are not capped since individual messages on Premium can + // validly exceed 1 MB up to the per-entity limit. + // Tracked by: https://github.com/Azure/azure-service-bus/issues/708 + static final int MAX_BATCH_SIZE_BYTES = 1024 * 1024; private static final String TRANSACTION_LINK_NAME = "coordinator"; private static final ServiceBusMessage END = new ServiceBusMessage(new byte[0]); private static final CreateMessageBatchOptions DEFAULT_BATCH_OPTIONS = new CreateMessageBatchOptions(); @@ -463,15 +473,15 @@ public Mono createMessageBatch(CreateMessageBatchOptions final int maxSize = options.getMaximumSizeInBytes(); return getSendLinkWithRetry("create-batch").flatMap(link -> link.getLinkSize().flatMap(size -> { - final int maximumLinkSize = size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES; + final int maximumLinkSize = Math.min(size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES, MAX_BATCH_SIZE_BYTES); if (maxSize > maximumLinkSize) { return monoError(logger, new IllegalArgumentException(String.format(Locale.US, - "CreateMessageBatchOptions.getMaximumSizeInBytes (%s bytes) is larger than the link size" - + " (%s bytes).", + "CreateMessageBatchOptions.getMaximumSizeInBytes (%s bytes) is larger than the maximum" + + " allowed size (%s bytes).", maxSize, maximumLinkSize))); } - final int batchSize = maxSize > 0 ? maxSize : maximumLinkSize; + final int batchSize = maxSize > 0 ? Math.min(maxSize, maximumLinkSize) : maximumLinkSize; return Mono .just(new ServiceBusMessageBatch(isV2, batchSize, link::getErrorContext, tracer, messageSerializer)); })).onErrorMap(this::mapError); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java index 20ca897316e9..a170d0e3d3dd 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java @@ -75,6 +75,7 @@ import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY; import static com.azure.core.util.tracing.Tracer.PARENT_TRACE_CONTEXT_KEY; import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY; +import static com.azure.messaging.servicebus.ServiceBusSenderAsyncClient.MAX_BATCH_SIZE_BYTES; import static com.azure.messaging.servicebus.ServiceBusSenderAsyncClient.MAX_MESSAGE_LENGTH_BYTES; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -302,6 +303,105 @@ void createsMessageBatchWithSize(boolean isV2) { }).expectComplete().verify(DEFAULT_TIMEOUT); } + /** + * Verifies that the batch max size is capped at MAX_BATCH_SIZE_BYTES (1 MB) when the link reports a larger size. + * This simulates a Premium partitioned namespace where the link advertises up to 100 MB per-message. + */ + @ParameterizedTest + @MethodSource("selectStack") + void createBatchCappedAtMaxBatchSizeWhenLinkReportsLargerSize(boolean isV2) { + // Arrange + arrangeIfV2(isV2); + int largeLinkSize = 100 * 1024 * 1024; // 100 MB + + final AmqpSendLink link = mock(AmqpSendLink.class); + when(link.getLinkSize()).thenReturn(Mono.just(largeLinkSize)); + + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), + eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(link)); + + // Act & Assert + StepVerifier.create(sender.createMessageBatch()).assertNext(batch -> { + Assertions.assertEquals(MAX_BATCH_SIZE_BYTES, batch.getMaxSizeInBytes()); + }).expectComplete().verify(DEFAULT_TIMEOUT); + } + + /** + * Verifies that the batch max size uses the link size when it is smaller than MAX_BATCH_SIZE_BYTES (1 MB). + * This simulates a Standard namespace where the link advertises 256 KB. + */ + @ParameterizedTest + @MethodSource("selectStack") + void createBatchUsesLinkSizeWhenSmallerThanMaxBatchSize(boolean isV2) { + // Arrange + arrangeIfV2(isV2); + int smallLinkSize = 256 * 1024; // 256 KB + + final AmqpSendLink link = mock(AmqpSendLink.class); + when(link.getLinkSize()).thenReturn(Mono.just(smallLinkSize)); + + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), + eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(link)); + + // Act & Assert + StepVerifier.create(sender.createMessageBatch()).assertNext(batch -> { + Assertions.assertEquals(smallLinkSize, batch.getMaxSizeInBytes()); + }).expectComplete().verify(DEFAULT_TIMEOUT); + } + + /** + * Verifies that user-specified maxSize exceeding the effective 1 MB cap throws an error. + */ + @ParameterizedTest + @MethodSource("selectStack") + void createBatchWithOptionsExceedingMaxBatchSizeCapThrowsError(boolean isV2) { + // Arrange + arrangeIfV2(isV2); + int largeLinkSize = 100 * 1024 * 1024; // 100 MB + int requestedBatchSize = 2 * 1024 * 1024; // 2 MB - exceeds 1 MB cap + + final AmqpSendLink link = mock(AmqpSendLink.class); + when(link.getLinkSize()).thenReturn(Mono.just(largeLinkSize)); + + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), + eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(link)); + + final CreateMessageBatchOptions options + = new CreateMessageBatchOptions().setMaximumSizeInBytes(requestedBatchSize); + + // Act & Assert + // The IllegalArgumentException from createMessageBatch is wrapped by mapError into ServiceBusException. + StepVerifier.create(sender.createMessageBatch(options)) + .expectError(ServiceBusException.class) + .verify(DEFAULT_TIMEOUT); + } + + /** + * Verifies that user-specified maxSize smaller than the 1 MB cap is respected. + */ + @ParameterizedTest + @MethodSource("selectStack") + void createBatchWithOptionsSmallerThanMaxBatchSizeCapIsRespected(boolean isV2) { + // Arrange + arrangeIfV2(isV2); + int largeLinkSize = 100 * 1024 * 1024; // 100 MB + int requestedBatchSize = 500 * 1024; // 500 KB + + final AmqpSendLink link = mock(AmqpSendLink.class); + when(link.getLinkSize()).thenReturn(Mono.just(largeLinkSize)); + + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), + eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(link)); + + final CreateMessageBatchOptions options + = new CreateMessageBatchOptions().setMaximumSizeInBytes(requestedBatchSize); + + // Act & Assert + StepVerifier.create(sender.createMessageBatch(options)).assertNext(batch -> { + Assertions.assertEquals(requestedBatchSize, batch.getMaxSizeInBytes()); + }).expectComplete().verify(DEFAULT_TIMEOUT); + } + @ParameterizedTest @MethodSource("selectStack") void scheduleMessageSizeTooBig(boolean isV2) { @@ -745,6 +845,31 @@ void sendMessagesList(boolean isV2) { messagesSent.forEach(message -> Assertions.assertEquals(Section.SectionType.Data, message.getBody().getType())); } + /** + * Verifies that sendMessages(Iterable) internally uses createMessageBatch() which caps at + * MAX_BATCH_SIZE_BYTES (1 MB) even when the link reports a much larger size (e.g. 100 MB Premium). + * The sendIterable → sendNextIterableBatch → createMessageBatch() path is covered. + */ + @ParameterizedTest + @MethodSource("selectStack") + void sendMessagesIterableWithLargeLinkCapsAt1MB(boolean isV2) { + // Arrange + arrangeIfV2(isV2); + final int largeLinkSize = 100 * 1024 * 1024; // 100 MB - simulates Premium partitioned namespace + final int count = 4; + final List messages = TestUtils.getServiceBusMessages(count, UUID.randomUUID().toString()); + + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), + eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(sendLink)); + when(sendLink.getLinkSize()).thenReturn(Mono.just(largeLinkSize)); + when(sendLink.send(anyList())).thenReturn(Mono.empty()); + when(sendLink.send(any(Message.class))).thenReturn(Mono.empty()); + + // Act - sendMessages(Iterable) goes through sendIterable → createMessageBatch() which caps at 1 MB. + // Small messages still fit within the 1 MB cap, so operation completes successfully. + StepVerifier.create(sender.sendMessages(messages)).expectComplete().verify(DEFAULT_TIMEOUT); + } + /** * Verifies that sending multiple message which does not fit in single batch will throw exception. */ @@ -851,6 +976,31 @@ void sendSingleMessage(boolean isV2) { Assertions.assertEquals(Section.SectionType.Data, message.getBody().getType()); } + /** + * Verifies that sendMessage(single) does NOT cap at MAX_BATCH_SIZE_BYTES on a Premium-like link (100 MB). + * The single-message path goes through sendFluxInternal → AmqpMessageCollector which bypasses + * createMessageBatch() and therefore is not subject to the 1 MB batch cap. + */ + @ParameterizedTest + @MethodSource("selectStack") + void sendSingleMessageNotCappedWithLargeLink(boolean isV2) { + // Arrange + arrangeIfV2(isV2); + final int largeLinkSize = 100 * 1024 * 1024; // 100 MB + final ServiceBusMessage testData = new ServiceBusMessage(TEST_CONTENTS); + + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), + eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(sendLink)); + when(sendLink.getLinkSize()).thenReturn(Mono.just(largeLinkSize)); + when(sendLink.send(any(org.apache.qpid.proton.message.Message.class))).thenReturn(Mono.empty()); + + // Act - sendMessage(single) should succeed; the raw link size (100 MB) is used, not 1 MB cap. + StepVerifier.create(sender.sendMessage(testData)).expectComplete().verify(DEFAULT_TIMEOUT); + + // Assert - message was sent successfully (no size rejection from the 1 MB cap) + verify(sendLink, times(1)).send(any(org.apache.qpid.proton.message.Message.class)); + } + @ParameterizedTest @MethodSource("selectStack") void scheduleMessage(boolean isV2) { @@ -907,6 +1057,185 @@ void scheduleMessageWithTransaction(boolean isV2) { Assertions.assertEquals(message, actualMessages.get(0)); } + /** + * Verifies that scheduleMessages(Iterable) internally uses createMessageBatch() which caps at + * MAX_BATCH_SIZE_BYTES (1 MB) even when the link reports a much larger size. The capped batch max + * size is passed as maxSize to managementNode.schedule(), NOT the raw 100 MB. + */ + @ParameterizedTest + @MethodSource("selectStack") + void scheduleMessagesIterableWithLargeLinkCapsAt1MB(boolean isV2) { + // Arrange + arrangeIfV2(isV2); + final int largeLinkSize = 100 * 1024 * 1024; // 100 MB + final long sequenceNumberReturned = 42L; + final OffsetDateTime instant = mock(OffsetDateTime.class); + final int count = 3; + final List messages = TestUtils.getServiceBusMessages(count, UUID.randomUUID().toString()); + + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), + eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(sendLink)); + when(sendLink.getLinkSize()).thenReturn(Mono.just(largeLinkSize)); + when(managementNode.schedule(anyList(), eq(instant), eq(MAX_BATCH_SIZE_BYTES), eq(LINK_NAME), isNull())) + .thenReturn(Flux.fromStream(IntStream.range(0, count).mapToObj(i -> sequenceNumberReturned + i))); + + // Act & Assert - scheduleMessages(Iterable) → createMessageBatch() caps at 1 MB, + // then passes MAX_BATCH_SIZE_BYTES to managementNode.schedule(), NOT the raw 100 MB. + StepVerifier.create(sender.scheduleMessages(messages, instant)) + .expectNextCount(count) + .expectComplete() + .verify(DEFAULT_TIMEOUT); + + verify(managementNode).schedule(anyList(), eq(instant), eq(MAX_BATCH_SIZE_BYTES), eq(LINK_NAME), isNull()); + } + + /** + * Verifies that scheduleMessage(single) passes the raw link size (100 MB) to managementNode.schedule(), + * NOT the 1 MB cap. The single-message schedule path (scheduleMessageInternal) does not go through + * createMessageBatch() and therefore is not capped. + */ + @ParameterizedTest + @MethodSource("selectStack") + void scheduleMessageSingleNotCappedWithLargeLink(boolean isV2) { + // Arrange + arrangeIfV2(isV2); + final int largeLinkSize = 100 * 1024 * 1024; // 100 MB + final long sequenceNumberReturned = 10; + final OffsetDateTime instant = mock(OffsetDateTime.class); + + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), + eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(sendLink)); + when(sendLink.getLinkSize()).thenReturn(Mono.just(largeLinkSize)); + when(managementNode.schedule(anyList(), eq(instant), eq(largeLinkSize), eq(LINK_NAME), isNull())) + .thenReturn(Flux.just(sequenceNumberReturned)); + + // Act & Assert + StepVerifier.create(sender.scheduleMessage(message, instant)) + .expectNext(sequenceNumberReturned) + .expectComplete() + .verify(DEFAULT_TIMEOUT); + + // Verify managementNode.schedule() received the raw 100 MB link size, not the 1 MB cap + verify(managementNode).schedule(sbMessagesCaptor.capture(), eq(instant), eq(largeLinkSize), eq(LINK_NAME), + isNull()); + List actualMessages = sbMessagesCaptor.getValue(); + Assertions.assertNotNull(actualMessages); + Assertions.assertEquals(1, actualMessages.size()); + Assertions.assertEquals(message, actualMessages.get(0)); + } + + /** + * Verifies that sendMessage(single) with a message larger than 1 MB succeeds on a large link. + * This proves the single-message path (sendFluxInternal -> AmqpMessageCollector) is NOT capped + * at MAX_BATCH_SIZE_BYTES (1 MB). On Premium namespaces with large per-entity limits, individual + * messages exceeding 1 MB are valid. + */ + @ParameterizedTest + @MethodSource("selectStack") + void sendSingleMessageLargerThan1MBSucceedsWithLargeLink(boolean isV2) { + // Arrange + arrangeIfV2(isV2); + final int largeLinkSize = 5 * 1024 * 1024; // 5 MB + // Create a message with payload > 1 MB. Serialized size will be ~2 MB + AMQP overhead. + final ServiceBusMessage largeMessage = new ServiceBusMessage(BinaryData.fromBytes(new byte[2 * 1024 * 1024])); + + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), + eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(sendLink)); + when(sendLink.getLinkSize()).thenReturn(Mono.just(largeLinkSize)); + when(sendLink.send(any(org.apache.qpid.proton.message.Message.class))).thenReturn(Mono.empty()); + + // Act & Assert - A 2 MB message on a 5 MB link succeeds because sendMessage(single) uses the raw + // link size (5 MB) via sendFluxInternal, NOT the 1 MB batch cap. + StepVerifier.create(sender.sendMessage(largeMessage)).expectComplete().verify(DEFAULT_TIMEOUT); + + verify(sendLink, times(1)).send(any(org.apache.qpid.proton.message.Message.class)); + } + + /** + * Verifies that sendMessages(Iterable) rejects a single message larger than 1 MB even on a large link. + * This proves the asymmetry: the same message that succeeds via sendMessage(single) FAILS via + * sendMessages(Iterable) because the iterable path goes through createMessageBatch() which caps + * the batch at MAX_BATCH_SIZE_BYTES (1 MB). + */ + @ParameterizedTest + @MethodSource("selectStack") + void sendMessagesIterableRejectsSingleMessageLargerThan1MBOnLargeLink(boolean isV2) { + // Arrange + arrangeIfV2(isV2); + final int largeLinkSize = 5 * 1024 * 1024; // 5 MB + // Create a single message with payload > 1 MB. + final ServiceBusMessage largeMessage = new ServiceBusMessage(BinaryData.fromBytes(new byte[2 * 1024 * 1024])); + final List messages = new ArrayList<>(); + messages.add(largeMessage); + + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), + eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(sendLink)); + when(sendLink.getLinkSize()).thenReturn(Mono.just(largeLinkSize)); + + // Act & Assert - The iterable path uses createMessageBatch() which caps at 1 MB. + // The 2 MB message cannot fit in the 1 MB-capped batch, so it fails. + StepVerifier.create(sender.sendMessages(messages)) + .expectError(ServiceBusException.class) + .verify(DEFAULT_TIMEOUT); + + verify(sendLink, never()).send(anyList()); + verify(sendLink, never()).send(any(org.apache.qpid.proton.message.Message.class)); + } + + /** + * Verifies that scheduleMessages(Iterable) rejects a single message larger than 1 MB on a large link. + * The schedule iterable path goes through createMessageBatch() -> tryAddMessage for all messages. + * Unlike sendMessages(Iterable), scheduleMessages(Iterable) does NOT auto-split: all messages + * must fit in one batch. + */ + @ParameterizedTest + @MethodSource("selectStack") + void scheduleMessagesIterableRejectsSingleMessageLargerThan1MBOnLargeLink(boolean isV2) { + // Arrange + arrangeIfV2(isV2); + final int largeLinkSize = 5 * 1024 * 1024; // 5 MB + final OffsetDateTime instant = mock(OffsetDateTime.class); + // Create a single message with payload > 1 MB. + final ServiceBusMessage largeMessage = new ServiceBusMessage(BinaryData.fromBytes(new byte[2 * 1024 * 1024])); + final List messages = new ArrayList<>(); + messages.add(largeMessage); + + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), + eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(sendLink)); + when(sendLink.getLinkSize()).thenReturn(Mono.just(largeLinkSize)); + + // Act & Assert - The schedule iterable path uses createMessageBatch() which caps at 1 MB. + // The 2 MB message cannot fit, so it fails with ServiceBusException. + StepVerifier.create(sender.scheduleMessages(messages, instant)) + .expectError(ServiceBusException.class) + .verify(DEFAULT_TIMEOUT); + + verify(managementNode, never()).schedule(anyList(), any(), anyInt(), any(), any()); + } + + /** + * Verifies that createMessageBatch() falls back to MAX_MESSAGE_LENGTH_BYTES when the link reports + * size 0. The fallback (256 KB) is smaller than MAX_BATCH_SIZE_BYTES (1 MB), so the batch max + * size equals the fallback value. + */ + @ParameterizedTest + @MethodSource("selectStack") + void createBatchFallbackWhenLinkReportsZeroSize(boolean isV2) { + // Arrange + arrangeIfV2(isV2); + final AmqpSendLink link = mock(AmqpSendLink.class); + when(link.getLinkSize()).thenReturn(Mono.just(0)); // Link reports zero + + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), + eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(link)); + + // Act & Assert - When link size is 0, fallback is MAX_MESSAGE_LENGTH_BYTES (256 KB), then + // Math.min(256KB, 1MB) = 256 KB. + StepVerifier.create(sender.createMessageBatch()).assertNext(batch -> { + Assertions.assertEquals(MAX_MESSAGE_LENGTH_BYTES, batch.getMaxSizeInBytes()); + }).expectComplete().verify(DEFAULT_TIMEOUT); + } + @ParameterizedTest @MethodSource("selectStack") void cancelScheduleMessage(boolean isV2) { From 1b80911364211b357c5c712b7c898ba642bc5f0f Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Wed, 8 Apr 2026 09:00:33 -0700 Subject: [PATCH 02/12] Read max-message-batch-size vendor property for batch sizing - Add AmqpConstants.MAX_MESSAGE_BATCH_SIZE for com.microsoft:max-message-batch-size - Add AmqpSendLink.getMaxBatchSize() default method (falls back to getLinkSize()) - Override in ReactorSender to read vendor property from link remote properties - ServiceBusSenderAsyncClient.createMessageBatch() uses getMaxBatchSize() instead of getLinkSize(), with 1 MB fallback when vendor property is absent - Single-message paths (sendMessage, scheduleMessage) continue using getLinkSize() - 6 new ReactorSenderTest tests for vendor property, fallback, and edge cases - 2 new ServiceBusSenderAsyncClientTest tests for asymmetry and vendor property - Bump azure-core-amqp dependency to 2.12.0-beta.1 - Add CHANGELOG entry under Bugs Fixed --- .../amqp/implementation/AmqpConstants.java | 7 + .../amqp/implementation/AmqpSendLink.java | 12 ++ .../amqp/implementation/ReactorSender.java | 46 +++++++ .../implementation/ReactorSenderTest.java | 129 ++++++++++++++++++ .../azure-messaging-servicebus/CHANGELOG.md | 4 + .../azure-messaging-servicebus/pom.xml | 2 +- .../ServiceBusSenderAsyncClient.java | 51 +++---- ...SenderAsyncClientRecoveryIsolatedTest.java | 2 + .../ServiceBusSenderAsyncClientTest.java | 105 ++++++++++---- 9 files changed, 306 insertions(+), 52 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpConstants.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpConstants.java index 3c714b525589..9a6ab80de5b3 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpConstants.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpConstants.java @@ -26,6 +26,13 @@ public final class AmqpConstants { public static final Symbol CLIENT_RECEIVER_IDENTIFIER = Symbol.getSymbol(VENDOR + ":receiver-name"); + /** + * Vendor property advertised on the sender link by Service Bus to indicate the maximum batch payload size, + * which may differ from the standard AMQP {@code max-message-size} (e.g., on Premium large-message entities + * the link reports {@code max-message-size} up to 100 MB but enforces 1 MB for batched sends). + */ + public static final Symbol MAX_MESSAGE_BATCH_SIZE = Symbol.getSymbol(VENDOR + ":max-message-batch-size"); + private AmqpConstants() { } } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpSendLink.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpSendLink.java index 341111d441a7..d9688f53ee33 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpSendLink.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpSendLink.java @@ -77,6 +77,18 @@ public interface AmqpSendLink extends AmqpLink { */ Mono getLinkSize(); + /** + * Gets the maximum batch payload size for the send link. This reads the vendor property + * {@code com.microsoft:max-message-batch-size} from the remote link attach frame, which may be smaller + * than {@link #getLinkSize()} (e.g., 1 MB for batches vs. 100 MB for single messages on Premium + * large-message entities). If the vendor property is not present, falls back to {@link #getLinkSize()}. + * + * @return A Mono that completes and returns the maximum batch size for the send link. + */ + default Mono getMaxBatchSize() { + return getLinkSize(); + } + /** * Gets the context for this AMQP send link. * diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java index dea2c8ba82a1..dec00715785a 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java @@ -110,6 +110,7 @@ class ReactorSender implements AmqpSendLink, AsyncCloseable, AutoCloseable { private volatile Exception lastKnownLinkError; private volatile Instant lastKnownErrorReportedAt; private volatile int linkSize; + private volatile int maxBatchSize; /** * Creates an instance of {@link ReactorSender}. @@ -408,6 +409,51 @@ public Mono getLinkSize() { } } + @Override + public Mono getMaxBatchSize() { + if (maxBatchSize > 0) { + return Mono.defer(() -> Mono.just(this.maxBatchSize)); + } + + synchronized (this) { + if (maxBatchSize > 0) { + return Mono.defer(() -> Mono.just(maxBatchSize)); + } + + return RetryUtil + .withRetry(getEndpointStates().takeUntil(state -> state == AmqpEndpointState.ACTIVE), retryOptions, + activeTimeoutMessage) + .then(Mono.fromCallable(() -> { + final Map remoteProperties = sender.getRemoteProperties(); + if (remoteProperties != null + && remoteProperties.containsKey(AmqpConstants.MAX_MESSAGE_BATCH_SIZE)) { + final Object value = remoteProperties.get(AmqpConstants.MAX_MESSAGE_BATCH_SIZE); + // The AMQP property may arrive as UnsignedLong, UnsignedInteger, Long, or Integer. + // intValue() is consistent with getLinkSize() — values > Integer.MAX_VALUE (impossible + // for batch sizes) would overflow to negative and trigger the fallback below. + if (value instanceof Number) { + maxBatchSize = ((Number) value).intValue(); + } + } + + // Fall back to the standard max-message-size if the vendor property is absent or + // non-numeric (expected for non-Service Bus brokers and older service deployments). + if (maxBatchSize <= 0) { + final UnsignedLong remoteMaxMessageSize = sender.getRemoteMaxMessageSize(); + logger.verbose( + "Vendor property '{}' not found or non-numeric on link, " + + "falling back to max-message-size: {}.", + AmqpConstants.MAX_MESSAGE_BATCH_SIZE, remoteMaxMessageSize); + if (remoteMaxMessageSize != null) { + maxBatchSize = remoteMaxMessageSize.intValue(); + } + } + + return maxBatchSize; + })); + } + } + @Override public boolean isDisposed() { return isDisposed.get(); diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSenderTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSenderTest.java index a0e758ea97ee..f05c9c4b9b20 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSenderTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSenderTest.java @@ -186,6 +186,135 @@ public void testLinkSize() { verify(sender).getRemoteMaxMessageSize(); } + /** + * Verifies that getMaxBatchSize reads the vendor property com.microsoft:max-message-batch-size from the link's + * remote properties and returns its value. + */ + @Test + public void testMaxBatchSizeReadsVendorProperty() { + // Arrange — vendor property present with UnsignedLong value (typical AMQP encoding) + final int expectedBatchSize = 1048576; // 1 MB + final java.util.Map remoteProperties = new java.util.HashMap<>(); + remoteProperties.put(AmqpConstants.MAX_MESSAGE_BATCH_SIZE, UnsignedLong.valueOf(expectedBatchSize)); + when(sender.getRemoteProperties()).thenReturn(remoteProperties); + + reactorSender = new ReactorSender(amqpConnection, ENTITY_PATH, sender, handler, reactorProvider, tokenManager, + messageSerializer, options, scheduler, AmqpMetricsProvider.noop()); + + // Act & Assert — vendor property value is used + StepVerifier.create(reactorSender.getMaxBatchSize()) + .expectNext(expectedBatchSize) + .expectComplete() + .verify(VERIFY_TIMEOUT); + + // Second call returns cached value + StepVerifier.create(reactorSender.getMaxBatchSize()) + .expectNext(expectedBatchSize) + .expectComplete() + .verify(VERIFY_TIMEOUT); + + // getRemoteProperties called only once (value was cached) + verify(sender, times(1)).getRemoteProperties(); + } + + /** + * Verifies that getMaxBatchSize falls back to max-message-size when the vendor property is absent. + */ + @Test + public void testMaxBatchSizeFallsBackToMaxMessageSize() { + // Arrange — no vendor property, but max-message-size is available (set in @BeforeEach to 1000) + when(sender.getRemoteProperties()).thenReturn(null); + + reactorSender = new ReactorSender(amqpConnection, ENTITY_PATH, sender, handler, reactorProvider, tokenManager, + messageSerializer, options, scheduler, AmqpMetricsProvider.noop()); + + // Act & Assert — falls back to getRemoteMaxMessageSize (1000, set in setup) + StepVerifier.create(reactorSender.getMaxBatchSize()).expectNext(1000).expectComplete().verify(VERIFY_TIMEOUT); + } + + /** + * Verifies that getMaxBatchSize falls back to max-message-size when the vendor property contains a + * non-numeric value. + */ + @Test + public void testMaxBatchSizeFallsBackWhenPropertyIsNotNumber() { + // Arrange — vendor property present but with a String value + final java.util.Map remoteProperties = new java.util.HashMap<>(); + remoteProperties.put(AmqpConstants.MAX_MESSAGE_BATCH_SIZE, "not-a-number"); + when(sender.getRemoteProperties()).thenReturn(remoteProperties); + + reactorSender = new ReactorSender(amqpConnection, ENTITY_PATH, sender, handler, reactorProvider, tokenManager, + messageSerializer, options, scheduler, AmqpMetricsProvider.noop()); + + // Act & Assert — falls back to getRemoteMaxMessageSize (1000) + StepVerifier.create(reactorSender.getMaxBatchSize()).expectNext(1000).expectComplete().verify(VERIFY_TIMEOUT); + } + + /** + * Verifies that getMaxBatchSize returns a different value from getLinkSize when the vendor property + * differs from max-message-size (the Premium large-message scenario). + */ + @Test + public void testMaxBatchSizeDiffersFromLinkSize() { + // Arrange — link max-message-size = 100 MB, vendor batch property = 1 MB + final int maxMessageSize = 100 * 1024 * 1024; // 100 MB + final int maxBatchSize = 1024 * 1024; // 1 MB + when(sender.getRemoteMaxMessageSize()).thenReturn(UnsignedLong.valueOf(maxMessageSize)); + + final java.util.Map remoteProperties = new java.util.HashMap<>(); + remoteProperties.put(AmqpConstants.MAX_MESSAGE_BATCH_SIZE, UnsignedLong.valueOf(maxBatchSize)); + when(sender.getRemoteProperties()).thenReturn(remoteProperties); + + reactorSender = new ReactorSender(amqpConnection, ENTITY_PATH, sender, handler, reactorProvider, tokenManager, + messageSerializer, options, scheduler, AmqpMetricsProvider.noop()); + + // Act & Assert — getLinkSize returns 100 MB, getMaxBatchSize returns 1 MB + StepVerifier.create(reactorSender.getLinkSize()) + .expectNext(maxMessageSize) + .expectComplete() + .verify(VERIFY_TIMEOUT); + + StepVerifier.create(reactorSender.getMaxBatchSize()) + .expectNext(maxBatchSize) + .expectComplete() + .verify(VERIFY_TIMEOUT); + } + + /** + * Verifies that getMaxBatchSize returns 0 when both the vendor property and max-message-size are absent. + * The caller (ServiceBusSenderAsyncClient) handles this with DEFAULT_MAX_BATCH_SIZE_BYTES. + */ + @Test + public void testMaxBatchSizeReturnsZeroWhenBothAbsent() { + // Arrange — no vendor property and no max-message-size + when(sender.getRemoteProperties()).thenReturn(null); + when(sender.getRemoteMaxMessageSize()).thenReturn(null); + + reactorSender = new ReactorSender(amqpConnection, ENTITY_PATH, sender, handler, reactorProvider, tokenManager, + messageSerializer, options, scheduler, AmqpMetricsProvider.noop()); + + // Act & Assert — returns 0, caller must handle + StepVerifier.create(reactorSender.getMaxBatchSize()).expectNext(0).expectComplete().verify(VERIFY_TIMEOUT); + } + + /** + * Verifies that getMaxBatchSize falls back to max-message-size when the vendor property is present + * but has value 0 (UnsignedLong.valueOf(0)). + */ + @Test + public void testMaxBatchSizeFallsBackWhenPropertyIsZero() { + // Arrange — vendor property present but value is 0 + final java.util.Map remoteProperties = new java.util.HashMap<>(); + remoteProperties.put(AmqpConstants.MAX_MESSAGE_BATCH_SIZE, UnsignedLong.valueOf(0)); + when(sender.getRemoteProperties()).thenReturn(remoteProperties); + + reactorSender = new ReactorSender(amqpConnection, ENTITY_PATH, sender, handler, reactorProvider, tokenManager, + messageSerializer, options, scheduler, AmqpMetricsProvider.noop()); + + // Act & Assert — falls back to getRemoteMaxMessageSize (1000, set in @BeforeEach) + StepVerifier.create(reactorSender.getMaxBatchSize()).expectNext(1000).expectComplete().verify(VERIFY_TIMEOUT); + } + @Test public void testSendWithTransactionFailed() { // Arrange diff --git a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md index fce49f300f01..351e84bd15ed 100644 --- a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md @@ -8,6 +8,10 @@ ### Bugs Fixed +- Fixed `ServiceBusMessageBatch` accepting messages beyond the service-enforced batch size limit on + Premium large-message entities by reading the `com.microsoft:max-message-batch-size` vendor property + from the AMQP sender link instead of using `max-message-size`. ([#48214](https://github.com/Azure/azure-sdk-for-java/pull/48214)) + ### Other Changes ## 7.17.17 (2026-01-29) diff --git a/sdk/servicebus/azure-messaging-servicebus/pom.xml b/sdk/servicebus/azure-messaging-servicebus/pom.xml index 47390eae5860..7dcb218ab3b6 100644 --- a/sdk/servicebus/azure-messaging-servicebus/pom.xml +++ b/sdk/servicebus/azure-messaging-servicebus/pom.xml @@ -69,7 +69,7 @@ com.azure azure-core-amqp - 2.11.3 + 2.12.0-beta.1 com.azure diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java index 35abdc92caa5..f3da6d4a0688 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java @@ -222,16 +222,12 @@ public final class ServiceBusSenderAsyncClient implements AutoCloseable { * The default maximum allowable size, in bytes, for a batch to be sent. */ static final int MAX_MESSAGE_LENGTH_BYTES = 256 * 1024; - // Temporary workaround: Service Bus enforces a maximum batch payload size of 1 MB that is not - // communicated via the AMQP link's max-message-size property. The link reports the per-message - // limit (up to 100 MB for Premium partitioned), but the broker rejects batch sends above 1 MB. - // This cap is applied only in createMessageBatch(), which is the single enforcement point for - // batch size limits. The sendMessages(iterable) and scheduleMessages(iterable) paths use - // createMessageBatch() internally and are therefore also capped. Single-message paths - // (sendMessage, scheduleMessage) are not capped since individual messages on Premium can - // validly exceed 1 MB up to the per-entity limit. - // Tracked by: https://github.com/Azure/azure-service-bus/issues/708 - static final int MAX_BATCH_SIZE_BYTES = 1024 * 1024; + // Fallback batch size limit (1 MB) used when the service does not advertise the vendor property + // com.microsoft:max-message-batch-size on the AMQP sender link. When the property is present, + // its value is used directly. This fallback protects against older service deployments that + // do not advertise the property — without it, the SDK would use max-message-size (up to 100 MB + // on Premium large-message entities) for batch sizing, and the broker would reject. + static final int DEFAULT_MAX_BATCH_SIZE_BYTES = 1024 * 1024; private static final String TRANSACTION_LINK_NAME = "coordinator"; private static final ServiceBusMessage END = new ServiceBusMessage(new byte[0]); private static final CreateMessageBatchOptions DEFAULT_BATCH_OPTIONS = new CreateMessageBatchOptions(); @@ -472,19 +468,24 @@ public Mono createMessageBatch(CreateMessageBatchOptions final int maxSize = options.getMaximumSizeInBytes(); - return getSendLinkWithRetry("create-batch").flatMap(link -> link.getLinkSize().flatMap(size -> { - final int maximumLinkSize = Math.min(size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES, MAX_BATCH_SIZE_BYTES); - if (maxSize > maximumLinkSize) { - return monoError(logger, - new IllegalArgumentException(String.format(Locale.US, - "CreateMessageBatchOptions.getMaximumSizeInBytes (%s bytes) is larger than the maximum" - + " allowed size (%s bytes).", - maxSize, maximumLinkSize))); - } - final int batchSize = maxSize > 0 ? Math.min(maxSize, maximumLinkSize) : maximumLinkSize; - return Mono - .just(new ServiceBusMessageBatch(isV2, batchSize, link::getErrorContext, tracer, messageSerializer)); - })).onErrorMap(this::mapError); + return getSendLinkWithRetry("create-batch") + .flatMap(link -> link.getMaxBatchSize().flatMap(batchSizeFromLink -> { + // Use the value from getMaxBatchSize() (vendor property, or standard max-message-size fallback + // in ReactorSender). If neither is available (batchSizeFromLink <= 0), use 1 MB as a + // last-resort default to prevent oversized batches on broken links. + final int maximumLinkSize = batchSizeFromLink > 0 ? batchSizeFromLink : DEFAULT_MAX_BATCH_SIZE_BYTES; + if (maxSize > maximumLinkSize) { + return monoError(logger, + new IllegalArgumentException(String.format(Locale.US, + "CreateMessageBatchOptions.getMaximumSizeInBytes (%s bytes) is larger than the maximum" + + " allowed size (%s bytes).", + maxSize, maximumLinkSize))); + } + final int batchSize = maxSize > 0 ? Math.min(maxSize, maximumLinkSize) : maximumLinkSize; + return Mono.just( + new ServiceBusMessageBatch(isV2, batchSize, link::getErrorContext, tracer, messageSerializer)); + })) + .onErrorMap(this::mapError); } /** @@ -893,6 +894,10 @@ private Mono sendFluxInternal(Flux messages, new IllegalStateException(String.format(INVALID_OPERATION_DISPOSED_SENDER, "sendMessage"))); } + // Uses getLinkSize() intentionally — this path is for single-message sends only (sendMessage()). + // Single messages should use the full link capacity (up to 100 MB on Premium large-message), + // not the batch-size cap. The batch path (sendMessages(Iterable), scheduleMessages(Iterable)) + // goes through createMessageBatch() which calls getMaxBatchSize() for the vendor-property-based cap. final Mono> batchList = getSendLinkWithRetry("send-batches").flatMap(link -> link.getLinkSize().flatMap(size -> { final int batchSize = size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES; diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientRecoveryIsolatedTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientRecoveryIsolatedTest.java index 208ec00e42b4..9f70ed644f5e 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientRecoveryIsolatedTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientRecoveryIsolatedTest.java @@ -1312,6 +1312,8 @@ void arrange() { doNothing().when(sender).open(); when(amqpSendLink.getLinkSize()) .thenReturn(Mono.just(ServiceBusSenderAsyncClient.MAX_MESSAGE_LENGTH_BYTES)); + when(amqpSendLink.getMaxBatchSize()) + .thenReturn(Mono.just(ServiceBusSenderAsyncClient.MAX_MESSAGE_LENGTH_BYTES)); when(amqpSendLink.getEndpointStates()) .thenReturn(sendLinkStateSink.asFlux().distinctUntilChanged().map(state -> toAmqpEndpointState(state))); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java index a170d0e3d3dd..f040caba9268 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java @@ -75,7 +75,7 @@ import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY; import static com.azure.core.util.tracing.Tracer.PARENT_TRACE_CONTEXT_KEY; import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY; -import static com.azure.messaging.servicebus.ServiceBusSenderAsyncClient.MAX_BATCH_SIZE_BYTES; +import static com.azure.messaging.servicebus.ServiceBusSenderAsyncClient.DEFAULT_MAX_BATCH_SIZE_BYTES; import static com.azure.messaging.servicebus.ServiceBusSenderAsyncClient.MAX_MESSAGE_LENGTH_BYTES; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -178,6 +178,7 @@ void setup() { .thenReturn(just(managementNode)); when(sendLink.getLinkSize()).thenReturn(Mono.just(ServiceBusSenderAsyncClient.MAX_MESSAGE_LENGTH_BYTES)); + when(sendLink.getMaxBatchSize()).thenReturn(Mono.just(ServiceBusSenderAsyncClient.MAX_MESSAGE_LENGTH_BYTES)); when(sendLink.getLinkName()).thenReturn(LINK_NAME); doNothing().when(onClientClose).run(); @@ -249,6 +250,7 @@ void createBatchWhenSizeTooBig(boolean isV2) { final AmqpSendLink link = mock(AmqpSendLink.class); when(link.getLinkSize()).thenReturn(Mono.just(maxLinkSize)); + when(link.getMaxBatchSize()).thenReturn(Mono.just(maxLinkSize)); when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull(), eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(link)); @@ -279,6 +281,7 @@ void createsMessageBatchWithSize(boolean isV2) { final AmqpSendLink link = mock(AmqpSendLink.class); when(link.getLinkSize()).thenReturn(Mono.just(maxLinkSize)); + when(link.getMaxBatchSize()).thenReturn(Mono.just(maxLinkSize)); // EC is the prefix they use when creating a link that sends to the service round-robin. when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull(), @@ -304,31 +307,33 @@ void createsMessageBatchWithSize(boolean isV2) { } /** - * Verifies that the batch max size is capped at MAX_BATCH_SIZE_BYTES (1 MB) when the link reports a larger size. - * This simulates a Premium partitioned namespace where the link advertises up to 100 MB per-message. + * Verifies that the batch max size uses the vendor property (max-message-batch-size) from the link, + * which returns 1 MB even when the link's max-message-size reports 100 MB (Premium large-message). */ @ParameterizedTest @MethodSource("selectStack") void createBatchCappedAtMaxBatchSizeWhenLinkReportsLargerSize(boolean isV2) { // Arrange arrangeIfV2(isV2); - int largeLinkSize = 100 * 1024 * 1024; // 100 MB + int largeLinkSize = 100 * 1024 * 1024; // 100 MB - max-message-size on Premium large-message + int vendorBatchSize = 1024 * 1024; // 1 MB - com.microsoft:max-message-batch-size final AmqpSendLink link = mock(AmqpSendLink.class); when(link.getLinkSize()).thenReturn(Mono.just(largeLinkSize)); + when(link.getMaxBatchSize()).thenReturn(Mono.just(vendorBatchSize)); when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(link)); // Act & Assert StepVerifier.create(sender.createMessageBatch()).assertNext(batch -> { - Assertions.assertEquals(MAX_BATCH_SIZE_BYTES, batch.getMaxSizeInBytes()); + Assertions.assertEquals(vendorBatchSize, batch.getMaxSizeInBytes()); }).expectComplete().verify(DEFAULT_TIMEOUT); } /** - * Verifies that the batch max size uses the link size when it is smaller than MAX_BATCH_SIZE_BYTES (1 MB). - * This simulates a Standard namespace where the link advertises 256 KB. + * Verifies that the batch max size uses the vendor property value when it is smaller than 1 MB. + * This simulates a Standard namespace where the link advertises 256 KB for both properties. */ @ParameterizedTest @MethodSource("selectStack") @@ -339,6 +344,7 @@ void createBatchUsesLinkSizeWhenSmallerThanMaxBatchSize(boolean isV2) { final AmqpSendLink link = mock(AmqpSendLink.class); when(link.getLinkSize()).thenReturn(Mono.just(smallLinkSize)); + when(link.getMaxBatchSize()).thenReturn(Mono.just(smallLinkSize)); when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(link)); @@ -358,10 +364,12 @@ void createBatchWithOptionsExceedingMaxBatchSizeCapThrowsError(boolean isV2) { // Arrange arrangeIfV2(isV2); int largeLinkSize = 100 * 1024 * 1024; // 100 MB - int requestedBatchSize = 2 * 1024 * 1024; // 2 MB - exceeds 1 MB cap + int vendorBatchSize = 1024 * 1024; // 1 MB + int requestedBatchSize = 2 * 1024 * 1024; // 2 MB - exceeds vendor property final AmqpSendLink link = mock(AmqpSendLink.class); when(link.getLinkSize()).thenReturn(Mono.just(largeLinkSize)); + when(link.getMaxBatchSize()).thenReturn(Mono.just(vendorBatchSize)); when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(link)); @@ -385,10 +393,12 @@ void createBatchWithOptionsSmallerThanMaxBatchSizeCapIsRespected(boolean isV2) { // Arrange arrangeIfV2(isV2); int largeLinkSize = 100 * 1024 * 1024; // 100 MB + int vendorBatchSize = 1024 * 1024; // 1 MB int requestedBatchSize = 500 * 1024; // 500 KB final AmqpSendLink link = mock(AmqpSendLink.class); when(link.getLinkSize()).thenReturn(Mono.just(largeLinkSize)); + when(link.getMaxBatchSize()).thenReturn(Mono.just(vendorBatchSize)); when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(link)); @@ -416,10 +426,10 @@ void scheduleMessageSizeTooBig(boolean isV2) { final AmqpSendLink link = mock(AmqpSendLink.class); when(link.getLinkSize()).thenReturn(Mono.just(maxLinkSize)); + when(link.getMaxBatchSize()).thenReturn(Mono.just(maxLinkSize)); when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(link)); - when(link.getLinkSize()).thenReturn(Mono.just(maxLinkSize)); // Act & Assert StepVerifier.create(sender.scheduleMessages(messages, instant)) @@ -846,8 +856,8 @@ void sendMessagesList(boolean isV2) { } /** - * Verifies that sendMessages(Iterable) internally uses createMessageBatch() which caps at - * MAX_BATCH_SIZE_BYTES (1 MB) even when the link reports a much larger size (e.g. 100 MB Premium). + * Verifies that sendMessages(Iterable) internally uses createMessageBatch() which reads the vendor + * property (1 MB) even when the link's max-message-size reports 100 MB (Premium large-message). * The sendIterable → sendNextIterableBatch → createMessageBatch() path is covered. */ @ParameterizedTest @@ -855,18 +865,20 @@ void sendMessagesList(boolean isV2) { void sendMessagesIterableWithLargeLinkCapsAt1MB(boolean isV2) { // Arrange arrangeIfV2(isV2); - final int largeLinkSize = 100 * 1024 * 1024; // 100 MB - simulates Premium partitioned namespace + final int largeLinkSize = 100 * 1024 * 1024; // 100 MB - max-message-size on Premium large-message + final int vendorBatchSize = 1024 * 1024; // 1 MB - com.microsoft:max-message-batch-size final int count = 4; final List messages = TestUtils.getServiceBusMessages(count, UUID.randomUUID().toString()); when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(sendLink)); when(sendLink.getLinkSize()).thenReturn(Mono.just(largeLinkSize)); + when(sendLink.getMaxBatchSize()).thenReturn(Mono.just(vendorBatchSize)); when(sendLink.send(anyList())).thenReturn(Mono.empty()); when(sendLink.send(any(Message.class))).thenReturn(Mono.empty()); - // Act - sendMessages(Iterable) goes through sendIterable → createMessageBatch() which caps at 1 MB. - // Small messages still fit within the 1 MB cap, so operation completes successfully. + // Act - sendMessages(Iterable) goes through sendIterable → createMessageBatch() which uses + // the vendor property (1 MB). Small messages still fit, so operation completes successfully. StepVerifier.create(sender.sendMessages(messages)).expectComplete().verify(DEFAULT_TIMEOUT); } @@ -885,6 +897,7 @@ void sendMessagesListExceedSize(boolean isV2) { when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull(), eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(sendLink)); when(sendLink.getLinkSize()).thenReturn(linkMaxSize); + when(sendLink.getMaxBatchSize()).thenReturn(linkMaxSize); // Act & Assert StepVerifier.create(sender.sendMessages(messages)) @@ -1001,6 +1014,36 @@ void sendSingleMessageNotCappedWithLargeLink(boolean isV2) { verify(sendLink, times(1)).send(any(org.apache.qpid.proton.message.Message.class)); } + /** + * End-to-end asymmetry test: on the SAME link with max-message-size=100 MB and vendor batch property=1 MB, + * verifies that createMessageBatch() caps at 1 MB while sendMessage(single) uses the full 100 MB. + * This is the core Premium large-message scenario that this change addresses. + */ + @ParameterizedTest + @MethodSource("selectStack") + void premiumLargeMessageAsymmetry(boolean isV2) { + // Arrange — same link advertises 100 MB max-message-size and 1 MB max-message-batch-size + arrangeIfV2(isV2); + final int largeLinkSize = 100 * 1024 * 1024; // 100 MB - max-message-size + final int vendorBatchSize = 1024 * 1024; // 1 MB - com.microsoft:max-message-batch-size + final ServiceBusMessage testData = new ServiceBusMessage(TEST_CONTENTS); + + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), + eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(sendLink)); + when(sendLink.getLinkSize()).thenReturn(Mono.just(largeLinkSize)); + when(sendLink.getMaxBatchSize()).thenReturn(Mono.just(vendorBatchSize)); + when(sendLink.send(any(org.apache.qpid.proton.message.Message.class))).thenReturn(Mono.empty()); + + // Act & Assert 1: createMessageBatch() uses vendor property → batch capped at 1 MB + StepVerifier.create(sender.createMessageBatch()).assertNext(batch -> { + Assertions.assertEquals(vendorBatchSize, batch.getMaxSizeInBytes()); + }).expectComplete().verify(DEFAULT_TIMEOUT); + + // Act & Assert 2: sendMessage(single) uses full link size → succeeds (100 MB capacity) + StepVerifier.create(sender.sendMessage(testData)).expectComplete().verify(DEFAULT_TIMEOUT); + verify(sendLink, times(1)).send(any(org.apache.qpid.proton.message.Message.class)); + } + @ParameterizedTest @MethodSource("selectStack") void scheduleMessage(boolean isV2) { @@ -1058,8 +1101,8 @@ void scheduleMessageWithTransaction(boolean isV2) { } /** - * Verifies that scheduleMessages(Iterable) internally uses createMessageBatch() which caps at - * MAX_BATCH_SIZE_BYTES (1 MB) even when the link reports a much larger size. The capped batch max + * Verifies that scheduleMessages(Iterable) internally uses createMessageBatch() which reads the vendor + * property (1 MB) even when the link's max-message-size reports 100 MB. The capped batch max * size is passed as maxSize to managementNode.schedule(), NOT the raw 100 MB. */ @ParameterizedTest @@ -1068,6 +1111,7 @@ void scheduleMessagesIterableWithLargeLinkCapsAt1MB(boolean isV2) { // Arrange arrangeIfV2(isV2); final int largeLinkSize = 100 * 1024 * 1024; // 100 MB + final int vendorBatchSize = 1024 * 1024; // 1 MB - com.microsoft:max-message-batch-size final long sequenceNumberReturned = 42L; final OffsetDateTime instant = mock(OffsetDateTime.class); final int count = 3; @@ -1076,17 +1120,18 @@ void scheduleMessagesIterableWithLargeLinkCapsAt1MB(boolean isV2) { when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(sendLink)); when(sendLink.getLinkSize()).thenReturn(Mono.just(largeLinkSize)); - when(managementNode.schedule(anyList(), eq(instant), eq(MAX_BATCH_SIZE_BYTES), eq(LINK_NAME), isNull())) + when(sendLink.getMaxBatchSize()).thenReturn(Mono.just(vendorBatchSize)); + when(managementNode.schedule(anyList(), eq(instant), eq(vendorBatchSize), eq(LINK_NAME), isNull())) .thenReturn(Flux.fromStream(IntStream.range(0, count).mapToObj(i -> sequenceNumberReturned + i))); - // Act & Assert - scheduleMessages(Iterable) → createMessageBatch() caps at 1 MB, - // then passes MAX_BATCH_SIZE_BYTES to managementNode.schedule(), NOT the raw 100 MB. + // Act & Assert - scheduleMessages(Iterable) → createMessageBatch() reads vendor property (1 MB), + // then passes that value to managementNode.schedule(), NOT the raw 100 MB. StepVerifier.create(sender.scheduleMessages(messages, instant)) .expectNextCount(count) .expectComplete() .verify(DEFAULT_TIMEOUT); - verify(managementNode).schedule(anyList(), eq(instant), eq(MAX_BATCH_SIZE_BYTES), eq(LINK_NAME), isNull()); + verify(managementNode).schedule(anyList(), eq(instant), eq(vendorBatchSize), eq(LINK_NAME), isNull()); } /** @@ -1154,8 +1199,8 @@ void sendSingleMessageLargerThan1MBSucceedsWithLargeLink(boolean isV2) { /** * Verifies that sendMessages(Iterable) rejects a single message larger than 1 MB even on a large link. * This proves the asymmetry: the same message that succeeds via sendMessage(single) FAILS via - * sendMessages(Iterable) because the iterable path goes through createMessageBatch() which caps - * the batch at MAX_BATCH_SIZE_BYTES (1 MB). + * sendMessages(Iterable) because the iterable path goes through createMessageBatch() which uses + * the vendor property (1 MB) for batch sizing. */ @ParameterizedTest @MethodSource("selectStack") @@ -1163,6 +1208,7 @@ void sendMessagesIterableRejectsSingleMessageLargerThan1MBOnLargeLink(boolean is // Arrange arrangeIfV2(isV2); final int largeLinkSize = 5 * 1024 * 1024; // 5 MB + final int vendorBatchSize = 1024 * 1024; // 1 MB // Create a single message with payload > 1 MB. final ServiceBusMessage largeMessage = new ServiceBusMessage(BinaryData.fromBytes(new byte[2 * 1024 * 1024])); final List messages = new ArrayList<>(); @@ -1171,6 +1217,7 @@ void sendMessagesIterableRejectsSingleMessageLargerThan1MBOnLargeLink(boolean is when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(sendLink)); when(sendLink.getLinkSize()).thenReturn(Mono.just(largeLinkSize)); + when(sendLink.getMaxBatchSize()).thenReturn(Mono.just(vendorBatchSize)); // Act & Assert - The iterable path uses createMessageBatch() which caps at 1 MB. // The 2 MB message cannot fit in the 1 MB-capped batch, so it fails. @@ -1194,6 +1241,7 @@ void scheduleMessagesIterableRejectsSingleMessageLargerThan1MBOnLargeLink(boolea // Arrange arrangeIfV2(isV2); final int largeLinkSize = 5 * 1024 * 1024; // 5 MB + final int vendorBatchSize = 1024 * 1024; // 1 MB final OffsetDateTime instant = mock(OffsetDateTime.class); // Create a single message with payload > 1 MB. final ServiceBusMessage largeMessage = new ServiceBusMessage(BinaryData.fromBytes(new byte[2 * 1024 * 1024])); @@ -1203,6 +1251,7 @@ void scheduleMessagesIterableRejectsSingleMessageLargerThan1MBOnLargeLink(boolea when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(sendLink)); when(sendLink.getLinkSize()).thenReturn(Mono.just(largeLinkSize)); + when(sendLink.getMaxBatchSize()).thenReturn(Mono.just(vendorBatchSize)); // Act & Assert - The schedule iterable path uses createMessageBatch() which caps at 1 MB. // The 2 MB message cannot fit, so it fails with ServiceBusException. @@ -1214,9 +1263,9 @@ void scheduleMessagesIterableRejectsSingleMessageLargerThan1MBOnLargeLink(boolea } /** - * Verifies that createMessageBatch() falls back to MAX_MESSAGE_LENGTH_BYTES when the link reports - * size 0. The fallback (256 KB) is smaller than MAX_BATCH_SIZE_BYTES (1 MB), so the batch max - * size equals the fallback value. + * Verifies that createMessageBatch() falls back to DEFAULT_MAX_BATCH_SIZE_BYTES when the link reports + * size 0 from getMaxBatchSize(). This simulates a link where the vendor property is absent and the + * remote max-message-size is also unavailable. */ @ParameterizedTest @MethodSource("selectStack") @@ -1225,14 +1274,14 @@ void createBatchFallbackWhenLinkReportsZeroSize(boolean isV2) { arrangeIfV2(isV2); final AmqpSendLink link = mock(AmqpSendLink.class); when(link.getLinkSize()).thenReturn(Mono.just(0)); // Link reports zero + when(link.getMaxBatchSize()).thenReturn(Mono.just(0)); // Vendor property also absent when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull(), eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(link)); - // Act & Assert - When link size is 0, fallback is MAX_MESSAGE_LENGTH_BYTES (256 KB), then - // Math.min(256KB, 1MB) = 256 KB. + // Act & Assert - When getMaxBatchSize() is 0, fallback is DEFAULT_MAX_BATCH_SIZE_BYTES (1 MB). StepVerifier.create(sender.createMessageBatch()).assertNext(batch -> { - Assertions.assertEquals(MAX_MESSAGE_LENGTH_BYTES, batch.getMaxSizeInBytes()); + Assertions.assertEquals(DEFAULT_MAX_BATCH_SIZE_BYTES, batch.getMaxSizeInBytes()); }).expectComplete().verify(DEFAULT_TIMEOUT); } From fba6852ff06e017101c7758c3b32e17f689cd791 Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Wed, 8 Apr 2026 09:10:36 -0700 Subject: [PATCH 03/12] Improve error message to say 'maximum batch size' instead of 'maximum allowed size' --- .../azure/messaging/servicebus/ServiceBusSenderAsyncClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java index f3da6d4a0688..2403b0b5c7b7 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java @@ -478,7 +478,7 @@ public Mono createMessageBatch(CreateMessageBatchOptions return monoError(logger, new IllegalArgumentException(String.format(Locale.US, "CreateMessageBatchOptions.getMaximumSizeInBytes (%s bytes) is larger than the maximum" - + " allowed size (%s bytes).", + + " batch size (%s bytes).", maxSize, maximumLinkSize))); } final int batchSize = maxSize > 0 ? Math.min(maxSize, maximumLinkSize) : maximumLinkSize; From 1e8aa33d65a5aa29259469ba8ee984007d145179 Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Wed, 8 Apr 2026 09:17:20 -0700 Subject: [PATCH 04/12] Update MAX_MESSAGE_LENGTH_BYTES JavaDoc to reflect actual usage as link-size fallback --- .../azure/messaging/servicebus/ServiceBusSenderAsyncClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java index 2403b0b5c7b7..5540f501b433 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java @@ -219,7 +219,7 @@ @ServiceClient(builder = ServiceBusClientBuilder.class, isAsync = true) public final class ServiceBusSenderAsyncClient implements AutoCloseable { /** - * The default maximum allowable size, in bytes, for a batch to be sent. + * Fallback maximum message size (256 KB) used when the AMQP link does not report a size. */ static final int MAX_MESSAGE_LENGTH_BYTES = 256 * 1024; // Fallback batch size limit (1 MB) used when the service does not advertise the vendor property From 8671b033bd06113ccc9d568b5bfb7a36c6341ecc Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Wed, 8 Apr 2026 12:05:12 -0700 Subject: [PATCH 05/12] Rename test to createBatchUsesVendorBatchSizeOnStandardNamespace for clarity --- .../servicebus/ServiceBusSenderAsyncClientTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java index f040caba9268..d7b0358d1cf4 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java @@ -332,12 +332,12 @@ void createBatchCappedAtMaxBatchSizeWhenLinkReportsLargerSize(boolean isV2) { } /** - * Verifies that the batch max size uses the vendor property value when it is smaller than 1 MB. - * This simulates a Standard namespace where the link advertises 256 KB for both properties. + * Verifies that the batch max size uses the vendor property value (256 KB) on a Standard namespace. + * Both max-message-size and max-message-batch-size report 256 KB on Standard tier. */ @ParameterizedTest @MethodSource("selectStack") - void createBatchUsesLinkSizeWhenSmallerThanMaxBatchSize(boolean isV2) { + void createBatchUsesVendorBatchSizeOnStandardNamespace(boolean isV2) { // Arrange arrangeIfV2(isV2); int smallLinkSize = 256 * 1024; // 256 KB From e6e0150e3319b432d1a2da0cfb2a18143eb2c78f Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Wed, 8 Apr 2026 14:07:03 -0700 Subject: [PATCH 06/12] Clarify JavaDoc on default getMaxBatchSize(), rename maximumLinkSize to maximumBatchSize, simplify redundant Math.min --- .../azure/core/amqp/implementation/AmqpSendLink.java | 10 ++++++---- .../servicebus/ServiceBusSenderAsyncClient.java | 8 ++++---- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpSendLink.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpSendLink.java index d9688f53ee33..894dcabf3f75 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpSendLink.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpSendLink.java @@ -78,10 +78,12 @@ public interface AmqpSendLink extends AmqpLink { Mono getLinkSize(); /** - * Gets the maximum batch payload size for the send link. This reads the vendor property - * {@code com.microsoft:max-message-batch-size} from the remote link attach frame, which may be smaller - * than {@link #getLinkSize()} (e.g., 1 MB for batches vs. 100 MB for single messages on Premium - * large-message entities). If the vendor property is not present, falls back to {@link #getLinkSize()}. + * Gets the maximum batch payload size for the send link. The default implementation + * returns {@link #getLinkSize()}, which is correct for brokers that do not advertise a + * separate batch limit. {@code ReactorSender} overrides this to read the vendor property + * {@code com.microsoft:max-message-batch-size} from the remote link attach frame, which + * may be smaller than {@link #getLinkSize()} (e.g., 1 MB for batches vs. 100 MB for + * single messages on Premium large-message entities). * * @return A Mono that completes and returns the maximum batch size for the send link. */ diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java index 5540f501b433..e8b488042f81 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java @@ -473,15 +473,15 @@ public Mono createMessageBatch(CreateMessageBatchOptions // Use the value from getMaxBatchSize() (vendor property, or standard max-message-size fallback // in ReactorSender). If neither is available (batchSizeFromLink <= 0), use 1 MB as a // last-resort default to prevent oversized batches on broken links. - final int maximumLinkSize = batchSizeFromLink > 0 ? batchSizeFromLink : DEFAULT_MAX_BATCH_SIZE_BYTES; - if (maxSize > maximumLinkSize) { + final int maximumBatchSize = batchSizeFromLink > 0 ? batchSizeFromLink : DEFAULT_MAX_BATCH_SIZE_BYTES; + if (maxSize > maximumBatchSize) { return monoError(logger, new IllegalArgumentException(String.format(Locale.US, "CreateMessageBatchOptions.getMaximumSizeInBytes (%s bytes) is larger than the maximum" + " batch size (%s bytes).", - maxSize, maximumLinkSize))); + maxSize, maximumBatchSize))); } - final int batchSize = maxSize > 0 ? Math.min(maxSize, maximumLinkSize) : maximumLinkSize; + final int batchSize = maxSize > 0 ? maxSize : maximumBatchSize; return Mono.just( new ServiceBusMessageBatch(isV2, batchSize, link::getErrorContext, tracer, messageSerializer)); })) From bdb3e03e911b093e534dede5d2ad0e42336d98f4 Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Thu, 9 Apr 2026 09:06:42 -0700 Subject: [PATCH 07/12] fix: use unreleased dependency versioning for azure-core-amqp - Add unreleased_com.azure:azure-core-amqp;2.12.0-beta.1 to version_client.txt - Update pom.xml tag to unreleased_com.azure:azure-core-amqp;dependency - Ran update_versions.py --skip-readme to propagate --- eng/versioning/version_client.txt | 1 + sdk/servicebus/azure-messaging-servicebus/pom.xml | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/eng/versioning/version_client.txt b/eng/versioning/version_client.txt index 564b22c6d648..314c506a4a98 100644 --- a/eng/versioning/version_client.txt +++ b/eng/versioning/version_client.txt @@ -555,6 +555,7 @@ io.clientcore:optional-dependency-tests;1.0.0-beta.1;1.0.0-beta.1 # In the pom, the version update tag after the version should name the unreleased package and the dependency version: # +unreleased_com.azure:azure-core-amqp;2.12.0-beta.1 unreleased_com.azure.v2:azure-core;2.0.0-beta.1 unreleased_com.azure.v2:azure-identity;2.0.0-beta.1 unreleased_com.azure.v2:azure-data-appconfiguration;2.0.0-beta.1 diff --git a/sdk/servicebus/azure-messaging-servicebus/pom.xml b/sdk/servicebus/azure-messaging-servicebus/pom.xml index 7dcb218ab3b6..994eee2e816a 100644 --- a/sdk/servicebus/azure-messaging-servicebus/pom.xml +++ b/sdk/servicebus/azure-messaging-servicebus/pom.xml @@ -69,7 +69,7 @@ com.azure azure-core-amqp - 2.12.0-beta.1 + 2.12.0-beta.1 com.azure From 0533c68fc1341c8412238c1af3937f1fa1f59ccc Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Thu, 9 Apr 2026 09:16:55 -0700 Subject: [PATCH 08/12] fix: clarify fallback log message to include non-positive case - Update comment and verbose log to say 'not found, non-numeric, or non-positive' instead of 'not found or non-numeric' since the <= 0 check also catches zero and negative overflow values --- .../com/azure/core/amqp/implementation/ReactorSender.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java index dec00715785a..db51e1e6ee9a 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java @@ -436,12 +436,13 @@ public Mono getMaxBatchSize() { } } - // Fall back to the standard max-message-size if the vendor property is absent or - // non-numeric (expected for non-Service Bus brokers and older service deployments). + // Fall back to the standard max-message-size if the vendor property is absent, + // non-numeric, or non-positive (expected for non-Service Bus brokers and older + // service deployments). if (maxBatchSize <= 0) { final UnsignedLong remoteMaxMessageSize = sender.getRemoteMaxMessageSize(); logger.verbose( - "Vendor property '{}' not found or non-numeric on link, " + "Vendor property '{}' not found, non-numeric, or non-positive on link, " + "falling back to max-message-size: {}.", AmqpConstants.MAX_MESSAGE_BATCH_SIZE, remoteMaxMessageSize); if (remoteMaxMessageSize != null) { From 43344993a16431eae93a8f8a6876108e0f4f8265 Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Thu, 9 Apr 2026 09:26:11 -0700 Subject: [PATCH 09/12] fix: correct constant name in test javadocs - Replace MAX_BATCH_SIZE_BYTES with DEFAULT_MAX_BATCH_SIZE_BYTES in two test javadocs to match the actual constant name in ServiceBusSenderAsyncClient --- .../servicebus/ServiceBusSenderAsyncClientTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java index d7b0358d1cf4..922817c6bf5b 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java @@ -990,8 +990,8 @@ void sendSingleMessage(boolean isV2) { } /** - * Verifies that sendMessage(single) does NOT cap at MAX_BATCH_SIZE_BYTES on a Premium-like link (100 MB). - * The single-message path goes through sendFluxInternal → AmqpMessageCollector which bypasses + * Verifies that sendMessage(single) does NOT cap at DEFAULT_MAX_BATCH_SIZE_BYTES on a Premium-like link + * (100 MB). The single-message path goes through sendFluxInternal → AmqpMessageCollector which bypasses * createMessageBatch() and therefore is not subject to the 1 MB batch cap. */ @ParameterizedTest @@ -1172,8 +1172,8 @@ void scheduleMessageSingleNotCappedWithLargeLink(boolean isV2) { /** * Verifies that sendMessage(single) with a message larger than 1 MB succeeds on a large link. * This proves the single-message path (sendFluxInternal -> AmqpMessageCollector) is NOT capped - * at MAX_BATCH_SIZE_BYTES (1 MB). On Premium namespaces with large per-entity limits, individual - * messages exceeding 1 MB are valid. + * at the default 1 MB batch size limit. On Premium namespaces with large per-entity limits, + * individual messages exceeding 1 MB are valid. */ @ParameterizedTest @MethodSource("selectStack") From 00e7dff296b7e1ae8fd3691c460374675dd89e41 Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Thu, 7 May 2026 21:32:09 -0700 Subject: [PATCH 10/12] test: avoid single-use Stream in scheduleMessages mock Replace Flux.fromStream(IntStream...) with Flux.range().map() so the publisher remains resubscribable. Stream instances are single-use; if the code under test re-subscribes (retry, tracing instrumentation), the second subscription would fail with 'stream has already been operated upon or closed'. --- .../messaging/servicebus/ServiceBusSenderAsyncClientTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java index 922817c6bf5b..7dae0faf5317 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java @@ -1122,7 +1122,7 @@ void scheduleMessagesIterableWithLargeLinkCapsAt1MB(boolean isV2) { when(sendLink.getLinkSize()).thenReturn(Mono.just(largeLinkSize)); when(sendLink.getMaxBatchSize()).thenReturn(Mono.just(vendorBatchSize)); when(managementNode.schedule(anyList(), eq(instant), eq(vendorBatchSize), eq(LINK_NAME), isNull())) - .thenReturn(Flux.fromStream(IntStream.range(0, count).mapToObj(i -> sequenceNumberReturned + i))); + .thenReturn(Flux.range(0, count).map(i -> sequenceNumberReturned + i)); // Act & Assert - scheduleMessages(Iterable) → createMessageBatch() reads vendor property (1 MB), // then passes that value to managementNode.schedule(), NOT the raw 100 MB. From d63c3e255eb3a2f18836df4a02e06441d7799423 Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Mon, 11 May 2026 18:58:39 -0700 Subject: [PATCH 11/12] fix(azure-core-amqp): drop ineffective retry on cached endpointStates in getMaxBatchSize The endpointStates Flux is built with .cache(1) (ReactorSender.java L167), so once it reaches a terminal state - ACTIVE, COMPLETE, or error - every subsequent subscription replays the cached signal. Wrapping the chain in RetryUtil.withRetry produces no new network activity on error; it just replays the same error after each backoff delay, unnecessarily delaying when the caller sees it. Removed the wrapper from getMaxBatchSize and added a comment so the pattern isn't reintroduced. Scope is intentionally limited to the new method - the identical pre-existing pattern in getLinkSize (L396) is out of scope for this PR. Tests still pass because the test setup pre-emits EndpointState.ACTIVE before subscribing, so takeUntil(ACTIVE) completes immediately whether wrapped in retry or not. Addresses anuchandy review feedback on PR #48214. --- .../com/azure/core/amqp/implementation/ReactorSender.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java index db51e1e6ee9a..69ac2abcb77d 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java @@ -420,9 +420,11 @@ public Mono getMaxBatchSize() { return Mono.defer(() -> Mono.just(maxBatchSize)); } - return RetryUtil - .withRetry(getEndpointStates().takeUntil(state -> state == AmqpEndpointState.ACTIVE), retryOptions, - activeTimeoutMessage) + // Don't wrap in RetryUtil.withRetry: getEndpointStates() is a cache(1) Flux, so once it + // reaches a terminal state (ACTIVE, COMPLETE, or error), every subscription replays that + // cached signal. Retrying on error would just replay the same error after the backoff + // delay without producing new network activity. + return getEndpointStates().takeUntil(state -> state == AmqpEndpointState.ACTIVE) .then(Mono.fromCallable(() -> { final Map remoteProperties = sender.getRemoteProperties(); if (remoteProperties != null From 85aaa25dc32c622853816e97743fd026c5b007bc Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Mon, 11 May 2026 19:48:09 -0700 Subject: [PATCH 12/12] fix(azure-core-amqp): preserve timeout in getMaxBatchSize after dropping retry Removing RetryUtil.withRetry also removed source.timeout(getTryTimeout()) that the wrapper applied. Without it, getMaxBatchSize() can hang indefinitely if the link never transitions to ACTIVE (and never errors). Add an explicit .timeout(retryOptions.getTryTimeout()) so hang protection matches getLinkSize(), while still avoiding the ineffective retry loop. Addresses Copilot review feedback on PR #48214. --- .../com/azure/core/amqp/implementation/ReactorSender.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java index 69ac2abcb77d..ece618d76ccc 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java @@ -423,8 +423,10 @@ public Mono getMaxBatchSize() { // Don't wrap in RetryUtil.withRetry: getEndpointStates() is a cache(1) Flux, so once it // reaches a terminal state (ACTIVE, COMPLETE, or error), every subscription replays that // cached signal. Retrying on error would just replay the same error after the backoff - // delay without producing new network activity. + // delay without producing new network activity. A bare timeout still applies so callers + // don't hang indefinitely if the link never transitions to ACTIVE. return getEndpointStates().takeUntil(state -> state == AmqpEndpointState.ACTIVE) + .timeout(retryOptions.getTryTimeout()) .then(Mono.fromCallable(() -> { final Map remoteProperties = sender.getRemoteProperties(); if (remoteProperties != null