diff --git a/sdk/messaging/azservicebus/CHANGELOG.md b/sdk/messaging/azservicebus/CHANGELOG.md index c21b858c7f20..4d31da67ff32 100644 --- a/sdk/messaging/azservicebus/CHANGELOG.md +++ b/sdk/messaging/azservicebus/CHANGELOG.md @@ -8,6 +8,8 @@ ### Bugs Fixed +- Read `com.microsoft:max-message-batch-size` vendor property from the AMQP sender link to correctly limit batch size on Premium large-message entities, where `max-message-size` can be up to 100 MB but the batch limit is 1 MB. + ### Other Changes ## 1.10.0 (2025-08-05) diff --git a/sdk/messaging/azservicebus/internal/amqp_test_utils.go b/sdk/messaging/azservicebus/internal/amqp_test_utils.go index 20dbf52cb7a7..9dd209d5b97f 100644 --- a/sdk/messaging/azservicebus/internal/amqp_test_utils.go +++ b/sdk/messaging/azservicebus/internal/amqp_test_utils.go @@ -238,6 +238,10 @@ func (s *FakeAMQPSender) LinkName() string { return "sender-link-name" } +func (s *FakeAMQPSender) Properties() map[string]any { + return nil +} + func (s *FakeAMQPSender) Close(ctx context.Context) error { s.Closed++ return nil diff --git a/sdk/messaging/azservicebus/internal/amqpwrap/amqpwrap.go b/sdk/messaging/azservicebus/internal/amqpwrap/amqpwrap.go index bb7d0f90a8c5..23f01ac1fa38 100644 --- a/sdk/messaging/azservicebus/internal/amqpwrap/amqpwrap.go +++ b/sdk/messaging/azservicebus/internal/amqpwrap/amqpwrap.go @@ -46,6 +46,7 @@ type AMQPSender interface { Send(ctx context.Context, msg *amqp.Message, o *amqp.SendOptions) error MaxMessageSize() uint64 LinkName() string + Properties() map[string]any } // AMQPSenderCloser is implemented by [*AMQPSenderWrapper] @@ -254,6 +255,10 @@ func (sw *AMQPSenderWrapper) LinkName() string { return sw.Inner.LinkName() } +func (sw *AMQPSenderWrapper) Properties() map[string]any { + return sw.Inner.Properties() +} + func (sw *AMQPSenderWrapper) Close(ctx context.Context) error { ctx, cancel := sw.ContextWithTimeoutFn(ctx, defaultCloseTimeout) defer cancel() diff --git a/sdk/messaging/azservicebus/internal/amqpwrap/mock_amqp_test.go b/sdk/messaging/azservicebus/internal/amqpwrap/mock_amqp_test.go index edd12f914b8d..407b8c8ee5f2 100644 --- a/sdk/messaging/azservicebus/internal/amqpwrap/mock_amqp_test.go +++ b/sdk/messaging/azservicebus/internal/amqpwrap/mock_amqp_test.go @@ -437,6 +437,20 @@ func (mr *MockAMQPSenderMockRecorder) MaxMessageSize() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxMessageSize", reflect.TypeOf((*MockAMQPSender)(nil).MaxMessageSize)) } +// Properties mocks base method. +func (m *MockAMQPSender) Properties() map[string]any { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Properties") + ret0, _ := ret[0].(map[string]any) + return ret0 +} + +// Properties indicates an expected call of Properties. +func (mr *MockAMQPSenderMockRecorder) Properties() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Properties", reflect.TypeOf((*MockAMQPSender)(nil).Properties)) +} + // Send mocks base method. func (m *MockAMQPSender) Send(ctx context.Context, msg *go_amqp.Message, o *go_amqp.SendOptions) error { m.ctrl.T.Helper() @@ -516,6 +530,20 @@ func (mr *MockAMQPSenderCloserMockRecorder) MaxMessageSize() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxMessageSize", reflect.TypeOf((*MockAMQPSenderCloser)(nil).MaxMessageSize)) } +// Properties mocks base method. +func (m *MockAMQPSenderCloser) Properties() map[string]any { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Properties") + ret0, _ := ret[0].(map[string]any) + return ret0 +} + +// Properties indicates an expected call of Properties. +func (mr *MockAMQPSenderCloserMockRecorder) Properties() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Properties", reflect.TypeOf((*MockAMQPSenderCloser)(nil).Properties)) +} + // Send mocks base method. func (m *MockAMQPSenderCloser) Send(ctx context.Context, msg *go_amqp.Message, o *go_amqp.SendOptions) error { m.ctrl.T.Helper() diff --git a/sdk/messaging/azservicebus/internal/mock/emulation/mock_data_sender.go b/sdk/messaging/azservicebus/internal/mock/emulation/mock_data_sender.go index 7ee49c436bc3..a1daabb9d4c9 100644 --- a/sdk/messaging/azservicebus/internal/mock/emulation/mock_data_sender.go +++ b/sdk/messaging/azservicebus/internal/mock/emulation/mock_data_sender.go @@ -63,6 +63,7 @@ func (md *MockData) NewSender(ctx context.Context, target string, opts *amqp.Sen } sender.EXPECT().MaxMessageSize().Return(uint64(1024 * 1024 * 100)).AnyTimes() + sender.EXPECT().Properties().Return(map[string]any(nil)).AnyTimes() // this should work fine even for RPC links like $cbs or $management q := md.upsertQueue(target) diff --git a/sdk/messaging/azservicebus/internal/mock/mock_amqp.go b/sdk/messaging/azservicebus/internal/mock/mock_amqp.go index 85949a15cd38..e7f78130f6ff 100644 --- a/sdk/messaging/azservicebus/internal/mock/mock_amqp.go +++ b/sdk/messaging/azservicebus/internal/mock/mock_amqp.go @@ -438,6 +438,20 @@ func (mr *MockAMQPSenderMockRecorder) MaxMessageSize() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxMessageSize", reflect.TypeOf((*MockAMQPSender)(nil).MaxMessageSize)) } +// Properties mocks base method. +func (m *MockAMQPSender) Properties() map[string]any { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Properties") + ret0, _ := ret[0].(map[string]any) + return ret0 +} + +// Properties indicates an expected call of Properties. +func (mr *MockAMQPSenderMockRecorder) Properties() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Properties", reflect.TypeOf((*MockAMQPSender)(nil).Properties)) +} + // Send mocks base method. func (m *MockAMQPSender) Send(ctx context.Context, msg *amqp.Message, o *amqp.SendOptions) error { m.ctrl.T.Helper() @@ -517,6 +531,20 @@ func (mr *MockAMQPSenderCloserMockRecorder) MaxMessageSize() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxMessageSize", reflect.TypeOf((*MockAMQPSenderCloser)(nil).MaxMessageSize)) } +// Properties mocks base method. +func (m *MockAMQPSenderCloser) Properties() map[string]any { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Properties") + ret0, _ := ret[0].(map[string]any) + return ret0 +} + +// Properties indicates an expected call of Properties. +func (mr *MockAMQPSenderCloserMockRecorder) Properties() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Properties", reflect.TypeOf((*MockAMQPSenderCloser)(nil).Properties)) +} + // Send mocks base method. func (m *MockAMQPSenderCloser) Send(ctx context.Context, msg *amqp.Message, o *amqp.SendOptions) error { m.ctrl.T.Helper() diff --git a/sdk/messaging/azservicebus/internal/rpc_test.go b/sdk/messaging/azservicebus/internal/rpc_test.go index f7ed9e3f3ca9..a0db8ce06233 100644 --- a/sdk/messaging/azservicebus/internal/rpc_test.go +++ b/sdk/messaging/azservicebus/internal/rpc_test.go @@ -304,6 +304,10 @@ func (tester *rpcTester) LinkName() string { return "hello" } +func (tester *rpcTester) Properties() map[string]any { + return nil +} + // receiver functions func (tester *rpcTester) Receive(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error) { diff --git a/sdk/messaging/azservicebus/sender.go b/sdk/messaging/azservicebus/sender.go index 4cea59310344..1602450f95cc 100644 --- a/sdk/messaging/azservicebus/sender.go +++ b/sdk/messaging/azservicebus/sender.go @@ -39,7 +39,32 @@ func (s *Sender) NewMessageBatch(ctx context.Context, options *MessageBatchOptio var batch *MessageBatch err := s.links.Retry(ctx, EventSender, "send", func(ctx context.Context, lwid *internal.LinksWithID, args *utils.RetryFnArgs) error { + // Prefer the vendor property 'com.microsoft:max-message-batch-size' which + // correctly reports the batch size limit (e.g. 1 MB on Premium) independent + // of max-message-size (which can be up to 100 MB on large-message entities). maxBytes := lwid.Sender.MaxMessageSize() + if props := lwid.Sender.Properties(); props != nil { + if v, ok := props["com.microsoft:max-message-batch-size"]; ok { + var batchSize uint64 + switch val := v.(type) { + case uint64: + batchSize = val + case uint32: + batchSize = uint64(val) + case int64: + if val > 0 { + batchSize = uint64(val) + } + case int: + if val > 0 { + batchSize = uint64(val) + } + } + if batchSize > 0 { + maxBytes = batchSize + } + } + } if options != nil && options.MaxBytes != 0 { maxBytes = options.MaxBytes diff --git a/sdk/messaging/azservicebus/sender_unit_test.go b/sdk/messaging/azservicebus/sender_unit_test.go index b676ffc75150..26ff09049b40 100644 --- a/sdk/messaging/azservicebus/sender_unit_test.go +++ b/sdk/messaging/azservicebus/sender_unit_test.go @@ -105,3 +105,102 @@ func TestSenderNewMessageBatch_ConnectionClosed(t *testing.T) { require.Equal(t, CodeConnectionLost, asSBError.Code) require.Nil(t, batch) } + +func TestSenderNewMessageBatch_VendorPropertyOverridesMaxMessageSize(t *testing.T) { + _, client, cleanup := newClientWithMockedConn(t, &emulation.MockDataOptions{ + PreReceiverMock: func(mr *emulation.MockReceiver, ctx context.Context) error { + return nil + }, + PreSenderMock: func(ms *emulation.MockSender, ctx context.Context) error { + if ms.Target != "$cbs" { + // Set MaxMessageSize to 100 MB and the vendor batch-size property to 1 MB + // so this test can verify the vendor property is used as the batch limit. + ms.EXPECT().MaxMessageSize().Return(uint64(100 * 1024 * 1024)).AnyTimes() + ms.EXPECT().Properties().Return(map[string]any{ + "com.microsoft:max-message-batch-size": uint64(1048576), + }).AnyTimes() + } + return nil + }, + }, &ClientOptions{ + RetryOptions: noRetriesNeeded, + }) + defer cleanup() + + sender, err := client.NewSender("queue", nil) + require.NoError(t, err) + + batch, err := sender.NewMessageBatch(context.Background(), nil) + require.NoError(t, err) + require.NotNil(t, batch) + + // The batch should use the vendor property (1 MB), not MaxMessageSize (100 MB). + // A 1 MiB body plus AMQP envelope overhead exceeds the 1 MiB batch limit. + largeBody := make([]byte, 1048576) + err = batch.AddMessage(&Message{Body: largeBody}, nil) + require.ErrorIs(t, err, ErrMessageTooLarge, "A 1 MiB message should exceed the vendor batch limit minus overhead") +} + +func TestSenderNewMessageBatch_FallsBackWhenVendorPropertyAbsent(t *testing.T) { + _, client, cleanup := newClientWithMockedConn(t, &emulation.MockDataOptions{ + PreReceiverMock: func(mr *emulation.MockReceiver, ctx context.Context) error { + return nil + }, + PreSenderMock: func(ms *emulation.MockSender, ctx context.Context) error { + if ms.Target != "$cbs" { + ms.EXPECT().MaxMessageSize().Return(uint64(262144)).AnyTimes() // 256 KB + ms.EXPECT().Properties().Return(map[string]any(nil)).AnyTimes() + } + return nil + }, + }, &ClientOptions{ + RetryOptions: noRetriesNeeded, + }) + defer cleanup() + + sender, err := client.NewSender("queue", nil) + require.NoError(t, err) + + batch, err := sender.NewMessageBatch(context.Background(), nil) + require.NoError(t, err) + require.NotNil(t, batch) + + // Should fall back to MaxMessageSize (256 KB). A 256 KB body should be rejected. + body := make([]byte, 262144) + err = batch.AddMessage(&Message{Body: body}, nil) + require.ErrorIs(t, err, ErrMessageTooLarge, "A 256 KB message should exceed the link limit minus overhead") +} + +func TestSenderNewMessageBatch_UserMaxBytesOverridesVendorProperty(t *testing.T) { + _, client, cleanup := newClientWithMockedConn(t, &emulation.MockDataOptions{ + PreReceiverMock: func(mr *emulation.MockReceiver, ctx context.Context) error { + return nil + }, + PreSenderMock: func(ms *emulation.MockSender, ctx context.Context) error { + if ms.Target != "$cbs" { + ms.EXPECT().MaxMessageSize().Return(uint64(100 * 1024 * 1024)).AnyTimes() + ms.EXPECT().Properties().Return(map[string]any{ + "com.microsoft:max-message-batch-size": uint64(1048576), + }).AnyTimes() + } + return nil + }, + }, &ClientOptions{ + RetryOptions: noRetriesNeeded, + }) + defer cleanup() + + sender, err := client.NewSender("queue", nil) + require.NoError(t, err) + + batch, err := sender.NewMessageBatch(context.Background(), &MessageBatchOptions{ + MaxBytes: 512, + }) + require.NoError(t, err) + require.NotNil(t, batch) + + // User override of 512 bytes — a small message should still be rejected + body := make([]byte, 512) + err = batch.AddMessage(&Message{Body: body}, nil) + require.ErrorIs(t, err, ErrMessageTooLarge, "A 512-byte message should exceed the user-specified 512-byte limit minus overhead") +}