Skip to content

Commit

Permalink
Fix example client to avoid deadlock in Close (#123)
Browse files Browse the repository at this point in the history
* Fix example client to avoid deadlock in Close

The example was "giving up" on publish confirmation after the
resendDelay. This is not correct because the library will keep track of
this confirmation, and it will try to deliver the confirmation. By
giving up, we are may leave confirmations un-received in the
confirmation channel, which will cause a deadlock during the shut down
sequence in Channel.Close.

We should not give up on the confirmation and simply wait.

Signed-off-by: Aitor Pérez Cedres <[email protected]>

* Fix golangci-lint error

Signed-off-by: Aitor Pérez Cedres <[email protected]>
Co-authored-by: Luke Bakken <[email protected]>
  • Loading branch information
Zerpet and lukebakken authored Oct 1, 2022
1 parent 048b5b2 commit 86bd795
Showing 1 changed file with 6 additions and 12 deletions.
18 changes: 6 additions & 12 deletions example_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func Example_consume() {
}

// This channel will receive a notification when a channel closed event
// happens. This must be different than Client.notifyChanClose because the
// happens. This must be different from Client.notifyChanClose because the
// library sends only one notification and Client.notifyChanClose already has
// a receiver in handleReconnect().
// Recommended to make it buffered to avoid deadlocks
Expand Down Expand Up @@ -273,8 +273,6 @@ func (client *Client) changeChannel(channel *amqp.Channel) {
}

// Push will push data onto the queue, and wait for a confirm.
// If no confirms are received until within the resendTimeout,
// it continuously re-sends messages until a confirm is received.
// This will block until the server sends a confirm. Errors are
// only returned if the push action itself fails, see UnsafePush.
func (client *Client) Push(data []byte) error {
Expand All @@ -292,15 +290,11 @@ func (client *Client) Push(data []byte) error {
}
continue
}
select {
case confirm := <-client.notifyConfirm:
if confirm.Ack {
client.logger.Println("Push confirmed!")
return nil
}
case <-time.After(resendDelay):
confirm := <-client.notifyConfirm
if confirm.Ack {
client.logger.Printf("Push confirmed [%d]!", confirm.DeliveryTag)
return nil
}
client.logger.Println("Push didn't confirm. Retrying...")
}
}

Expand Down Expand Up @@ -357,7 +351,7 @@ func (client *Client) Consume() (<-chan amqp.Delivery, error) {
)
}

// Close will cleanly shutdown the channel and connection.
// Close will cleanly shut down the channel and connection.
func (client *Client) Close() error {
if !client.isReady {
return errAlreadyClosed
Expand Down

0 comments on commit 86bd795

Please sign in to comment.