Example client can deadlock in Close due to unconsumed confirmations #122

Closed
Zerpet opened this issue Sep 30, 2022 Discussed in #121 · 0 comments · Fixed by #123

Zerpet commented Sep 30, 2022

Discussed in #121

Originally posted by mgdotson September 28, 2022
Using the documented example client against a local Docker container (docker run -p5672:5672 -p 15672:15672 -p5671:5671 -p15692:15692 rabbitmq:3.9.17-management), putting the server into a blocked state causes channel.Close() to hang and never return.

Steps to reproduce:

  1. Start the container with the docker run command above.
  2. Start the server code and watch a couple of message confirmations arrive.
  3. In the Docker container, run: rabbitmqctl set_vm_memory_high_watermark 0.00000001
  4. Watch the server code print "Push didn't confirm. Retrying..." (let 4 or so pass).
  5. In the Docker container, run: rabbitmqctl set_vm_memory_high_watermark 0.4
  6. Notice that Push confirmations resume.
  7. The context times out and the code calls queue.Close().

At this point, the code closes the done channel but hangs on the queue.channel.Close() call.

Even if we wrap this call in a context timeout so the calling code can finish, a long-running process could leak blocked goroutines over time, especially if multiple channels end up in this situation because of a blocked server.

This is also not the only scenario that can cause a channel.Close() call to hang.

Best practices? TCP settings?

Analysis

During a memory alarm, RabbitMQ stops reading from the publishing connection; therefore, it does not send a confirmation before the example client "gives up" waiting for it:

```go
select {
case confirm := <-client.notifyConfirm:
	if confirm.Ack {
		client.logger.Println("Push confirmed!")
		return nil
	}
case <-time.After(resendDelay):
	// Timed out: this confirmation is abandoned and the message is published again.
}
```

The problem is that the client sends a new message without waiting for the confirmation it previously "gave up" on. This is not correct. The documentation of Channel.NotifyPublish(), which works very similarly to Channel.NotifyConfirm(), states:

It's advisable to wait for all Confirmations to arrive before calling Channel.Close() or Connection.Close().

It is also advisable for the caller to consume from the channel returned till it is closed to avoid possible deadlocks
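Following that advice means keeping a receiver on the confirmation channel until it is closed; the NotifyPublish documentation also states that the listener chan is closed when the Channel is closed. The sketch below is an illustration under assumptions: Confirmation is a local stand-in for amqp.Confirmation, drainConfirms is a hypothetical helper, and the loop in main plays the role of the library delivering confirmations, including ones the publisher has already given up on.

```go
package main

import (
	"fmt"
	"sync"
)

// Confirmation is a local stand-in with the same fields as amqp.Confirmation.
type Confirmation struct {
	DeliveryTag uint64
	Ack         bool
}

// drainConfirms (hypothetical) consumes confirms until the channel is closed,
// so whoever sends on it is never left blocked. The returned function waits
// for the drain to finish and reports how many acks and nacks were seen.
func drainConfirms(confirms <-chan Confirmation) func() (int, int) {
	var wg sync.WaitGroup
	var acks, nacks int
	wg.Add(1)
	go func() {
		defer wg.Done()
		for c := range confirms {
			if c.Ack {
				acks++
			} else {
				nacks++
			}
		}
	}()
	return func() (int, int) {
		wg.Wait() // the counters are safe to read once the drainer has exited
		return acks, nacks
	}
}

func main() {
	confirms := make(chan Confirmation) // unbuffered: every send needs a receiver
	wait := drainConfirms(confirms)

	// Stand-in for the library delivering confirmations.
	for tag := uint64(1); tag <= 5; tag++ {
		confirms <- Confirmation{DeliveryTag: tag, Ack: tag != 3}
	}
	close(confirms) // stand-in for the library closing the listener on shutdown

	acks, nacks := wait()
	fmt.Println(acks, nacks) // 4 1
}
```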

The current implementation of the example client does, in fact, deadlock in the situation described in the repro steps. One goroutine, trying to deliver a confirmation, grabs the lock on the confirms struct and sends a notification on a chan amqp.Confirmation that nobody is receiving from. Then, during the close sequence, the channel shutdown calls confirms.Close(), which blocks trying to acquire the same lock. Because nobody receives from the chan amqp.Confirmation, the first goroutine never releases the lock, and the two goroutines deadlock.

Goroutine dump (only the two relevant goroutines):

Goroutine 5
  runtime.gopark (/Users/acedres/go/go1.18.6/src/runtime/proc.go:362)
  runtime.chansend (/Users/acedres/go/go1.18.6/src/runtime/chan.go:258)
  runtime.chansend1 (/Users/acedres/go/go1.18.6/src/runtime/chan.go:144)
  github.com/rabbitmq/amqp091-go.(*confirms).confirm (/Users/acedres/workspace/amqp091-go/confirms.go:56)
  github.com/rabbitmq/amqp091-go.(*confirms).One (/Users/acedres/workspace/amqp091-go/confirms.go:82)
  github.com/rabbitmq/amqp091-go.(*Channel).dispatch (/Users/acedres/workspace/amqp091-go/channel.go:336)
  github.com/rabbitmq/amqp091-go.(*Channel).recvMethod (/Users/acedres/workspace/amqp091-go/channel.go:373)
  github.com/rabbitmq/amqp091-go.(*Connection).dispatchN (/Users/acedres/workspace/amqp091-go/connection.go:545)
  github.com/rabbitmq/amqp091-go.(*Connection).demux (/Users/acedres/workspace/amqp091-go/connection.go:500)
  github.com/rabbitmq/amqp091-go.(*Connection).reader (/Users/acedres/workspace/amqp091-go/connection.go:600)
  github.com/rabbitmq/amqp091-go.Open.func1 (/Users/acedres/workspace/amqp091-go/connection.go:265)
  runtime.goexit (/Users/acedres/go/go1.18.6/src/runtime/asm_arm64.s:1270)
  created at: github.com/rabbitmq/amqp091-go.Open (/Users/acedres/workspace/amqp091-go/connection.go:265)

Goroutine 35
  runtime.gopark (/Users/acedres/go/go1.18.6/src/runtime/proc.go:362)
  runtime.goparkunlock (/Users/acedres/go/go1.18.6/src/runtime/proc.go:367)
  runtime.semacquire1 (/Users/acedres/go/go1.18.6/src/runtime/sema.go:144)
  sync.runtime_SemacquireMutex (/Users/acedres/go/go1.18.6/src/runtime/sema.go:71)
  sync.(*Mutex).lockSlow (/Users/acedres/go/go1.18.6/src/sync/mutex.go:162)
  sync.(*Mutex).Lock (/Users/acedres/go/go1.18.6/src/sync/mutex.go:81)
  github.com/rabbitmq/amqp091-go.(*confirms).Close (/Users/acedres/workspace/amqp091-go/confirms.go:105)
  github.com/rabbitmq/amqp091-go.(*Channel).shutdown.func1 (/Users/acedres/workspace/amqp091-go/channel.go:148)
  sync.(*Once).doSlow (/Users/acedres/go/go1.18.6/src/sync/once.go:68)
  sync.(*Once).Do (/Users/acedres/go/go1.18.6/src/sync/once.go:59)
  github.com/rabbitmq/amqp091-go.(*Channel).shutdown (/Users/acedres/workspace/amqp091-go/channel.go:102)
  github.com/rabbitmq/amqp091-go.(*Connection).shutdown.func1 (/Users/acedres/workspace/amqp091-go/connection.go:483)
  sync.(*Once).doSlow (/Users/acedres/go/go1.18.6/src/sync/once.go:68)
  sync.(*Once).Do (/Users/acedres/go/go1.18.6/src/sync/once.go:59)
  github.com/rabbitmq/amqp091-go.(*Connection).shutdown (/Users/acedres/workspace/amqp091-go/connection.go:453)
  github.com/rabbitmq/amqp091-go.(*Connection).send.func1 (/Users/acedres/workspace/amqp091-go/connection.go:433)
  runtime.goexit (/Users/acedres/go/go1.18.6/src/runtime/asm_arm64.s:1270)
  created at: github.com/rabbitmq/amqp091-go.(*Connection).send (/Users/acedres/workspace/amqp091-go/connection.go:433)