
DeferredConfirmation.Wait() can hang indefinitely #182

Closed
ghost opened this issue Mar 15, 2023 · 7 comments · Fixed by #183
ghost commented Mar 15, 2023

I've noticed that DeferredConfirmation.Wait() can hang indefinitely under some circumstances. Some experimentation has revealed that RabbitMQ is receiving the published message (and delivering it to any queues/consumers), but the publishing channel does not always get a confirmation.

I have not yet been able to isolate whether RabbitMQ is indeed not confirming the message, or whether I might have found an issue in this library that is only revealed under particular circumstances. I have a packet capture which would likely reveal this, but I am not knowledgeable enough to analyze it fruitfully.

This behaviour can be reproduced by creating multiple channels on a single connection, each in confirm mode, and proceeding to publish rapidly on each of them. I'm publishing using Channel.PublishWithDeferredConfirmWithContext(), followed by Wait() on the returned DeferredConfirmation.

I'm aware that I can pass a context with a timeout to work around this, and indeed that's the only thing that reliably works. However, I then have to deal with the possibility of a message getting published multiple times (when I retry).

Things that have not worked:

  • Creating a separate connection for publishers and consumers
  • Using Channel.NotifyPublish in addition to DeferredConfirmation.Wait()

Things that do work:

  • Limiting myself to a single channel for publishing
  • Avoiding DeferredConfirmation.Wait() and relying solely on Channel.NotifyPublish()
  • Giving up on confirmations entirely and assuming publishing works

I have a program that demonstrates the issue. With the default settings it usually causes at least one indefinite hang in DeferredConfirmation.Wait(); I have reproduced this against two servers (one in a Docker container, one installed via package manager).

Code:
package main

import (
	"context"
	"flag"
	"log"
	"os"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/rabbitmq/amqp091-go"
)

func quitOn(err error) {
	if err != nil {
		log.Printf("%T: %v", err, err)
		os.Exit(1)
	}
}

func main() {
	var serverURL string
	var messageCount int
	var exchangeName string
	var routingKey string
	var publisherCount int
	var bufferSize int
	var useNotifyPublish bool
	var skipWait bool

	// Load Variables
	flag.StringVar(&serverURL, "url", "amqp://guest:guest@localhost/", "AMQP Server URL")
	flag.IntVar(&messageCount, "n", 10000, "Number of messages")
	flag.StringVar(&exchangeName, "exchange", "amq.direct", "Exchange for publishing")
	flag.StringVar(&routingKey, "routing-key", "foo.bar.baz", "Routing Key for publishing")
	flag.IntVar(&publisherCount, "publishers", 10, "Number of publishers")
	flag.IntVar(&bufferSize, "buffer-size", 100, "Internal pre-publisher chan buffer size")
	flag.BoolVar(&useNotifyPublish, "notify-publish", false, "Use Channel.NotifyPublish()")
	flag.BoolVar(&skipWait, "skip-wait", false, "Skip DeferredConfirmation.Wait()")
	pHelp := flag.Bool("h", false, "Show Help")
	flag.Parse()
	if *pHelp {
		flag.Usage()
		os.Exit(0)
	}

	// Connect to the server (one for publish, one for consume)
	log.Printf("Connecting to %s\n", serverURL)
	connPub, err := amqp091.Dial(serverURL)
	quitOn(err)
	defer connPub.Close()
	connCon, err := amqp091.Dial(serverURL)
	quitOn(err)
	defer connCon.Close()

	// Chan for publishers to receive from
	toSend := make(chan int, bufferSize)

	// Create a WaitGroup for publishers/consumers to finish their work
	wg := new(sync.WaitGroup)

	// Declare and bind a Queue (default direct queue)
	consumer, err := connCon.Channel()
	quitOn(err)
	queue, err := consumer.QueueDeclare("", false, true, true, false, nil)
	quitOn(err)
	err = consumer.QueueBind(queue.Name, routingKey, exchangeName, false, nil)
	quitOn(err)

	// Consume the Queue
	wg.Add(1)
	go func() {
		defer wg.Done()
		messages, err := consumer.Consume(queue.Name, "", true, true, false, false, nil)
		quitOn(err)
		deliveryCount := 0
		for range messages {
			deliveryCount++
			if deliveryCount == messageCount {
				break
			}
		}
		log.Printf("Consumer: %d messages received\n", deliveryCount)
		if err := consumer.Close(); err != nil {
			log.Printf("Consumer: %s\n", err)
		}
	}()

	// Keep track of total confirms on NotifyPublish
	totalNotifyConfirms := new(atomic.Int32)

	// Create all the publishers
	for i := 0; i < publisherCount; i++ {
		pubNumber := i + 1
		pub, err := connPub.Channel()
		quitOn(err)
		err = pub.Confirm(false)
		quitOn(err)
		chConfirm := make(chan amqp091.Confirmation)
		// It doesn't seem to matter whether we use this or not!
		if useNotifyPublish {
			pub.NotifyPublish(chConfirm)
			wg.Add(1)
			go func() {
				defer wg.Done()
				confirmCount := 0
				for range chConfirm {
					confirmCount += 1
				}
				log.Printf("Publisher %d got %d confirms (NotifyConfirm)\n", pubNumber, confirmCount)
				totalNotifyConfirms.Add(int32(confirmCount))
			}()
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			okCount := 0
			errCount := 0
			for val := range toSend {
				msg := amqp091.Publishing{
					ContentType:     "text/plain",
					ContentEncoding: "ascii",
					Body:            []byte(strconv.Itoa(val)),
				}
				confirm, err := pub.PublishWithDeferredConfirmWithContext(
					context.Background(),
					exchangeName,
					routingKey,
					false,
					false,
					msg,
				)
				quitOn(err)
				if !skipWait {
					wasOK := confirm.Wait()
					if !wasOK {
						log.Printf("Value %d was not confirmed\n", val)
						errCount += 1
					} else {
						okCount += 1
					}
				}
			}
			log.Printf("Publisher %d done (%d OK, %d err)\n", pubNumber, okCount, errCount)
			time.Sleep(time.Second) // Wait for confirmations a bit.
			if err := pub.Close(); err != nil {
				log.Printf("Publisher %d: %s\n", pubNumber, err)
			}
		}()
	}

	log.Printf("Pushing %d messages to %d confirming publishers\n", messageCount, publisherCount)
	// Push all the messages to the publishers
	for i := 1; i <= messageCount; i++ {
		toSend <- i
	}
	close(toSend)

	log.Printf("Done sending, waiting for publishers/consumers")
	wg.Wait()
	if useNotifyPublish {
		log.Printf("Confirmations using NotifyPublish(): %d\n", totalNotifyConfirms.Load())
	}
}
@lukebakken
Contributor

lukebakken commented Mar 15, 2023

RabbitMQ is indeed not confirming the message

Extremely unlikely!

I might have found an issue in this library that is only revealed under particular circumstances

Much more likely! Thanks for the code and detailed information. We'll take a look.

@lukebakken lukebakken added this to the 1.8.0 milestone Mar 15, 2023
@ghost
Author

ghost commented Mar 15, 2023

I've found that if I add a log.Printf() at line 149 in confirms.go, the if block that should never be entered is in fact entered whenever I get a hanging DeferredConfirmation.Wait(). (Sorry, I don't know how to reference code well here on GitHub; the line number might be off by one due to the extra "log" import in my copy.)

I'm wondering whether the confirmation is somehow getting dispatched to the wrong channel, so that the intended channel never receives its confirmation?

Here's what I changed:

func (d *deferredConfirmations) Confirm(confirmation Confirmation) {
	d.m.Lock()
	defer d.m.Unlock()

	dc, found := d.confirmations[confirmation.DeliveryTag]
	if !found {
		log.Printf("Confirmed unpublished tag: %d\n", confirmation.DeliveryTag)
		// We should never receive a confirmation for a tag that hasn't
		// been published, but a test causes this to happen.
		return
	}
	dc.setAck(confirmation.Ack)
	delete(d.confirmations, confirmation.DeliveryTag)
}

Or perhaps more helpful, when I panic instead of merely logging the output, I get the following:

go run main.go 
2023/03/16 00:00:58 Connecting to amqp://guest:guest@localhost/
2023/03/16 00:00:58 Pushing 10000 messages to 10 confirming publishers
panic: Confirmed unpublished tag: 53


goroutine 21 [running]:
github.com/rabbitmq/amqp091-go.(*deferredConfirmations).Confirm(0xc0000a03c0, {0x13f?, 0x36?})
	/home/dec/projects/amqp091-go/confirms.go:150 +0x136
github.com/rabbitmq/amqp091-go.(*confirms).One(0xc0000b64b0, {0x0?, 0x58?})
	/home/dec/projects/amqp091-go/confirms.go:81 +0x94
github.com/rabbitmq/amqp091-go.(*Channel).dispatch(0xc0000c26c0, {0x6a4e40, 0xc00027d5c0})
	/home/dec/projects/amqp091-go/channel.go:352 +0x71e
github.com/rabbitmq/amqp091-go.(*Channel).recvMethod(0xc0000c26c0, {0x6a45c0?, 0xc00027a780})
	/home/dec/projects/amqp091-go/channel.go:389 +0x146
github.com/rabbitmq/amqp091-go.(*Connection).dispatchN(0xc0001a2000, {0x6a45c0?, 0xc00027a780?})
	/home/dec/projects/amqp091-go/connection.go:615 +0x1d2
github.com/rabbitmq/amqp091-go.(*Connection).demux(0xc00005ef28?, {0x6a45c0, 0xc00027a780})
	/home/dec/projects/amqp091-go/connection.go:566 +0x5b
github.com/rabbitmq/amqp091-go.(*Connection).reader(0xc0001a2000, {0x7f4243495868?, 0xc0000b2070?})
	/home/dec/projects/amqp091-go/connection.go:670 +0x23d
created by github.com/rabbitmq/amqp091-go.Open
	/home/dec/projects/amqp091-go/connection.go:268 +0x34c
exit status 2

@ghost
Author

ghost commented Mar 17, 2023

After some more determined testing, my hypothesis about messages dispatched to incorrect channels was false.

It appears to be a timing issue: the publishing goroutine gets interrupted between sending the basic.publish frame (ch.send) and registering the DeferredConfirmation (ch.confirms.Publish()).

My fix creates the DeferredConfirmation in advance, and rolls it back on error.

The only alternative I can think of would be to lock out the reader / message dispatch for the channel until both the publish and the confirmation registration are done.

In retrospect, I must admit that the code which can reproduce the error above was always tested against a local instance (although on different machines).

This issue is probably even less likely to come up in production instances (I suppose?) due to network latency.
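The race and the fix can be illustrated with a minimal, self-contained model. This is a sketch of the mechanism described above, not the library's actual code: a registry of pending delivery tags, a `confirm` callback standing in for the connection reader handling basic.ack, and registration of the tag before the frame is "sent", so a fast confirm can always find it.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a toy model of the deferred-confirmation bookkeeping.
type registry struct {
	mu      sync.Mutex
	next    uint64
	pending map[uint64]chan bool
	lost    int // confirms that arrived for an unregistered tag
}

func newRegistry() *registry {
	return &registry{pending: make(map[uint64]chan bool)}
}

// register reserves the next delivery tag BEFORE the publish frame goes
// out. This ordering is the fix: the confirmation handler can always
// find the tag, no matter how fast the broker replies.
func (r *registry) register() (uint64, chan bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.next++
	ch := make(chan bool, 1)
	r.pending[r.next] = ch
	return r.next, ch
}

// confirm models what the connection reader does when basic.ack arrives.
func (r *registry) confirm(tag uint64, ack bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	ch, ok := r.pending[tag]
	if !ok {
		// With the buggy ordering (send first, register after), a fast
		// broker lands here and the waiter hangs forever.
		r.lost++
		return
	}
	delete(r.pending, tag)
	ch <- ack
}

func main() {
	r := newRegistry()
	tag, ch := r.register() // registered before "sending"
	go r.confirm(tag, true) // broker acks, possibly immediately
	fmt.Println(<-ch) // true

	r.confirm(999, true) // a confirm for a tag that was never registered
	fmt.Println(r.lost) // 1
}
```

Rolling back on a failed send then amounts to deleting the pre-registered tag from the map, which is cheap compared to locking out the reader for the whole publish.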

@lukebakken
Contributor

Thank you @calloway-jacob! I've been working on customer issues, and will review this soon. Have a great weekend ☘️

@lukebakken
Contributor

@calloway-jacob that's interesting. So the publish/confirmation happens so fast it "beats" the call to ch.confirms.Publish()? Makes sense to me.

@ghost
Author

ghost commented Mar 20, 2023

@lukebakken That's what appears to be happening. I determined this by putting log statements in a few places and passing the channel id into the confirmations code to test whether the channel-mismatch hypothesis was true (thankfully, it wasn't). With the logging (removed, of course, in the commit) I was able to see that sometimes the confirm would be handled before ch.confirms.Publish() had run.

@lukebakken
Contributor

@calloway-jacob I wish every OSS user took the time like you did to dig into an issue. Thanks a lot.
