Skip to content

Commit

Permalink
Unwind Consume function
Browse files Browse the repository at this point in the history
After #192, Consume was calling the new ConsumeWithContext function with
a context.Background context. Whilst this would work, it would also pile
up routines that would only return when the AMQP channel is closed. This
is unnecessary, and some code duplication is preferrable over piling up
blocked routines.

Some clarifications to function documentation, and added new paragraphs
to ConsumeWithContext function to explain the semantics.

Signed-off-by: Aitor Pérez Cedres <[email protected]>
  • Loading branch information
Zerpet committed Jun 20, 2023
1 parent 13dde10 commit 0bee96f
Showing 1 changed file with 99 additions and 5 deletions.
104 changes: 99 additions & 5 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -1085,21 +1085,109 @@ Inflight messages, limited by Channel.Qos will be buffered until received from
the returned chan.
When the Channel or Connection is closed, all buffered and inflight messages will
be dropped.
be dropped. RabbitMQ will requeue messages not acknowledged. In other words, dropped
messages in this way won't be lost.
When the consumer tag is cancelled, all inflight messages will be delivered until
the returned chan is closed.
*/
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) {
return ch.ConsumeWithContext(context.Background(), queue, consumer, autoAck, exclusive, noLocal, noWait, args)
// When we return from ch.call, there may be a delivery already for the
// consumer that hasn't been added to the consumer hash yet. Because of
// this, we never rely on the server picking a consumer tag for us.

if err := args.Validate(); err != nil {
return nil, err
}

if consumer == "" {
consumer = uniqueConsumerTag()
}

req := &basicConsume{
Queue: queue,
ConsumerTag: consumer,
NoLocal: noLocal,
NoAck: autoAck,
Exclusive: exclusive,
NoWait: noWait,
Arguments: args,
}
res := &basicConsumeOk{}

deliveries := make(chan Delivery)

ch.consumers.add(consumer, deliveries)

if err := ch.call(req, res); err != nil {
ch.consumers.cancel(consumer)
return nil, err
}

return deliveries, nil
}

/*
ConsumeWithContext immediately starts delivering queued messages.
This is similar to Consume() function but has different semantics.
The caller can cancel via the given context, then call ch.Cancel() and stop
receiving messages.
This function is similar to Channel.Consume, and accepts a context to control
consumer lifecycle. When the context passed to this function is canceled, the
consumer associated with the deliveries channel will be canceled too. When the
context passed to this function is cancelled, the deliveries channel will be closed.
An application is advised to keep on receiving messages from the delivery channel
until the channel is empty. This is specially important to avoid memory leaks from
unconsumed messages from the delivery channel.
Begin receiving on the returned chan Delivery before any other operation on the
Connection or Channel.
Continues deliveries to the returned chan Delivery until Channel.Cancel,
Connection.Close, Channel.Close, context is cancelled, or an AMQP exception
occurs. Consumers must range over the chan to ensure all deliveries are
received. Unreceived deliveries will block all methods on the same connection.
All deliveries in AMQP must be acknowledged. It is expected of the consumer to
call Delivery.Ack after it has successfully processed the delivery. If the
consumer is cancelled or the channel or connection is closed any unacknowledged
deliveries will be requeued at the end of the same queue.
The consumer is identified by a string that is unique and scoped for all
consumers on this channel. If you wish to eventually cancel the consumer, use
the same non-empty identifier in Channel.Cancel. An empty string will cause
the library to generate a unique identity. The consumer identity will be
included in every Delivery in the ConsumerTag field
When autoAck (also known as noAck) is true, the server will acknowledge
deliveries to this consumer prior to writing the delivery to the network. When
autoAck is true, the consumer should not call Delivery.Ack. Automatically
acknowledging deliveries means that some deliveries may get lost if the
consumer is unable to process them after the server delivers them.
See http://www.rabbitmq.com/confirms.html for more details.
When exclusive is true, the server will ensure that this is the sole consumer
from this queue. When exclusive is false, the server will fairly distribute
deliveries across multiple consumers.
The noLocal flag is not supported by RabbitMQ.
It's advisable to use separate connections for Channel.Publish and
Channel.Consume so not to have TCP pushback on publishing affect the ability to
consume messages, so this parameter is here mostly for completeness.
When noWait is true, do not wait for the server to confirm the request and
immediately begin deliveries. If it is not possible to consume, a channel
exception will be raised and the channel will be closed.
Optional arguments can be provided that have specific semantics for the queue
or server.
Inflight messages, limited by Channel.Qos will be buffered until received from
the returned chan.
When the Channel or Connection is closed, all buffered and inflight messages will
be dropped. RabbitMQ will requeue messages not acknowledged. In other words, dropped
messages in this way won't be lost.
*/
func (ch *Channel) ConsumeWithContext(ctx context.Context, queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) {
// When we return from ch.call, there may be a delivery already for the
Expand All @@ -1125,6 +1213,12 @@ func (ch *Channel) ConsumeWithContext(ctx context.Context, queue, consumer strin
}
res := &basicConsumeOk{}

select {
default:
case <-ctx.Done():
return nil, ctx.Err()
}

deliveries := make(chan Delivery)

ch.consumers.add(consumer, deliveries)
Expand Down

0 comments on commit 0bee96f

Please sign in to comment.