x-death count not incremented when message expired #10709

Closed
bmflynn opened this issue Mar 8, 2024 · 14 comments

@bmflynn

bmflynn commented Mar 8, 2024

Describe the bug

When a message that has an expiration and already carries an x-death entry (same queue and reason) is re-published to a classic queue with a dead-letter exchange, x-death['count'] is not incremented when the message is dead-lettered again.

Reproduction steps

import os
import time
from collections import namedtuple
from pprint import pprint

from pika import BasicProperties, BlockingConnection, URLParameters

Msg = namedtuple("Msg", ["method", "properties", "body"])

amqpurl = os.environ["AMQPURL"]
exchange = "amq.topic"

start_queue = "start_queue"
dlx_exchange = "errors"
error_queue = "dead_letters"
routing_key = "test_retries"

data = "XXX"

conn = BlockingConnection(URLParameters(amqpurl))
ch = conn.channel()

ch.exchange_declare(dlx_exchange, "topic")
ch.queue_declare(error_queue)
ch.queue_bind(error_queue, dlx_exchange, "#")

ch.queue_declare(start_queue, arguments={"x-dead-letter-exchange": dlx_exchange})
ch.queue_bind(start_queue, exchange, routing_key)

print(f"first publish {exchange=} {routing_key=} {data=}")
ch.basic_publish(exchange, routing_key, data, properties=BasicProperties(expiration="1000"))


# Wait for message to error out
print("waiting")
time.sleep(5)

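first = Msg(*ch.basic_get(error_queue))
# Check that a message arrived before acking it: basic_get returns (None, None, None) on an empty queue
assert first.method is not None, f"No message available {error_queue=}"
ch.basic_ack(first.method.delivery_tag)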
assert len(first.properties.headers["x-death"]) == 1
pprint(first.properties.headers)
assert first.properties.headers["x-death"][0]["count"] == 1

print("Second publish")
# Publish the message data again with the x-death headers having count=1
first.properties.expiration = "1000"
ch.basic_publish(exchange, routing_key, data, properties=first.properties)

# Wait for message to expire again
print("waiting")
time.sleep(5)
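second = Msg(*ch.basic_get(error_queue))
assert second.method is not None, f"No message available {error_queue=}"
ch.basic_ack(second.method.delivery_tag)
assert len(second.properties.headers["x-death"]) == 1
pprint(second.properties.headers)
# Expected per the docs: the count is now 2. This is the check that fails on the 3.13 image.
assert second.properties.headers["x-death"][0]["count"] == 2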

ch.close()
conn.close()

Expected behavior

x-death['count'] should be incremented for each death with matching reason and queue per docs.
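
For readers unfamiliar with the header, the expected bookkeeping can be sketched in plain Python. This is a simplified model of the documented behaviour (entries identified by queue and reason, most recent first), not RabbitMQ's implementation. The function name `record_death` is hypothetical, and real x-death entries carry more fields (exchange, routing keys, time) that are omitted here.

```python
from copy import deepcopy


def record_death(x_death, queue, reason):
    """Return a new x-death list after one more death for (queue, reason).

    Simplified model: only the "queue", "reason" and "count" fields are kept.
    """
    entries = deepcopy(x_death or [])
    for index, entry in enumerate(entries):
        if entry.get("queue") == queue and entry.get("reason") == reason:
            # Matching entry: bump its count and move it to the front.
            entry["count"] = entry.get("count", 0) + 1
            entries.insert(0, entries.pop(index))
            return entries
    # No matching entry yet: prepend a fresh one with count 1.
    entries.insert(0, {"queue": queue, "reason": reason, "count": 1})
    return entries


history = record_death(None, "start_queue", "expired")
history = record_death(history, "start_queue", "expired")
print(history[0]["count"])  # prints 2
```

In the reproduction above, the second expiry corresponds to the second `record_death` call, which is why a count of 2 is expected.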

Additional context

I tested on versions 3.9, 3.10, 3.11, 3.12, & 3.13 using the rabbitmq:<ver>-management docker image and it only fails to increment on 3.13.

@bmflynn bmflynn added the bug label Mar 8, 2024
@kjnilsson
Contributor

did you enable the message_containers feature flag?

@bmflynn
Author

bmflynn commented Mar 8, 2024

Not explicitly, but it does show as "Enabled" in the management interface.

@lukebakken
Collaborator

@ansd and @kjnilsson discussed this internally and have found where the regression was introduced.

@bmflynn
Author

bmflynn commented Mar 9, 2024 via email

@kjnilsson kjnilsson added this to the 3.13.1 milestone Mar 10, 2024
@kjnilsson
Contributor

@bmflynn could you possibly explain a bit how you are using this feature? Collaborative editing of x- headers is a bit odd in my view. x- headers "belong" to the broker, and using them this way feels somewhat odd.

It is very unlikely we'd provide the same feature for other protocols such as AMQP 1.0.

@bmflynn
Author

bmflynn commented Mar 11, 2024

Dead-lettering and the x-death['count'] value are used as part of a retry mechanism where a message with expiration is retried until x-death['count'] is greater than the maximum number of retries.

Essentially, when message handling fails the message properties (including headers) and data are published with a message expiration via an error exchange to a queue with the dead-letter exchange set to the original exchange. The message will remain in this queue to expire at which point it gets routed back to the original exchange and is retried. This mechanism continues until the original message is handled successfully or x-death['count'] reaches the configured maximum number of retries.

There is no editing of the x-headers other than the fact that they are part of the original message properties that are used as part of a new message that is sent to an error exchange.

What is the specific feature you are referring to? Do you mean it's unlikely that a death count, or the number of times a message is dead-lettered, will be available in AMQP 1.0?
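
The retry decision described in this comment can be sketched as a pure function over the message headers. This is a minimal illustration, not the author's code: the names `death_count`, `should_retry` and the queue name `retry_wait` are hypothetical, and it assumes the consumer gives up once the count reaches the maximum.

```python
def death_count(headers, queue, reason="expired"):
    """Return the x-death count for (queue, reason), or 0 if there is none."""
    # headers may be None when the message was published without any headers.
    for entry in (headers or {}).get("x-death") or []:
        if entry.get("queue") == queue and entry.get("reason") == reason:
            return entry.get("count", 0)
    return 0


def should_retry(headers, queue, max_retries):
    """Retry while the message has expired fewer than max_retries times."""
    return death_count(headers, queue) < max_retries


headers = {"x-death": [{"queue": "retry_wait", "reason": "expired", "count": 3}]}
print(should_retry(headers, "retry_wait", max_retries=3))  # False
print(should_retry(headers, "retry_wait", max_retries=5))  # True
print(should_retry(None, "retry_wait", max_retries=3))     # True
```

A check like this only works if the broker keeps incrementing the count across re-publishes, which is exactly the behaviour this issue reports as missing in 3.13.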

@ansd
Member

ansd commented Mar 14, 2024

@bmflynn Starting with "message containers" in 3.13, we would like to treat any x- headers as belonging to the broker, meaning a client shouldn't publish messages with x- headers. (If a client does publish with x- headers, these headers won't be interpreted in any special way anymore as illustrated by your example).

You are right that this is a behavioural change compared to 3.12 and arguably a breaking change belonging to 4.0.
So, we could "fix" / revert to the 3.12 behaviour by having the broker parse incoming headers and therefore interpret the x-death header coming from a publisher.
However, we very likely would remove such special interpretation again in RabbitMQ 4.0 (which probably ships end of this year).
Therefore, my question is: Could your client set a custom non x- header and just increment it itself when it re-publishes the message?

Here is an example based on yours:

Code example with custom header
import os
import time
from collections import namedtuple
from pprint import pprint

from pika import BasicProperties, BlockingConnection, URLParameters

Msg = namedtuple("Msg", ["method", "properties", "body"])

amqpurl = "amqp://localhost:5672"
exchange = "amq.topic"

start_queue = "start_queue"
dlx_exchange = "errors"
error_queue = "dead_letters"
routing_key = "test_retries"

data = "XXX"

conn = BlockingConnection(URLParameters(amqpurl))
ch = conn.channel()

ch.exchange_declare(dlx_exchange, "topic")
ch.queue_declare(error_queue)
ch.queue_bind(error_queue, dlx_exchange, "#")

ch.queue_declare(start_queue, arguments={"x-dead-letter-exchange": dlx_exchange})
ch.queue_bind(start_queue, exchange, routing_key)

print(f"first publish {exchange=} {routing_key=} {data=}")
ch.basic_publish(exchange, routing_key, data, properties=BasicProperties(expiration="500"))

# Wait for message to error out
print("waiting\n")
time.sleep(1)

first = Msg(*ch.basic_get(error_queue))
ch.basic_ack(first.method.delivery_tag)
print("Received:")
pprint(first.properties.headers)

print("Second publish")
Key = "my-retry-count"
# Increment the retry count if it exists
first.properties.headers[Key] = first.properties.headers.get(Key, 0) + 1
first.properties.expiration = "500"
ch.basic_publish(exchange, routing_key, data, properties=first.properties)

# Wait for message to expire again
print("waiting\n")
time.sleep(1)
second = Msg(*ch.basic_get(error_queue))
ch.basic_ack(second.method.delivery_tag)
print("Received:")
pprint(second.properties.headers)

print("Third publish")
first.properties.headers[Key] = first.properties.headers.get(Key, 0) + 1
first.properties.expiration = "500"
ch.basic_publish(exchange, routing_key, data, properties=first.properties)

# Wait for message to expire again
print("waiting\n")
time.sleep(1)
second = Msg(*ch.basic_get(error_queue))
ch.basic_ack(second.method.delivery_tag)
print("Received:")
pprint(second.properties.headers)

ch.close()
conn.close()
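
The header handling in the example above can also be factored into a small helper that needs no broker connection. This is a sketch under assumptions: the helper name `headers_for_republish` is hypothetical, and dropping every header that starts with `x-` is one reading of "x- headers belong to the broker" that may be too broad if your application deliberately sets its own x- headers.

```python
RETRY_HEADER = "my-retry-count"  # same custom header name as in the example above


def headers_for_republish(headers, key=RETRY_HEADER):
    """Build headers for a re-published message.

    Copies the received headers, drops broker-owned "x-" headers, and
    increments the application-owned retry counter.
    """
    clean = {
        name: value
        for name, value in (headers or {}).items()
        if not name.startswith("x-")
    }
    clean[key] = int(clean.get(key, 0)) + 1
    return clean


received = {"x-death": [{"queue": "start_queue", "count": 1}], "trace-id": "abc"}
print(headers_for_republish(received))
# prints {'trace-id': 'abc', 'my-retry-count': 1}
```

With pika, the result could be passed as `BasicProperties(headers=..., expiration="500")` when re-publishing, so the counter no longer depends on how the broker treats x-death.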

@bmflynn
Author

bmflynn commented Mar 14, 2024

@ansd Absolutely. I've made the change and our retries seem to be passing the same tests.

I think this issue is closable from my perspective. Thanks!

@ansd ansd closed this as completed Mar 14, 2024
@dvovney

dvovney commented Apr 18, 2024

I'd like to emphasize that the official Spring Framework documentation regarding the RabbitMQ binder suggests utilizing the x-death header as the primary concept for implementing retry mechanisms in services.

https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-rabbit/3.0.6.RELEASE/reference/html/spring-cloud-stream-binder-rabbit.html#_retry_with_the_rabbitmq_binder

The potential impact of this might come as a significant surprise to numerous developers once their existing retry implementations cease to function.

ansd added a commit that referenced this issue Apr 19, 2024
Fixes #10709

This commit fixes the following regression which worked in 3.12.x, but
stopped working in 3.13.0 and 3.13.1:

```
AMQP 0.9.1 client    --publish-->
Q                    --dead-letter-->
DLQ                  --consume-->
AMQP 0.9.1 client (death count is now 1) --republish-same-message-with-headers-as-just-received-->
Q                    --dead-letter-->
DLQ                  --consume -->
AMQP 0.9.1 (death count is now 1, but should be 2)
```

This behaviour stopped working in 3.13.0 because the broker no longer
specially interprets x-headers in general, and the x-death header in
particular.

In other words, the new desired 3.13 behaviour with message containers
is that "x-headers belong to the broker".

While this is correct, it does break client applications which depended
on the previous use case.
One simple fix is that the client application does not re-publish with
the x-death header, but instead sets its own custom count header to
determine the number of times it retries.

This commit will only be backported to the v3.13.x branch.
In other words, 4.0 will not interpret x-headers, just like 3.13.0 and
3.13.1.

The reason we backport this commit to v3.13.x is that the Spring
documentation explicitly recommends re-publishing the message with
x-death header being set:
https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-rabbit/3.0.6.RELEASE/reference/html/spring-cloud-stream-binder-rabbit.html#_retry_with_the_rabbitmq_binder
@ansd
Member

ansd commented Apr 19, 2024

@dvovney where exactly does the Spring Framework documentation suggest that a client should re-publish a message containing the x-death header?

@dvovney

dvovney commented Apr 26, 2024

> @dvovney where exactly does the Spring Framework documentation suggests that a client should re-publish a message containing the x-death header?

I've provided a link. The relevant passage is:

> The loop continue without end, which is fine for transient problems, but you may want to give up after some number of attempts. Fortunately, RabbitMQ provides the x-death header, which lets you determine how many cycles have occurred.
>
> To acknowledge a message after giving up, throw an ImmediateAcknowledgeAmqpException.

The main idea is to rely on the x-death header to count how many times message handling has failed.
When an exception is thrown during message handling, the message ends up in the DLQ, and RabbitMQ is expected to increment this counter so that we can tell how many attempts there have already been.

@michaelklishin
Member

@dvovney in the light of a discussion in spring-cloud/spring-cloud-stream#2939 this does not seem to be as relevant either way, but

> RabbitMQ provides the x-death header, which lets you determine how many cycles have occurred

specifically hints that RabbitMQ provides the header, not that applications should or even can use it. It has never been the goal to make applications use that header, and there are no guarantees that such behavior will never change.

AMQP 1.0 separates application metadata from "all other" metadata, possibly in fact for such reasons.

@cressie176

cressie176 commented May 28, 2024

@michaelklishin

> specifically hints that RabbitMQ provides the header, not that applications should or even can use it.

We disagree. The Spring documentation clearly and explicitly recommends the opposite: that you can use the x-death count attribute to conditionally decide to retry or abort after a number of attempts. See below...

> The loop continue without end, which is fine for transient problems, but you may want to give up after some number of attempts. Fortunately, RabbitMQ provides the x-death header, which lets you determine how many cycles have occurred

> It has never been the goal to make applications use that header

Does the RabbitMQ documentation state that applications should not rely on all or certain x-headers added to the message by the broker? My understanding is that the x- simply indicates headers that are RabbitMQ extensions to the AMQP specification, rather than being for RabbitMQ's internal use only. Why would the RabbitMQ documentation go to such lengths to document features users were not expected to rely on?

> and no guarantees that such behavior will never change.

Just because there are no guarantees that the behaviour won't change does not mean the behaviour should be broken without due consideration or warning.

> in the light of a discussion in spring-cloud/spring-cloud-stream#2939 this is does not seem to be as relevant either way but

Again we disagree. I would argue the issue is even more relevant. spring-cloud/spring-cloud-stream#2939 is still open and shows how disruptive this breaking change has been.

ansd added a commit that referenced this issue May 28, 2024
Fixes #10709
Fixes #11331

This commit fixes the following regression which worked in 3.12.x, but
stopped working in 3.13.0 - 3.13.3:

```
AMQP 0.9.1 client    --publish-->
Q                    --dead-letter-->
DLQ                  --consume-->
AMQP 0.9.1 client (death count is now 1) --republish-same-message-with-headers-as-just-received-->
Q                    --dead-letter-->
DLQ                  --consume -->
AMQP 0.9.1 (death count is now 1, but should be 2)
```

This behaviour stopped working in 3.13.0 because the broker no longer
specially interprets x-headers in general, and the x-death header in
particular.

In other words, the new desired 3.13 behaviour with message containers
is that "x-headers belong to the broker".

While this is correct, it does break client applications which depended
on the previous use case.
One simple fix is that the client application does not re-publish with
the x-death header, but instead sets its own custom count header to
determine the number of times it retries.

This commit will only be backported to the v3.13.x branch.
In other words, 4.0 will not interpret x-headers, just like 3.13.0 - 3.13.3.

The reason we backport this commit to v3.13.x is that the Spring
documentation explicitly recommends re-publishing the message with
x-death header being set:
https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-rabbit/3.0.6.RELEASE/reference/html/spring-cloud-stream-binder-rabbit.html#_retry_with_the_rabbitmq_binder
ansd added a commit that referenced this issue May 28, 2024
Fixes #10709
Fixes #11331

This commit fixes the following regression which worked in 3.12.x, but
stopped working in 3.13.0 - 3.13.2:

```
AMQP 0.9.1 client    --publish-->
Q                    --dead-letter-->
DLQ                  --consume-->
AMQP 0.9.1 client (death count is now 1) --republish-same-message-with-headers-as-just-received-->
Q                    --dead-letter-->
DLQ                  --consume -->
AMQP 0.9.1 (death count is now 1, but should be 2)
```

This behaviour stopped working in 3.13.0 because the broker no longer
specially interprets x-headers in general, and the x-death header in
particular.

In other words, the new desired 3.13 behaviour with message containers
is that "x-headers belong to the broker".

While this is correct, it does break client applications which depended
on the previous use case.
One simple fix is that the client application does not re-publish with
the x-death header, but instead sets its own custom count header to
determine the number of times it retries.

This commit will only be backported to the v3.13.x branch.
In other words, 4.0 will not interpret x-headers, just like 3.13.0 - 3.13.2.

The reason we backport this commit to v3.13.x is that the Spring
documentation explicitly recommends re-publishing the message with
x-death header being set:
https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-rabbit/3.0.6.RELEASE/reference/html/spring-cloud-stream-binder-rabbit.html#_retry_with_the_rabbitmq_binder
@ansd
Member

ansd commented May 29, 2024

@bmflynn @cressie176 @dvovney @samragu @artembilan
We merged #11339 yesterday into the v3.13.x branch.
This means 3.13.3 will revert to the 3.12 behaviour, in which the broker interprets the x-death header from publishing AMQP 0.9.1 clients.
This fix will only be available for 3.13.
Starting in RabbitMQ 4.0, the broker will behave like 3.13.0 - 3.13.2, i.e. the broker will not interpret the x-death header from publishing clients. Hence, use a custom header in 4.0.
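
The version matrix in this comment can be written down as a small lookup, for example for a client library that needs to decide whether it can still rely on x-death across re-publishes. This is a sketch only: the function name is hypothetical, it assumes plain `major.minor.patch` version strings, and it assumes releases older than the ones tested in this thread (3.9) behave like 3.12.

```python
def broker_interprets_client_x_death(version: str) -> bool:
    """Whether the broker interprets an x-death header sent by a publisher.

    Encodes only what this thread states:
      3.12.x and earlier -> yes
      3.13.0 - 3.13.2    -> no
      3.13.3 and later   -> yes (3.13.x only)
      4.0 and later      -> no
    """
    numbers = [int(part) for part in version.split(".")[:3]]
    # Pad short versions such as "4" or "3.13" with zeros.
    major, minor, patch = (numbers + [0, 0, 0])[:3]
    if major >= 4:
        return False
    if (major, minor) == (3, 13):
        return patch >= 3
    return True


for v in ("3.12.13", "3.13.0", "3.13.3", "4.0.1"):
    print(v, broker_interprets_client_x_death(v))
```

Since the answer is "no" from 4.0 onwards, a custom counter header is the option that works the same way on every version listed here.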

ansd added a commit that referenced this issue Oct 9, 2024
Starting with RabbitMQ 4.0, the AMQP 0.9.1 x-death header won't be
interpreted anymore by RabbitMQ when clients publish new messages to RabbitMQ.

Relates to
* #10709
* #11331
* #11339
ansd added a commit that referenced this issue Oct 9, 2024
Starting with RabbitMQ 4.0, the AMQP 0.9.1 x-death header won't be
interpreted anymore by RabbitMQ when clients publish new messages to RabbitMQ.

Relates to
* #10709
* #11331
* #11339

(cherry picked from commit 320d8ae)
LoisSotoLopez pushed a commit to cloudamqp/rabbitmq-server that referenced this issue Oct 21, 2024
Starting with RabbitMQ 4.0, the AMQP 0.9.1 x-death header won't be
interpreted anymore by RabbitMQ when clients publish new messages to RabbitMQ.

Relates to
* rabbitmq#10709
* rabbitmq#11331
* rabbitmq#11339