Is your feature request related to a problem? Please describe.
Implementing consumer timeouts in the queue is likely to be more reliable than implementing them in the channel; it allows for greater flexibility and removes the need to implement them separately for every protocol.
Describe the solution you'd like
Consumer timeouts design
`rabbit_fifo` will record the checkout time for each message that is assigned to a consumer. This timestamp
will be used to detect messages that have been held longer than the configured consumer timeout.
We do not want to use the Ra `timeout` effect, as to do so we'd need either to do expensive and frequent
calculations over the full set of checked-out messages or to keep lots of timers (one per message).
Instead we'd schedule an aux event every minute that scans the checked-out set; if any
consumer has messages with expired timeouts, it commits a new command, `eval_consumer_timeouts`,
to do this work and return the messages. This means timeouts will be evaluated some time after expiry, but no more than ~60s later.
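The periodic scan described above can be sketched as follows. This is a hedged, illustrative model in Python, not RabbitMQ's actual Erlang internals; the `checked_out` shape and function name are assumptions.

```python
import time

def find_expired(checked_out, timeout_s, now=None):
    """Return {consumer_id: [msg_ids]} whose locks have expired.

    `checked_out` maps consumer_id -> {msg_id: checkout_timestamp}.
    """
    now = time.monotonic() if now is None else now
    expired = {}
    for consumer, msgs in checked_out.items():
        ids = [m for m, ts in msgs.items() if now - ts > timeout_s]
        if ids:
            expired[consumer] = ids
    return expired

# The aux handler would run this scan roughly every 60s and commit an
# eval_consumer_timeouts command only when the result is non-empty, so
# idle scans never touch the Raft log.
```

The key design point is that the scan itself happens outside the state machine; only the (usually empty) result of the scan becomes a replicated command.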
Consumers that let any of their message locks expire should not be assigned any further messages until they send
some kind of command (settlement, lock renewal, etc.) to show that they are live and responding. They should be treated as "suspected" until it is known that they can reply.
This means we can probably get rid of the (undesirable but necessary with mnesia) behaviour where, when the queue received a `DOWN` notification with the reason `noconnection`, it would immediately return all messages. With mnesia this was reasonably correct: if the cluster was disconnected (even briefly), the `rabbit` application would typically restart itself in mysterious ways, with the ultimate result that channels were terminated. With `khepri` this will no longer be the case, and the cluster should be able to function normally even through short-term disconnections.
So going forward, when a QQ receives a `noconnection` for a consumer process, it will only mark the consumer as disconnected (so that no new messages are assigned until it comes back) and let the consumer timeout handle the message return in due course. This means the queue should be able to handle short-term disconnections and reconnections in the cluster without messages being returned unnecessarily.
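The liveness handling described above amounts to a small state transition: `noconnection` suspends assignment without returning anything, while a real process exit still returns everything. A minimal sketch, with illustrative state names and structures:

```python
UP, SUSPECTED_DOWN = "up", "suspected_down"

def on_down(consumer, reason):
    """Handle a DOWN notification for a consumer process."""
    if reason == "noconnection":
        # Do NOT return messages immediately; just stop assigning new
        # ones and let the consumer timeout reclaim them later.
        consumer["status"] = SUSPECTED_DOWN
        return []
    # A genuine process exit: return all checked-out messages now.
    returned = list(consumer["checked_out"])
    consumer["checked_out"].clear()
    return returned

def on_activity(consumer):
    # Any settlement / lock renewal proves the consumer is live again.
    consumer["status"] = UP
```

The design choice worth noting: a `noconnection` is ambiguous (the consumer may be fine on the other side of a partition), so the queue defers to the per-message timeout rather than acting immediately.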
If the consumer is already in the `cancelled` state (cancelled but with pending messages), then all pending messages will be returned and the consumer will be removed. This is the safest option, as there are potentially faulty clients in the wild that will never ack pending messages after a cancellation.
This also means that locks should be relatively short (max 5 mins but ideally lower).
Single Active Consumer consumers that let their messages time out will have all pending messages returned, as well
as being replaced. This is to ensure ordering invariants with SAC.
Protocol impl:
AMQP can provide a management extension command to renew locks for messages.
AMQP legacy can configure an auto-renew function (handled by the channel process / queue type) that will
auto-renew the lock n times on behalf of the client. This is because the legacy protocol (and other protocols such as MQTT / STOMP) has no means of implementing lock renewal.
For AMQP legacy we can default to renewing locks up to the total of the current `consumer_timeout` configuration.
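The default above is simple arithmetic: renew the lock enough times that the combined lock periods cover the existing `consumer_timeout` setting. A hedged sketch (the function and parameter names are assumptions, not RabbitMQ configuration keys):

```python
def default_renewals(consumer_timeout_ms, lock_duration_ms):
    """How many automatic renewals cover the legacy consumer_timeout."""
    # Ceiling division: total coverage must be >= consumer_timeout.
    periods = -(-consumer_timeout_ms // lock_duration_ms)
    # The first lock period is not a renewal.
    return max(periods - 1, 0)

# E.g. with the default 30-minute consumer_timeout and a 5-minute lock,
# the channel would auto-renew 5 times before giving up.
```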
Q: Can we do lock renewal without going through the Raft log?
When a message reaches its timeout, the queue will notify the consumer process with a new Ra event `{message_timeout, consumer_tag(), [MsgIds]}`; how this is handled may depend on the protocol
implementation. AMQP can emit the released or modified outcome. Other protocols don't have the same
mechanism, so for AMQP legacy it is probably best to terminate the channel or initiate a broker-side consumer cancellation.
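The per-protocol reactions to the proposed `{message_timeout, consumer_tag(), [MsgIds]}` event could be dispatched roughly as below. This is an illustrative sketch; the action tuples are placeholders for whatever each protocol adapter would actually do.

```python
def handle_message_timeout(protocol, ctag, msg_ids):
    """Map a message_timeout Ra event to protocol-level actions."""
    if protocol == "amqp_1_0":
        # AMQP 1.0 can surface a terminal outcome per message.
        return [("released", m) for m in msg_ids]
    # Legacy AMQP 0-9-1 (and MQTT / STOMP) have no per-message outcome
    # to report, so the safest reactions are a channel error or a
    # broker-side consumer cancellation.
    return [("cancel_consumer", ctag)]
```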
Describe alternatives you've considered
.
Additional context
.