Skip to content

Commit

Permalink
Do not use atomics in DeferredConfirmation
Browse files Browse the repository at this point in the history
  • Loading branch information
tie committed Dec 21, 2022
1 parent 5d508ea commit 19e503a
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 18 deletions.
35 changes: 18 additions & 17 deletions confirms.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package amqp091
import (
"context"
"sync"
"sync/atomic"
)

// confirms resequences and notifies one or multiple publisher confirmation listeners
Expand Down Expand Up @@ -173,44 +172,46 @@ func (d *deferredConfirmations) Close() {
}
}

// setAck sets the acknowledgement status of the confirmation. Note that it is
// not safe for concurrent use and must not be called more than once.
// setAck sets the acknowledgement status of the confirmation. Note that it must
// not be called more than once.
func (d *DeferredConfirmation) setAck(ack bool) {
if ack {
atomic.StoreInt32(&d.ack, 1)
}
d.ack = ack
close(d.done)
}

// Done returns the channel that can be used to wait for the publisher
// confirmation.
func (d *DeferredConfirmation) Done() chan struct{} {
func (d *DeferredConfirmation) Done() <-chan struct{} {
return d.done
}

// Acked returns the publisher confirmation in a non-blocking manner. It returns
// false if the confirmation was not confirmed yet or was not acknowledged.
// false if the confirmation was not acknowledged yet or received negative
// acknowledgement.
func (d *DeferredConfirmation) Acked() bool {
return atomic.LoadInt32(&d.ack) != 0
select {
case <-d.done:
default:
return false
}
return d.ack
}

// Wait blocks until the publisher confirmation. Returns true if server
// Wait blocks until the publisher confirmation. It returns true if the server
// successfully received the publishing.
func (d *DeferredConfirmation) Wait() bool {
<-d.done
// NB we do not use atomics here since setAck is called at most once and
// ack is already guarded by done channel. That is, ack is guaranteed to
// be read-only at this point.
return d.ack != 0
return d.ack
}

// WaitContext is like Wait but returns ctx.Err() if the given context expires.
// WaitContext waits until the publisher confirmation. It returns true if the
// server successfully received the publishing. If the context expires before
// that, ctx.Err() is returned.
func (d *DeferredConfirmation) WaitContext(ctx context.Context) (bool, error) {
select {
case <-ctx.Done():
return false, ctx.Err()
case <-d.done:
}
// See comment in Wait regarding non-atomic ack access.
return d.ack != 0, nil
return d.ack, nil
}
2 changes: 1 addition & 1 deletion types.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ type DeferredConfirmation struct {
DeliveryTag uint64

done chan struct{}
ack int32 // atomic bool
ack bool
}

// Confirmation notifies the acknowledgment or negative acknowledgement of a
Expand Down

0 comments on commit 19e503a

Please sign in to comment.