Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Constant for consumer timeout queue argument #206

Merged
merged 2 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,8 +550,27 @@ func ExampleChannel_QueueDeclare_classicQueueV2() {
false, // exclusive
false, // noWait
amqp.Table{
amqp.QueueTypeArg: amqp.QueueTypeClassic,
amqp.QueueVersionArg: 2,
},
)
log.Printf("Declared Classic Queue v2: %s", q.Name)
}

func ExampleChannel_QueueDeclare_consumerTimeout() {
conn, _ := amqp.Dial("amqp://localhost")
ch, _ := conn.Channel()
// this works only with RabbitMQ 3.12+
q, _ := ch.QueueDeclare(
"my-classic-queue-v2", // queue name
true, // durable
false, // auto-delete
false, // exclusive
false, // noWait
amqp.Table{
amqp.QueueTypeArg: amqp.QueueTypeQuorum, // also works with classic queues
amqp.ConsumerTimeoutArg: 600_000, // 10 minute consumer timeout
},
)
log.Printf("Declared Classic Queue v2: %s", q.Name)
}
3 changes: 2 additions & 1 deletion tls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (s *tlsServer) Serve(t *testing.T) {
}

func startTLSServer(t *testing.T, cfg *tls.Config) tlsServer {
l, err := tls.Listen("tcp", "127.0.0.1:0", cfg)
l, err := tls.Listen("tcp", "127.0.0.1:3456", cfg)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this is necessary, but it seems to resolve flakes with tests in Windows container.

if err != nil {
t.Fatalf("TLS server Listen error: %+v", err)
}
Expand Down Expand Up @@ -105,6 +105,7 @@ func TestTLSHandshake(t *testing.T) {
c, err := DialTLS(srv.URL, tlsClientConfig(t))
if err != nil {
errs <- fmt.Errorf("expected to open a TLS connection, got err: %v", err)
return
}
defer c.Close()

Expand Down
45 changes: 32 additions & 13 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"time"
)

// DefaultExchange is the default direct exchange that binds every queue by its
// name. Applications can route to a queue using the queue name as routing key.
const DefaultExchange = ""

// Constants for standard AMQP 0-9-1 exchange types.
Expand Down Expand Up @@ -214,36 +216,48 @@ type Decimal struct {
// Most common queue argument keys in queue declaration. For a comprehensive list
// of queue arguments, visit [RabbitMQ Queue docs].
//
// QueueTypeArg queue argument is used to declare quorum and stream queues.
// Accepted values are QueueTypeClassic (default), QueueTypeQuorum and
// QueueTypeStream. [Quorum Queues] accept (almost) all queue arguments as their
// [QueueTypeArg] queue argument is used to declare quorum and stream queues.
// Accepted values are [QueueTypeClassic] (default), [QueueTypeQuorum] and
// [QueueTypeStream]. [Quorum Queues] accept (almost) all queue arguments as their
// Classic Queues counterparts. Check [feature comparison] docs for more
// information.
//
// Queues can define their [max length] using QueueMaxLenArg and
// QueueMaxLenBytesArg queue arguments. Overflow behaviour is set using
// QueueOverflowArg. Accepted values are QueueOverflowDropHead (default),
// QueueOverflowRejectPublish and QueueOverflowRejectPublishDLX.
// Queues can define their [max length] using [QueueMaxLenArg] and
// [QueueMaxLenBytesArg] queue arguments. Overflow behaviour is set using
// [QueueOverflowArg]. Accepted values are [QueueOverflowDropHead] (default),
// [QueueOverflowRejectPublish] and [QueueOverflowRejectPublishDLX].
//
// [Queue TTL] can be defined using QueueTTLArg. That is, the time-to-live for an
// unused queue. [Queue Message TTL] can be defined using QueueMessageTTLArg.
// This will set a time-to-live for **messages** in the queue.
// [Queue TTL] can be defined using [QueueTTLArg]. That is, the time-to-live for an
// unused queue. [Queue Message TTL] can be defined using [QueueMessageTTLArg].
// This will set a time-to-live for messages in the queue.
//
// [Stream retention] can be configured using StreamMaxLenBytesArg, to set the
// [Stream retention] can be configured using [StreamMaxLenBytesArg], to set the
// maximum size of the stream. Please note that stream queues always keep, at
// least, one segment. [Stream retention] can also be set using StreamMaxAgeArg,
// least, one segment. [Stream retention] can also be set using [StreamMaxAgeArg],
// to set time-based retention. Values are string with unit suffix. Valid
// suffixes are Y, M, D, h, m, s. E.g. "7D" for one week. The maximum segment
// size can be set using StreamMaxSegmentSizeBytesArg. The default value is
// size can be set using [StreamMaxSegmentSizeBytesArg]. The default value is
// 500_000_000 bytes ~= 500 megabytes
//
// Starting with RabbitMQ 3.12, consumer timeout can be configured as a queue
// argument. This is the timeout for a consumer to acknowledge a message. The
// value is the time in milliseconds. The timeout is evaluated periodically,
// at one minute intervals. Values lower than one minute are not supported.
// See the [consumer timeout] guide for more information.
//
// [Single Active Consumer] on quorum and classic queues can be configured
// using [SingleActiveConsumerArg]. This argument expects a boolean value. It is
// false by default.
//
// [RabbitMQ Queue docs]: https://rabbitmq.com/queues.html
// [Stream retention]: https://rabbitmq.com/streams.html#retention
// [max length]: https://rabbitmq.com/maxlength.html
// [Queue TTL]: https://rabbitmq.com/ttl.html#queue-ttl
// [Queue Message TTL]: https://rabbitmq.com/ttl.html#per-queue-message-ttl
// [Quorum Queues]: https://rabbitmq.com/quorum-queues.html
// [feature comparison]: https://rabbitmq.com/quorum-queues.html#feature-comparison
// [consumer timeout]: https://rabbitmq.com/consumers.html#acknowledgement-timeout
// [Single Active Consumer]: https://rabbitmq.com/consumers.html#single-active-consumer
const (
QueueTypeArg = "x-queue-type"
QueueMaxLenArg = "x-max-length"
Expand All @@ -256,6 +270,9 @@ const (
StreamMaxSegmentSizeBytesArg = "x-stream-max-segment-size-bytes"
// QueueVersionArg declares the Classic Queue version to use. Expects an integer, either 1 or 2.
QueueVersionArg = "x-queue-version"
// ConsumerTimeoutArg is available in RabbitMQ 3.12+ as a queue argument.
ConsumerTimeoutArg = "x-consumer-timeout"
SingleActiveConsumerArg = "x-single-active-consumer"
)

// Values for queue arguments. Use as values for queue arguments during queue declaration.
Expand All @@ -267,6 +284,8 @@ const (
// amqp.QueueMaxLenArg: 100,
// amqp.QueueTTLArg: 1800000,
// }
//
// Refer to [Channel.QueueDeclare] for more examples.
const (
QueueTypeClassic = "classic"
QueueTypeQuorum = "quorum"
Expand Down