Skip to content

Commit

Permalink
Merge pull request #206 from rabbitmq/issue-201
Browse files Browse the repository at this point in the history
Constant for consumer timeout queue argument
  • Loading branch information
Zerpet authored Jun 21, 2023
2 parents e834fc5 + f23ccf7 commit 4f223e6
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 14 deletions.
19 changes: 19 additions & 0 deletions examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,8 +550,27 @@ func ExampleChannel_QueueDeclare_classicQueueV2() {
false, // exclusive
false, // noWait
amqp.Table{
amqp.QueueTypeArg: amqp.QueueTypeClassic,
amqp.QueueVersionArg: 2,
},
)
log.Printf("Declared Classic Queue v2: %s", q.Name)
}

func ExampleChannel_QueueDeclare_consumerTimeout() {
conn, _ := amqp.Dial("amqp://localhost")
ch, _ := conn.Channel()
// this works only with RabbitMQ 3.12+
q, _ := ch.QueueDeclare(
"my-classic-queue-v2", // queue name
true, // durable
false, // auto-delete
false, // exclusive
false, // noWait
amqp.Table{
amqp.QueueTypeArg: amqp.QueueTypeQuorum, // also works with classic queues
amqp.ConsumerTimeoutArg: 600_000, // 10 minute consumer timeout
},
)
log.Printf("Declared Classic Queue v2: %s", q.Name)
}
3 changes: 2 additions & 1 deletion tls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (s *tlsServer) Serve(t *testing.T) {
}

func startTLSServer(t *testing.T, cfg *tls.Config) tlsServer {
l, err := tls.Listen("tcp", "127.0.0.1:0", cfg)
l, err := tls.Listen("tcp", "127.0.0.1:3456", cfg)
if err != nil {
t.Fatalf("TLS server Listen error: %+v", err)
}
Expand Down Expand Up @@ -105,6 +105,7 @@ func TestTLSHandshake(t *testing.T) {
c, err := DialTLS(srv.URL, tlsClientConfig(t))
if err != nil {
errs <- fmt.Errorf("expected to open a TLS connection, got err: %v", err)
return
}
defer c.Close()

Expand Down
45 changes: 32 additions & 13 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"time"
)

// DefaultExchange is the default direct exchange that binds every queue by its
// name. Applications can route to a queue using the queue name as routing key.
const DefaultExchange = ""

// Constants for standard AMQP 0-9-1 exchange types.
Expand Down Expand Up @@ -214,36 +216,48 @@ type Decimal struct {
// Most common queue argument keys in queue declaration. For a comprehensive list
// of queue arguments, visit [RabbitMQ Queue docs].
//
// QueueTypeArg queue argument is used to declare quorum and stream queues.
// Accepted values are QueueTypeClassic (default), QueueTypeQuorum and
// QueueTypeStream. [Quorum Queues] accept (almost) all queue arguments as their
// [QueueTypeArg] queue argument is used to declare quorum and stream queues.
// Accepted values are [QueueTypeClassic] (default), [QueueTypeQuorum] and
// [QueueTypeStream]. [Quorum Queues] accept (almost) all queue arguments as their
// Classic Queues counterparts. Check [feature comparison] docs for more
// information.
//
// Queues can define their [max length] using QueueMaxLenArg and
// QueueMaxLenBytesArg queue arguments. Overflow behaviour is set using
// QueueOverflowArg. Accepted values are QueueOverflowDropHead (default),
// QueueOverflowRejectPublish and QueueOverflowRejectPublishDLX.
// Queues can define their [max length] using [QueueMaxLenArg] and
// [QueueMaxLenBytesArg] queue arguments. Overflow behaviour is set using
// [QueueOverflowArg]. Accepted values are [QueueOverflowDropHead] (default),
// [QueueOverflowRejectPublish] and [QueueOverflowRejectPublishDLX].
//
// [Queue TTL] can be defined using QueueTTLArg. That is, the time-to-live for an
// unused queue. [Queue Message TTL] can be defined using QueueMessageTTLArg.
// This will set a time-to-live for **messages** in the queue.
// [Queue TTL] can be defined using [QueueTTLArg]. That is, the time-to-live for an
// unused queue. [Queue Message TTL] can be defined using [QueueMessageTTLArg].
// This will set a time-to-live for messages in the queue.
//
// [Stream retention] can be configured using StreamMaxLenBytesArg, to set the
// [Stream retention] can be configured using [StreamMaxLenBytesArg], to set the
// maximum size of the stream. Please note that stream queues always keep, at
// least, one segment. [Stream retention] can also be set using StreamMaxAgeArg,
// least, one segment. [Stream retention] can also be set using [StreamMaxAgeArg],
// to set time-based retention. Values are string with unit suffix. Valid
// suffixes are Y, M, D, h, m, s. E.g. "7D" for one week. The maximum segment
// size can be set using StreamMaxSegmentSizeBytesArg. The default value is
// size can be set using [StreamMaxSegmentSizeBytesArg]. The default value is
// 500_000_000 bytes ~= 500 megabytes
//
// Starting with RabbitMQ 3.12, consumer timeout can be configured as a queue
// argument. This is the timeout for a consumer to acknowledge a message. The
// value is the time in milliseconds. The timeout is evaluated periodically,
// at one minute intervals. Values lower than one minute are not supported.
// See the [consumer timeout] guide for more information.
//
// [Single Active Consumer] on quorum and classic queues can be configured
// using [SingleActiveConsumerArg]. This argument expects a boolean value. It is
// false by default.
//
// [RabbitMQ Queue docs]: https://rabbitmq.com/queues.html
// [Stream retention]: https://rabbitmq.com/streams.html#retention
// [max length]: https://rabbitmq.com/maxlength.html
// [Queue TTL]: https://rabbitmq.com/ttl.html#queue-ttl
// [Queue Message TTL]: https://rabbitmq.com/ttl.html#per-queue-message-ttl
// [Quorum Queues]: https://rabbitmq.com/quorum-queues.html
// [feature comparison]: https://rabbitmq.com/quorum-queues.html#feature-comparison
// [consumer timeout]: https://rabbitmq.com/consumers.html#acknowledgement-timeout
// [Single Active Consumer]: https://rabbitmq.com/consumers.html#single-active-consumer
const (
QueueTypeArg = "x-queue-type"
QueueMaxLenArg = "x-max-length"
Expand All @@ -256,6 +270,9 @@ const (
StreamMaxSegmentSizeBytesArg = "x-stream-max-segment-size-bytes"
// QueueVersionArg declares the Classic Queue version to use. Expects an integer, either 1 or 2.
QueueVersionArg = "x-queue-version"
// ConsumerTimeoutArg is available in RabbitMQ 3.12+ as a queue argument.
ConsumerTimeoutArg = "x-consumer-timeout"
SingleActiveConsumerArg = "x-single-active-consumer"
)

// Values for queue arguments. Use as values for queue arguments during queue declaration.
Expand All @@ -267,6 +284,8 @@ const (
// amqp.QueueMaxLenArg: 100,
// amqp.QueueTTLArg: 1800000,
// }
//
// Refer to [Channel.QueueDeclare] for more examples.
const (
QueueTypeClassic = "classic"
QueueTypeQuorum = "quorum"
Expand Down

0 comments on commit 4f223e6

Please sign in to comment.