Skip to content

Commit

Permalink
[azservicebus] Fixing bug from #23893, where a message could be recei…
Browse files Browse the repository at this point in the history
…ved in the mess… (#23929)

It was possible for the message releaser to receive a message, but not release it. When that happened the message would be locked by SB, but not released, so you'd see it constantly get redelivered but never see it in your code.
    
Added some tests for this scenario, and a lot of comments to explain how we (now) avoid it.
Also, go get -u'd all the deps, just to keep pace.

Fixes #23893
  • Loading branch information
richardpark-msft authored Jan 11, 2025
1 parent ea1a0e7 commit 0443c04
Show file tree
Hide file tree
Showing 7 changed files with 261 additions and 95 deletions.
2 changes: 2 additions & 0 deletions sdk/messaging/azservicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

### Bugs Fixed

- Receivers had a bug where a message could be received but not returned to the user. Callers would see that, occasionally, a message would not be returned from ReceiveMessages(), but would appear to have been received. Thanks to @patrickwhite256 for reporting this issue. (PR#23929)

### Other Changes

## 1.7.3 (2024-10-14)
Expand Down
8 changes: 4 additions & 4 deletions sdk/messaging/azservicebus/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.18
retract v1.1.2 // Breaks customers in situations where close is slow/infinite.

require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.16.0
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.0
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0
github.com/Azure/go-amqp v1.3.0
Expand All @@ -31,9 +31,9 @@ require (
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/crypto v0.32.0 // indirect
golang.org/x/net v0.34.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/text v0.21.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
16 changes: 8 additions & 8 deletions sdk/messaging/azservicebus/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.16.0 h1:JZg6HRh6W6U4OLl6lk7BZ7BLisIzM9dG1R50zUk9C/M=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.16.0/go.mod h1:YL1xnZ6QejvQHWJrX/AvhFl4WW4rqHVoKspWNVwFk0M=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0 h1:g0EZJwz7xkXQiZAI5xi9f3WWFYBlX1CPTrR+NDToRkQ=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0/go.mod h1:XCW7KnZet0Opnr7HccfUw1PLc4CjHqpcaxW8DHklNkQ=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.0 h1:B/dfvscEQtew9dVuoxqxrUKKv8Ih2f55PydknDamU+g=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.0/go.mod h1:fiPSssYvltE08HJchL04dOy+RD4hgrjph0cwGGMntdI=
github.com/Azure/azure-sdk-for-go/sdk/azidentity/cache v0.3.0 h1:+m0M/LFxN43KvULkDNfdXOgrjtg6UYJPFBJyuEcRCAw=
Expand Down Expand Up @@ -43,14 +43,14 @@ github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand All @@ -59,8 +59,8 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ func SendAndReceiveDrain(remainingArgs []string) {
// this is bad - it means we didn't get _any_ messages within an entire
// minute and might indicate that we're hitting the customer bug.

log.Printf("Exceeded the timeout, trying one more time real fast")
log.Printf("Exceeded the timeout, trying one more time real fast to see what the bug might be...")

// let's see if there is some other momentary issue happening here by doing a quick receive again.
ctx, cancel := context.WithTimeout(sc.Context, time.Minute)
defer cancel()
messages, err = receiver.ReceiveMessages(ctx, numToSend+100, nil)
sc.PanicOnError("Exceeded a minute while waiting for messages", err)
sc.Failf("Exceeded a minute while waiting for messages (got %d messages in second try). Error: %#v", len(messages), err)
}

log.Printf("Got %d messages, completing...", len(messages))
Expand Down
26 changes: 17 additions & 9 deletions sdk/messaging/azservicebus/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Receiver struct {
receiving bool
retryOptions RetryOptions
settler *messageSettler
defaultReleaserTimeout time.Duration // defaults to 1min, settable for unit tests.
}

// ReceiverOptions contains options for the `Client.NewReceiverForQueue` or `Client.NewReceiverForSubscription`
Expand Down Expand Up @@ -133,6 +134,7 @@ func newReceiver(args newReceiverArgs, options *ReceiverOptions) (*Receiver, err
lastPeekedSequenceNumber: 0,
maxAllowedCredits: defaultLinkRxBuffer,
retryOptions: args.retryOptions,
defaultReleaserTimeout: time.Minute,
}

receiver.cancelReleaser.Store(emptyCancelFn)
Expand Down Expand Up @@ -608,32 +610,38 @@ func (r *Receiver) newReleaserFunc(receiver amqpwrap.AMQPReceiver) func() {
}

ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
releaseLoopDone := make(chan struct{})
released := 0

// this func gets called when a new ReceiveMessages() starts
r.cancelReleaser.Store(func() string {
cancel()
<-done
<-releaseLoopDone
return receiver.LinkName()
})

return func() {
defer close(done)
defer close(releaseLoopDone)

for {
// we might not have all the messages we need here.
msg, err := receiver.Receive(ctx, nil)

if err == nil {
err = receiver.ReleaseMessage(ctx, msg)
}
releaseCtx, cancelRelease := context.WithTimeout(context.Background(), r.defaultReleaserTimeout)

if err == nil {
released++
// We don't use `ctx` here to avoid cancelling Release(), and leaving this message
// in limbo until it expires.
err = receiver.ReleaseMessage(releaseCtx, msg)
cancelRelease()

if err == nil {
released++
}
}

if internal.IsCancelError(err) {
// We check `ctx.Err()` here, instead of testing the returned err from .Receive(), because Receive()
// ignores cancellation if it has any messages in its prefetch queue.
if ctx.Err() != nil {
if released > 0 {
r.amqpLinks.Writef(exported.EventReceiver, "Message releaser pausing. Released %d messages", released)
}
Expand Down
128 changes: 128 additions & 0 deletions sdk/messaging/azservicebus/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"regexp"
"sort"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -1043,6 +1044,133 @@ func TestReceiveWithDifferentWaitTime(t *testing.T) {
require.Greater(t, bigger, base2)
}

func TestReceiveCancelReleaser(t *testing.T) {
getLogs := test.CaptureLogsForTest(false)

client, cleanup, queueName := setupLiveTest(t, &liveTestOptions{
QueueProperties: &admin.QueueProperties{
// use a long lock time to really make it clear when we've accidentally
// orphaned a message.
LockDuration: to.Ptr("PT5M"),
},
})
defer cleanup()

receiver, err := client.NewReceiverForQueue(queueName, nil)
require.NoError(t, err)

sender, err := client.NewSender(queueName, nil)
require.NoError(t, err)

padding := make([]byte, 1)

var batch *MessageBatch
const numSent = 2000

t.Logf("Sending messages")
SendLoop:
for i := 0; i < numSent; i++ {
if batch == nil {
tmpBatch, err := sender.NewMessageBatch(context.Background(), nil)
require.NoError(t, err)
batch = tmpBatch
}

err := batch.AddMessage(&Message{
MessageID: to.Ptr(fmt.Sprintf("%d", i)),
Body: padding}, nil)

if errors.Is(err, ErrMessageTooLarge) {
err := sender.SendMessageBatch(context.Background(), batch, nil)
require.NoError(t, err)
batch = nil
i--
continue SendLoop
}

if i == numSent-1 {
err := sender.SendMessageBatch(context.Background(), batch, nil)
require.NoError(t, err)
break SendLoop
}
}

t.Logf("Receiving small subset of messages")
messages, err := receiver.ReceiveMessages(context.Background(), numSent, &ReceiveMessagesOptions{
// Receive with a high credit count, but too little time to actually get them all
// This will force us into a situation where the AMQP receiver will have a lot of messages
// in its prefetch cache, giving us a high chance of triggering a previous bug where early
// cancellation could result in us losing messages.
TimeAfterFirstMessage: time.Nanosecond,
})
require.NoError(t, err)

ids := &sync.Map{}

for _, msg := range messages {
require.NoError(t, receiver.CompleteMessage(context.Background(), msg, nil))
_, exists := ids.LoadOrStore(msg.MessageID, true)
require.False(t, exists)
}

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

expected := numSent - len(messages) // remove any messages we've already received. Usually it's just one but it's not
remaining := expected

t.Logf("Receiving remaining messages (%d)", remaining)

for remaining > 0 {
messages, err := receiver.ReceiveMessages(ctx, remaining, nil)
require.NoError(t, err)
require.NotEmpty(t, messages)

t.Logf("Received %d messages", len(messages))

wg := sync.WaitGroup{}

for _, msg := range messages {
msg := msg
wg.Add(1)

go func() {
defer wg.Done()
require.NoError(t, receiver.CompleteMessage(context.Background(), msg, nil))
_, exists := ids.LoadOrStore(msg.MessageID, true)
require.False(t, exists)
}()
}

wg.Wait()

remaining -= len(messages)
}

count := 0
ids.Range(func(_, _ any) bool {
count++
return true
})

require.Equal(t, numSent, count)

logs := getLogs()

found := 0

for _, log := range logs {
if strings.Contains(log, "Message releaser pausing. Released ") {
found++
}
}

// This is a bit of a non-deterministic bit so I'm not going to fail the overall test
if found == 0 {
t.Logf("Failed to find our 'messages released' log entry: %#v", logs)
}
}

type receivedMessageSlice []*ReceivedMessage

func (messages receivedMessageSlice) Len() int {
Expand Down
Loading

0 comments on commit 0443c04

Please sign in to comment.