From 6cde2222acc430f36db105c4d161ebd014829233 Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Wed, 26 Nov 2025 16:49:17 +0000 Subject: [PATCH 1/4] extend qq force shrink operations to use regx for queue names on shrink globally or per vhost (cherry picked from commit 0d54c12c4295b96b03212a606560b624f7fb1493) --- deps/rabbit/src/rabbit_quorum_queue.erl | 31 ++++++++++++++++++------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 536a68e2de93..c9a2e7c59851 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -75,7 +75,9 @@ -export([force_shrink_member_to_current_member/2, force_vhost_queues_shrink_member_to_current_member/1, - force_all_queues_shrink_member_to_current_member/0]). + force_vhost_queues_shrink_member_to_current_member/2, + force_all_queues_shrink_member_to_current_member/0, + force_all_queues_shrink_member_to_current_member/1]). -export([policy_apply_to_name/0, drain/1, @@ -2085,22 +2087,31 @@ force_shrink_member_to_current_member(VHost, Name) -> end. force_vhost_queues_shrink_member_to_current_member(VHost) when is_binary(VHost) -> - ?LOG_WARNING("Shrinking all quorum queues in vhost '~ts' to a single node: ~ts", [VHost, node()]), + force_vhost_queues_shrink_member_to_current_member(VHost, <<".*">>). + +force_vhost_queues_shrink_member_to_current_member(VHost, QueueSpec) + when is_binary(VHost), is_binary(QueueSpec) -> + rabbit_log:warning("Shrinking all quorum queues matching '~ts' in vhost '~ts' to a single node: ~ts", + [QueueSpec, VHost, node()]), ListQQs = fun() -> rabbit_amqqueue:list(VHost) end, - force_all_queues_shrink_member_to_current_member(ListQQs). + force_all_queues_shrink_member_to_current_member(ListQQs, QueueSpec). force_all_queues_shrink_member_to_current_member() -> - ?LOG_WARNING("Shrinking all quorum queues to a single node: ~ts", [node()]), + force_all_queues_shrink_member_to_current_member(<<".*">>). + +force_all_queues_shrink_member_to_current_member(QueueSpec) when is_binary(QueueSpec) -> + rabbit_log:warning("Shrinking all quorum queues matching '~ts' to a single node: ~ts", + [QueueSpec, node()]), ListQQs = fun() -> rabbit_amqqueue:list() end, - force_all_queues_shrink_member_to_current_member(ListQQs). + force_all_queues_shrink_member_to_current_member(ListQQs, QueueSpec). -force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(ListQQFun) -> +force_all_queues_shrink_member_to_current_member(ListQQFun, QueueSpec) when is_function(ListQQFun) -> Node = node(), _ = [begin QName = amqqueue:get_name(Q), {RaName, _} = amqqueue:get_pid(Q), OtherNodes = lists:delete(Node, get_nodes(Q)), - ?LOG_WARNING("Shrinking queue ~ts to a single node: ~ts", [rabbit_misc:rs(QName), Node]), + rabbit_log:warning("Shrinking queue '~ts' to a single node: ~ts", [rabbit_misc:rs(QName), Node]), ok = ra_server_proc:force_shrink_members_to_current_member({RaName, Node}), Fun = fun (QQ) -> TS0 = amqqueue:get_type_state(QQ), @@ -2109,8 +2120,10 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis end, _ = rabbit_amqqueue:update(QName, Fun), _ = [ra:force_delete_server(?RA_SYSTEM, {RaName, N}) || N <- OtherNodes] - end || Q <- ListQQFun(), amqqueue:get_type(Q) == ?MODULE], - ?LOG_WARNING("Shrinking finished"), + end || Q <- ListQQFun(), + amqqueue:get_type(Q) == ?MODULE, + is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)], + rabbit_log:warning("Shrinking finished"), ok. force_checkpoint_on_queue(QName) -> From a1b1e4cfc7fe458a1cc7f78e5f60c8328a792d27 Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Wed, 26 Nov 2025 16:49:29 +0000 Subject: [PATCH 2/4] add tests for qq force shrink operations by queue name regx (cherry picked from commit a2712701ec87db8933374f85a21d73c2e9c5c1a7) --- deps/rabbit/test/quorum_queue_SUITE.erl | 43 +++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index e46a6e9789bb..481d5aafaa38 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -1362,6 +1362,21 @@ force_all_queues_shrink_member_to_current_member(Config) -> Config, Server0, <<"/">>, Q, 3) end || Q <- QQs], + %% match QQ only in shrink + QQSpec = <>, + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + force_all_queues_shrink_member_to_current_member, [QQSpec]), + + wait_for_messages_ready([Server0], ra_name(QQ), 3), + queue_utils:assert_number_of_replicas( + Config, Server0, <<"/">>, QQ, 1), + + wait_for_messages_ready([Server0], ra_name(AQ), 3), + queue_utils:assert_number_of_replicas( + Config, Server0, <<"/">>, AQ, 3), + + %% match all queues on shrink + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, force_all_queues_shrink_member_to_current_member, []), @@ -1425,6 +1440,34 @@ force_vhost_queues_shrink_member_to_current_member(Config) -> Config, Server0, VHost, Q, 3) end || Q <- QQs, VHost <- VHosts], + % match QQ only in VHost2 on shrink + QQSpec = <>, + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + force_vhost_queues_shrink_member_to_current_member, [VHost2, QQSpec]), + + [begin + QQRes = rabbit_misc:r(VHost2, queue, Q), + {ok, RaName} = rpc:call(Server0, rabbit_queue_type_util, qname_to_internal_name, [QQRes]), + wait_for_messages_ready([Server0], RaName, 3), + case Q of + QQ -> + queue_utils:assert_number_of_replicas( + Config, Server0, VHost2, Q, 1); + AQ -> + queue_utils:assert_number_of_replicas( + Config, Server0, VHost2, Q, 3) + end + end || Q <- QQs], + + [begin + QQRes = rabbit_misc:r(VHost1, queue, Q), + {ok, RaName} = rpc:call(Server0, rabbit_queue_type_util, qname_to_internal_name, [QQRes]), + wait_for_messages_ready([Server0], RaName, 3), + queue_utils:assert_number_of_replicas( + Config, Server0, VHost1, Q, 3) + end || Q <- QQs], + + % match all queues in VHost2 on shrink rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, force_vhost_queues_shrink_member_to_current_member, [VHost2]), From 39e82447578e79b170dcf42aba8c4e0bb8e89d72 Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Wed, 26 Nov 2025 17:02:10 +0000 Subject: [PATCH 3/4] use ?LOG_WARNING instead of rabbit_log:warning/{1,2} (cherry picked from commit d2f552f5d2cb07eead9cdb297ec6c1fe0e4207b2) --- deps/rabbit/src/rabbit_quorum_queue.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index c9a2e7c59851..cf32ab0cb311 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -2091,7 +2091,7 @@ force_vhost_queues_shrink_member_to_current_member(VHost) when is_binary(VHost) force_vhost_queues_shrink_member_to_current_member(VHost, QueueSpec) when is_binary(VHost), is_binary(QueueSpec) -> - rabbit_log:warning("Shrinking all quorum queues matching '~ts' in vhost '~ts' to a single node: ~ts", + ?LOG_WARNING("Shrinking all quorum queues matching '~ts' in vhost '~ts' to a single node: ~ts", [QueueSpec, VHost, node()]), ListQQs = fun() -> rabbit_amqqueue:list(VHost) end, force_all_queues_shrink_member_to_current_member(ListQQs, QueueSpec). @@ -2100,7 +2100,7 @@ force_all_queues_shrink_member_to_current_member() -> force_all_queues_shrink_member_to_current_member(<<".*">>). force_all_queues_shrink_member_to_current_member(QueueSpec) when is_binary(QueueSpec) -> - rabbit_log:warning("Shrinking all quorum queues matching '~ts' to a single node: ~ts", + ?LOG_WARNING("Shrinking all quorum queues matching '~ts' to a single node: ~ts", [QueueSpec, node()]), ListQQs = fun() -> rabbit_amqqueue:list() end, force_all_queues_shrink_member_to_current_member(ListQQs, QueueSpec). @@ -2111,7 +2111,7 @@ force_all_queues_shrink_member_to_current_member(ListQQFun, QueueSpec) when is_f QName = amqqueue:get_name(Q), {RaName, _} = amqqueue:get_pid(Q), OtherNodes = lists:delete(Node, get_nodes(Q)), - rabbit_log:warning("Shrinking queue '~ts' to a single node: ~ts", [rabbit_misc:rs(QName), Node]), + ?LOG_WARNING("Shrinking queue '~ts' to a single node: ~ts", [rabbit_misc:rs(QName), Node]), ok = ra_server_proc:force_shrink_members_to_current_member({RaName, Node}), Fun = fun (QQ) -> TS0 = amqqueue:get_type_state(QQ), @@ -2123,7 +2123,7 @@ force_all_queues_shrink_member_to_current_member(ListQQFun, QueueSpec) when is_f end || Q <- ListQQFun(), amqqueue:get_type(Q) == ?MODULE, is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)], - rabbit_log:warning("Shrinking finished"), + ?LOG_WARNING("Shrinking finished"), ok. force_checkpoint_on_queue(QName) -> From df20938b4d71d6dec2ea6bcbdbf60df6ecd3f1cd Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Tue, 16 Dec 2025 13:05:19 +0000 Subject: [PATCH 4/4] optimize for faster shinking for all queues per vhost and/or globally within a node (cherry picked from commit a79a911670352f6e9c38f50bf4b5aad42119aa36) --- deps/rabbit/src/rabbit_quorum_queue.erl | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index cf32ab0cb311..2d0ca0865199 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -2087,25 +2087,34 @@ force_shrink_member_to_current_member(VHost, Name) -> end. force_vhost_queues_shrink_member_to_current_member(VHost) when is_binary(VHost) -> - force_vhost_queues_shrink_member_to_current_member(VHost, <<".*">>). + ?LOG_WARNING("Shrinking all quorum queues in vhost '~ts' to a single node: ~ts", + [VHost, node()]), + ListQQFun = fun() -> rabbit_amqqueue:list(VHost) end, + force_all_queues_shrink_member_to_current_member(ListQQFun, _MatchFun = fun(_) -> true end). force_vhost_queues_shrink_member_to_current_member(VHost, QueueSpec) when is_binary(VHost), is_binary(QueueSpec) -> ?LOG_WARNING("Shrinking all quorum queues matching '~ts' in vhost '~ts' to a single node: ~ts", [QueueSpec, VHost, node()]), - ListQQs = fun() -> rabbit_amqqueue:list(VHost) end, - force_all_queues_shrink_member_to_current_member(ListQQs, QueueSpec). + ListQQFun = fun() -> rabbit_amqqueue:list(VHost) end, + MatchFun = fun(Q) -> is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) end, + force_all_queues_shrink_member_to_current_member(ListQQFun, MatchFun). force_all_queues_shrink_member_to_current_member() -> - force_all_queues_shrink_member_to_current_member(<<".*">>). + ?LOG_WARNING("Shrinking all quorum queues matching to a single node: ~ts", + [node()]), + ListQQFun = fun() -> rabbit_amqqueue:list() end, + force_all_queues_shrink_member_to_current_member(ListQQFun, _MatchFun = fun(_) -> true end). force_all_queues_shrink_member_to_current_member(QueueSpec) when is_binary(QueueSpec) -> ?LOG_WARNING("Shrinking all quorum queues matching '~ts' to a single node: ~ts", [QueueSpec, node()]), - ListQQs = fun() -> rabbit_amqqueue:list() end, - force_all_queues_shrink_member_to_current_member(ListQQs, QueueSpec). + ListQQFun = fun() -> rabbit_amqqueue:list() end, + MatchFun = fun(Q) -> is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) end, + force_all_queues_shrink_member_to_current_member(ListQQFun, MatchFun). -force_all_queues_shrink_member_to_current_member(ListQQFun, QueueSpec) when is_function(ListQQFun) -> +force_all_queues_shrink_member_to_current_member(ListQQFun, MatchFun) + when is_function(ListQQFun), is_function(MatchFun) -> Node = node(), _ = [begin QName = amqqueue:get_name(Q), @@ -2122,7 +2131,7 @@ force_all_queues_shrink_member_to_current_member(ListQQFun, QueueSpec) when is_f _ = [ra:force_delete_server(?RA_SYSTEM, {RaName, N}) || N <- OtherNodes] end || Q <- ListQQFun(), amqqueue:get_type(Q) == ?MODULE, - is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)], + MatchFun(Q)], ?LOG_WARNING("Shrinking finished"), ok.