Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Channel.ConsumeWithContext to be able to cancel delivering #192

Merged
merged 3 commits into from
Jun 20, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -1089,8 +1089,70 @@ be dropped.

When the consumer tag is cancelled, all inflight messages will be delivered until
the returned chan is closed.

Deprecated: Use ConsumeWithContext instead.
*/
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) {
return ch.ConsumeWithContext(context.Background(), queue, consumer, autoAck, exclusive, noLocal, noWait, args)
}

/*
ConsumeWithContext immediately starts delivering queued messages.

Begin receiving on the returned chan Delivery before any other operation on the
Connection or Channel.

Continues deliveries to the returned chan Delivery until Channel.Cancel,
Connection.Close, Channel.Close, or an AMQP exception occurs. Consumers must
range over the chan to ensure all deliveries are received. Unreceived
deliveries will block all methods on the same connection.

All deliveries in AMQP must be acknowledged. It is expected of the consumer to
call Delivery.Ack after it has successfully processed the delivery. If the
consumer is cancelled or the channel or connection is closed any unacknowledged
deliveries will be requeued at the end of the same queue.

The consumer is identified by a string that is unique and scoped for all
consumers on this channel. If you wish to eventually cancel the consumer, use
the same non-empty identifier in Channel.Cancel. An empty string will cause
the library to generate a unique identity. The consumer identity will be
included in every Delivery in the ConsumerTag field

When autoAck (also known as noAck) is true, the server will acknowledge
deliveries to this consumer prior to writing the delivery to the network. When
autoAck is true, the consumer should not call Delivery.Ack. Automatically
acknowledging deliveries means that some deliveries may get lost if the
consumer is unable to process them after the server delivers them.
See http://www.rabbitmq.com/confirms.html for more details.

When exclusive is true, the server will ensure that this is the sole consumer
from this queue. When exclusive is false, the server will fairly distribute
deliveries across multiple consumers.

The noLocal flag is not supported by RabbitMQ.

It's advisable to use separate connections for
Channel.Publish and Channel.Consume so not to have TCP pushback on publishing
affect the ability to consume messages, so this parameter is here mostly for
completeness.

When noWait is true, do not wait for the server to confirm the request and
immediately begin deliveries. If it is not possible to consume, a channel
exception will be raised and the channel will be closed.

Optional arguments can be provided that have specific semantics for the queue
or server.

Inflight messages, limited by Channel.Qos will be buffered until received from
the returned chan.

When the Channel or Connection is closed, all buffered and inflight messages will
be dropped.

When the consumer tag is cancelled, all inflight messages will be delivered until
the returned chan is closed.
*/
func (ch *Channel) ConsumeWithContext(ctx context.Context, queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) {
// When we return from ch.call, there may be a delivery already for the
// consumer that hasn't been added to the consumer hash yet. Because of
// this, we never rely on the server picking a consumer tag for us.
Expand Down Expand Up @@ -1123,6 +1185,17 @@ func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal,
return nil, err
}

go func() {
select {
case <-ch.consumers.closed:
return
case <-ctx.Done():
if ch != nil {
_ = ch.Cancel(consumer, false)
}
}
}()

return deliveries, nil
}

Expand Down
44 changes: 44 additions & 0 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package amqp091

import (
"bytes"
"context"
devrand "crypto/rand"
"encoding/binary"
"fmt"
Expand Down Expand Up @@ -819,6 +820,49 @@ func TestIntegrationConsumeCancel(t *testing.T) {
}
}

func TestIntegrationConsumeCancelWithContext(t *testing.T) {
queue := "test.integration.consume-cancel-with-context"

c := integrationConnection(t, "pub")

if c != nil {
defer c.Close()

ch, _ := c.Channel()

if _, e := ch.QueueDeclare(queue, false, true, false, false, nil); e != nil {
t.Fatalf("error declaring queue %s: %v", queue, e)
}

defer integrationQueueDelete(t, ch, queue)

ctx, cancel := context.WithCancel(context.Background())
messages, _ := ch.ConsumeWithContext(ctx, queue, "integration-tag-with-context", false, false, false, false, nil)

if e := ch.Publish("", queue, false, false, Publishing{Body: []byte("1")}); e != nil {
t.Fatalf("error publishing: %v", e)
}

assertConsumeBody(t, messages, []byte("1"))

cancel()
<-time.After(100 * time.Millisecond) // wait to call cancel asynchronously

if e := ch.Publish("", queue, false, false, Publishing{Body: []byte("2")}); e != nil {
t.Fatalf("error publishing: %v", e)
}

select {
case <-time.After(100 * time.Millisecond):
t.Fatalf("Timeout on Close")
case _, ok := <-messages:
if ok {
t.Fatalf("Extra message on consumer when consumer should have been closed")
}
}
}
}

func (c *Connection) Generate(_ *rand.Rand, _ int) reflect.Value {
urlStr := amqpURL

Expand Down