Skip to content

Commit

Permalink
Merge pull request #140 from tie/main
Browse files Browse the repository at this point in the history
Do not embed context in DeferredConfirmation
  • Loading branch information
Zerpet authored Jan 3, 2023
2 parents 0c7af02 + 19e503a commit a733bce
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 117 deletions.
3 changes: 1 addition & 2 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,6 @@ func (ch *Channel) NotifyPublish(confirm chan Confirmation) chan Confirmation {
}

return confirm

}

/*
Expand Down Expand Up @@ -1471,7 +1470,7 @@ func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, ex
}

if ch.confirming {
return ch.confirms.Publish(ctx), nil
return ch.confirms.Publish(), nil
}

return nil, nil
Expand Down
69 changes: 48 additions & 21 deletions confirms.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ func (c *confirms) Listen(l chan Confirmation) {
}

// Publish increments the publishing counter
func (c *confirms) Publish(ctx context.Context) *DeferredConfirmation {
func (c *confirms) Publish() *DeferredConfirmation {
c.publishedMut.Lock()
defer c.publishedMut.Unlock()

c.published++
return c.deferredConfirmations.Add(ctx, c.published)
return c.deferredConfirmations.Add(c.published)
}

// confirm confirms one publishing, increments the expecting delivery tag, and
Expand Down Expand Up @@ -125,12 +125,12 @@ func newDeferredConfirmations() *deferredConfirmations {
}
}

func (d *deferredConfirmations) Add(ctx context.Context, tag uint64) *DeferredConfirmation {
func (d *deferredConfirmations) Add(tag uint64) *DeferredConfirmation {
d.m.Lock()
defer d.m.Unlock()

dc := &DeferredConfirmation{DeliveryTag: tag}
dc.ctx, dc.cancel = context.WithCancel(ctx)
dc.done = make(chan struct{})
d.confirmations[tag] = dc
return dc
}
Expand All @@ -141,10 +141,11 @@ func (d *deferredConfirmations) Confirm(confirmation Confirmation) {

dc, found := d.confirmations[confirmation.DeliveryTag]
if !found {
// we should never receive a confirmation for a tag that hasn't been published, but a test causes this to happen
// We should never receive a confirmation for a tag that hasn't
// been published, but a test causes this to happen.
return
}
dc.Confirm(confirmation.Ack)
dc.setAck(confirmation.Ack)
delete(d.confirmations, confirmation.DeliveryTag)
}

Expand All @@ -154,37 +155,63 @@ func (d *deferredConfirmations) ConfirmMultiple(confirmation Confirmation) {

for k, v := range d.confirmations {
if k <= confirmation.DeliveryTag {
v.Confirm(confirmation.Ack)
v.setAck(confirmation.Ack)
delete(d.confirmations, k)
}
}
}

// Nacks all pending DeferredConfirmations being blocked by dc.Wait()
// Close nacks all pending DeferredConfirmations being blocked by dc.Wait().
func (d *deferredConfirmations) Close() {
d.m.Lock()
defer d.m.Unlock()

for k, v := range d.confirmations {
v.Confirm(false)
v.setAck(false)
delete(d.confirmations, k)
}
}

// Confirm ack confirmation.
func (d *DeferredConfirmation) Confirm(ack bool) {
d.m.Lock()
defer d.m.Unlock()
// setAck sets the acknowledgement status of the confirmation. Note that it must
// not be called more than once.
func (d *DeferredConfirmation) setAck(ack bool) {
d.ack = ack
close(d.done)
}

d.confirmation.Ack = ack
d.cancel()
// Done returns the channel that can be used to wait for the publisher
// confirmation.
func (d *DeferredConfirmation) Done() <-chan struct{} {
return d.done
}

// Waits for publisher confirmation. Returns true if server successfully received the publishing.
func (d *DeferredConfirmation) Wait() bool {
<-d.ctx.Done()
// Acked returns the publisher confirmation in a non-blocking manner. It returns
// false if the confirmation was not acknowledged yet or received negative
// acknowledgement.
func (d *DeferredConfirmation) Acked() bool {
select {
case <-d.done:
default:
return false
}
return d.ack
}

d.m.Lock()
defer d.m.Unlock()
return d.confirmation.Ack
// Wait blocks until the publisher confirmation. It returns true if the server
// successfully received the publishing.
func (d *DeferredConfirmation) Wait() bool {
<-d.done
return d.ack
}

// WaitContext waits until the publisher confirmation. It returns true if the
// server successfully received the publishing. If the context expires before
// that, ctx.Err() is returned.
func (d *DeferredConfirmation) WaitContext(ctx context.Context) (bool, error) {
select {
case <-ctx.Done():
return false, ctx.Err()
case <-d.done:
}
return d.ack, nil
}
Loading

0 comments on commit a733bce

Please sign in to comment.