Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/messaging/azservicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

### Bugs Fixed

- Read `com.microsoft:max-message-batch-size` vendor property from the AMQP sender link to correctly limit batch size on Premium large-message entities, where `max-message-size` can be up to 100 MB but the batch limit is 1 MB.

### Other Changes

## 1.10.0 (2025-08-05)
Expand Down
4 changes: 4 additions & 0 deletions sdk/messaging/azservicebus/internal/amqp_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,10 @@ func (s *FakeAMQPSender) LinkName() string {
return "sender-link-name"
}

func (s *FakeAMQPSender) Properties() map[string]any {
return nil
}

func (s *FakeAMQPSender) Close(ctx context.Context) error {
s.Closed++
return nil
Expand Down
5 changes: 5 additions & 0 deletions sdk/messaging/azservicebus/internal/amqpwrap/amqpwrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type AMQPSender interface {
Send(ctx context.Context, msg *amqp.Message, o *amqp.SendOptions) error
MaxMessageSize() uint64
LinkName() string
Properties() map[string]any
}

// AMQPSenderCloser is implemented by [*AMQPSenderWrapper]
Expand Down Expand Up @@ -254,6 +255,10 @@ func (sw *AMQPSenderWrapper) LinkName() string {
return sw.Inner.LinkName()
}

func (sw *AMQPSenderWrapper) Properties() map[string]any {
return sw.Inner.Properties()
}

func (sw *AMQPSenderWrapper) Close(ctx context.Context) error {
ctx, cancel := sw.ContextWithTimeoutFn(ctx, defaultCloseTimeout)
defer cancel()
Expand Down
28 changes: 28 additions & 0 deletions sdk/messaging/azservicebus/internal/amqpwrap/mock_amqp_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func (md *MockData) NewSender(ctx context.Context, target string, opts *amqp.Sen
}

sender.EXPECT().MaxMessageSize().Return(uint64(1024 * 1024 * 100)).AnyTimes()
sender.EXPECT().Properties().Return(map[string]any(nil)).AnyTimes()

// this should work fine even for RPC links like $cbs or $management
q := md.upsertQueue(target)
Expand Down
28 changes: 28 additions & 0 deletions sdk/messaging/azservicebus/internal/mock/mock_amqp.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions sdk/messaging/azservicebus/internal/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,10 @@ func (tester *rpcTester) LinkName() string {
return "hello"
}

func (tester *rpcTester) Properties() map[string]any {
return nil
}

// receiver functions

func (tester *rpcTester) Receive(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error) {
Expand Down
25 changes: 25 additions & 0 deletions sdk/messaging/azservicebus/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,32 @@ func (s *Sender) NewMessageBatch(ctx context.Context, options *MessageBatchOptio
var batch *MessageBatch

err := s.links.Retry(ctx, EventSender, "send", func(ctx context.Context, lwid *internal.LinksWithID, args *utils.RetryFnArgs) error {
// Prefer the vendor property 'com.microsoft:max-message-batch-size' which
// correctly reports the batch size limit (e.g. 1 MB on Premium) independent
// of max-message-size (which can be up to 100 MB on large-message entities).
maxBytes := lwid.Sender.MaxMessageSize()
if props := lwid.Sender.Properties(); props != nil {
if v, ok := props["com.microsoft:max-message-batch-size"]; ok {
var batchSize uint64
switch val := v.(type) {
case uint64:
batchSize = val
case uint32:
batchSize = uint64(val)
case int64:
if val > 0 {
batchSize = uint64(val)
}
case int:
if val > 0 {
batchSize = uint64(val)
}
}
if batchSize > 0 {
maxBytes = batchSize
}
}
Comment thread
EldertGrootenboer marked this conversation as resolved.
}

if options != nil && options.MaxBytes != 0 {
maxBytes = options.MaxBytes
Expand Down
99 changes: 99 additions & 0 deletions sdk/messaging/azservicebus/sender_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,102 @@ func TestSenderNewMessageBatch_ConnectionClosed(t *testing.T) {
require.Equal(t, CodeConnectionLost, asSBError.Code)
require.Nil(t, batch)
}

func TestSenderNewMessageBatch_VendorPropertyOverridesMaxMessageSize(t *testing.T) {
_, client, cleanup := newClientWithMockedConn(t, &emulation.MockDataOptions{
PreReceiverMock: func(mr *emulation.MockReceiver, ctx context.Context) error {
return nil
},
PreSenderMock: func(ms *emulation.MockSender, ctx context.Context) error {
if ms.Target != "$cbs" {
// Set MaxMessageSize to 100 MB and the vendor batch-size property to 1 MB
// so this test can verify the vendor property is used as the batch limit.
ms.EXPECT().MaxMessageSize().Return(uint64(100 * 1024 * 1024)).AnyTimes()
ms.EXPECT().Properties().Return(map[string]any{
"com.microsoft:max-message-batch-size": uint64(1048576),
}).AnyTimes()
}
return nil
},
}, &ClientOptions{
RetryOptions: noRetriesNeeded,
})
defer cleanup()

sender, err := client.NewSender("queue", nil)
require.NoError(t, err)

batch, err := sender.NewMessageBatch(context.Background(), nil)
require.NoError(t, err)
require.NotNil(t, batch)

// The batch should use the vendor property (1 MB), not MaxMessageSize (100 MB).
// A 1 MiB body plus AMQP envelope overhead exceeds the 1 MiB batch limit.
largeBody := make([]byte, 1048576)
err = batch.AddMessage(&Message{Body: largeBody}, nil)
require.ErrorIs(t, err, ErrMessageTooLarge, "A 1 MiB message should exceed the vendor batch limit minus overhead")
}

func TestSenderNewMessageBatch_FallsBackWhenVendorPropertyAbsent(t *testing.T) {
_, client, cleanup := newClientWithMockedConn(t, &emulation.MockDataOptions{
PreReceiverMock: func(mr *emulation.MockReceiver, ctx context.Context) error {
return nil
},
PreSenderMock: func(ms *emulation.MockSender, ctx context.Context) error {
if ms.Target != "$cbs" {
ms.EXPECT().MaxMessageSize().Return(uint64(262144)).AnyTimes() // 256 KB
ms.EXPECT().Properties().Return(map[string]any(nil)).AnyTimes()
}
return nil
},
}, &ClientOptions{
RetryOptions: noRetriesNeeded,
})
defer cleanup()

sender, err := client.NewSender("queue", nil)
require.NoError(t, err)

batch, err := sender.NewMessageBatch(context.Background(), nil)
require.NoError(t, err)
require.NotNil(t, batch)

// Should fall back to MaxMessageSize (256 KB). A 256 KB body should be rejected.
body := make([]byte, 262144)
err = batch.AddMessage(&Message{Body: body}, nil)
require.ErrorIs(t, err, ErrMessageTooLarge, "A 256 KB message should exceed the link limit minus overhead")
}

func TestSenderNewMessageBatch_UserMaxBytesOverridesVendorProperty(t *testing.T) {
_, client, cleanup := newClientWithMockedConn(t, &emulation.MockDataOptions{
PreReceiverMock: func(mr *emulation.MockReceiver, ctx context.Context) error {
return nil
},
PreSenderMock: func(ms *emulation.MockSender, ctx context.Context) error {
if ms.Target != "$cbs" {
ms.EXPECT().MaxMessageSize().Return(uint64(100 * 1024 * 1024)).AnyTimes()
ms.EXPECT().Properties().Return(map[string]any{
"com.microsoft:max-message-batch-size": uint64(1048576),
}).AnyTimes()
}
return nil
},
}, &ClientOptions{
RetryOptions: noRetriesNeeded,
})
defer cleanup()

sender, err := client.NewSender("queue", nil)
require.NoError(t, err)

batch, err := sender.NewMessageBatch(context.Background(), &MessageBatchOptions{
MaxBytes: 512,
})
require.NoError(t, err)
require.NotNil(t, batch)

// User override of 512 bytes — a small message should still be rejected
body := make([]byte, 512)
err = batch.AddMessage(&Message{Body: body}, nil)
require.ErrorIs(t, err, ErrMessageTooLarge, "A 512-byte message should exceed the user-specified 512-byte limit minus overhead")
}