diff --git a/deps/rabbit/docs/rabbitmq-queues.8 b/deps/rabbit/docs/rabbitmq-queues.8 index 41196752c0f5..5713449b2bde 100644 --- a/deps/rabbit/docs/rabbitmq-queues.8 +++ b/deps/rabbit/docs/rabbitmq-queues.8 @@ -97,16 +97,16 @@ Displays general help and commands supported by .\" ------------------------------------ .It Cm grow Ar node Ar selector Fl -vhost-pattern Ar pattern Fl -queue-pattern Ar pattern Fl -errors-only .Pp -Adds a new replica on the given node for all or a half of matching quorum queues. +Adds a new replica on the given node for all or a half of matching queues. .Pp Supported .Ar selector values are: .Bl -tag -width Ds .It Dv Sy all -Selects all quorum queues +Selects all replicated queues .It Dv Sy even -Selects quorum queues with an even number of replicas +Selects replicated queues with an even number of replicas .El .Pp Example: @@ -137,7 +137,7 @@ Example: .\" ------------------------------------ .It Cm shrink Ar node .Pp -Shrinks quorum queue clusters by removing any members (replicas) on the given node. +Shrinks queue clusters by removing any members (replicas) on the given node. .Pp Example: .Sp @@ -149,7 +149,7 @@ Example: .\" ------------------------------------ .It Cm add_member Ar queue Ar node Fl -vhost Ar virtual-host .Pp -Adds a quorum queue member (replica) on the given node. +Adds a queue member (replica) on the given node. .Pp Example: .Sp @@ -157,7 +157,7 @@ Example: .\" ------------------------------------ .It Cm delete_member Ar queue Ar node Fl -vhost Ar virtual-host .Pp -Removes a quorum queue member (replica) on the given node. +Removes a queue member (replica) on the given node. .Pp Example: .Sp @@ -169,7 +169,7 @@ Example: .\" ------------------------------------ .It Cm quorum_status Ar queue Fl -vhost Ar virtual-host .Pp -Displays quorum status of a quorum queue. +Displays quorum status of a queue. .Pp Example: .Sp diff --git a/deps/rabbit/src/rabbit_queue_type_ra.erl b/deps/rabbit/src/rabbit_queue_type_ra.erl new file mode 100644 index 000000000000..beb8a5b2f6eb --- /dev/null +++ b/deps/rabbit/src/rabbit_queue_type_ra.erl @@ -0,0 +1,153 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. + +-module(rabbit_queue_type_ra). + +-include_lib("rabbit_common/include/rabbit.hrl"). + +-export([status/2, + add_member/5, + add_members/5, + delete_member/3, + delete_members/4, + all_members_stable/2]). + +-export_type([ra_membership/0]). + +-type ra_membership() :: voter | non_voter | promotable. + +-define(RA_MEMBERS_TIMEOUT, 60_000). + +-callback status(amqqueue:amqqueue()) -> + [[{binary(), term()}]]. + +-callback add_member(amqqueue:amqqueue(), node(), ra_membership(), timeout()) -> + ok | {error, term()}. + +-callback delete_member(amqqueue:amqqueue(), node()) -> + ok | {error, term()}. + +-spec status(rabbit_types:vhost(), rabbit_misc:resource_name()) -> + [[{binary(), term()}]] | {error, term()}. +status(VHost, Name) -> + with_ra_queue(VHost, Name, + fun(Mod, Q) -> Mod:status(Q) end). + +-spec add_member(rabbit_types:vhost(), rabbit_misc:resource_name(), + node(), ra_membership(), timeout()) -> + ok | {error, term()}. +add_member(VHost, Name, Node, Membership, Timeout) -> + with_ra_queue(VHost, Name, + fun(Mod, Q) -> Mod:add_member(Q, Node, Membership, Timeout) end). + +-spec delete_member(rabbit_types:vhost(), rabbit_misc:resource_name(), node()) -> + ok | {error, term()}. +delete_member(VHost, Name, Node) -> + with_ra_queue(VHost, Name, + fun(Mod, Q) -> Mod:delete_member(Q, Node) end). + +with_ra_queue(VHost, Name, Fun) -> + QName = rabbit_misc:queue_resource(VHost, Name), + case rabbit_amqqueue:lookup(QName) of + {ok, Q} -> + Mod = amqqueue:get_type(Q), + case is_ra_based(Mod) of + true -> + Fun(Mod, Q); + false -> + {error, {unsupported, Mod}} + end; + {error, not_found} = Err -> + Err + end. + +%% For each Ra-based queue matching VHostSpec and QueueSpec, add a member on Node. +-spec add_members(binary(), binary(), node(), all | even, ra_membership()) -> + [{rabbit_amqqueue:name(), {ok, pos_integer()} | {error, pos_integer(), term()}}]. +add_members(VHostSpec, QueueSpec, Node, Strategy, Membership) -> + FilterFun = fun(Q) -> not lists:member(Node, amqqueue:get_nodes(Q)) end, + Fun = fun(Mod, Q, Size) -> + case Mod:add_member(Q, Node, Membership, ?RA_MEMBERS_TIMEOUT) of + ok -> + {ok, Size + 1}; + {error, Reason} -> + {error, Size, Reason} + end + end, + modify_members_matching(VHostSpec, QueueSpec, Node, Strategy, FilterFun, Fun). + +%% For each Ra-based queue matching VHostSpec and QueueSpec, delete a member on Node. +-spec delete_members(binary(), binary(), node(), all | even) -> + [{rabbit_amqqueue:name(), {ok, pos_integer()} | {error, pos_integer(), term()}}]. +delete_members(VHostSpec, QueueSpec, Node, Strategy) -> + FilterFun = fun(Q) -> lists:member(Node, amqqueue:get_nodes(Q)) end, + Fun = fun(Mod, Q, Size) -> + case Mod:delete_member(Q, Node) of + ok -> + {ok, Size - 1}; + {error, Reason} -> + {error, Size, Reason} + end + end, + modify_members_matching(VHostSpec, QueueSpec, Node, Strategy, FilterFun, Fun). + +modify_members_matching(VHostSpec, QueueSpec, Node, Strategy, FilterFun, OperationFun) -> + case lists:member(Node, rabbit_nodes:list_running()) of + true -> + [begin + Mod = amqqueue:get_type(Q), + QName = amqqueue:get_name(Q), + QNodes = amqqueue:get_nodes(Q), + Size = length(QNodes), + {ok, RaName} = rabbit_queue_type_util:qname_to_internal_name(QName), + Res = case all_members_stable(RaName, QNodes) of + true -> + OperationFun(Mod, Q, Size); + false -> + {error, Size, {error, non_stable_members}} + end, + {QName, Res} + end + || Q <- rabbit_amqqueue:list(), + FilterFun(Q), + matches_strategy(Strategy, amqqueue:get_nodes(Q)), + is_ra_based(amqqueue:get_type(Q)), + is_match(amqqueue:get_vhost(Q), VHostSpec), + is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)]; + false -> + {error, {node_not_running, Node}} + end. + +is_ra_based(Mod) -> + lists:any(fun({behaviour, Bs}) -> lists:member(?MODULE, Bs); + ({behavior, Bs}) -> lists:member(?MODULE, Bs); + (_) -> false + end, Mod:module_info(attributes)). + +%% Check that all Ra members are stable (voter or non_voter, not promotable). +%% This is used to ensure that we don't add/remove members while another +%% membership change is in progress. +-spec all_members_stable(atom(), [node()]) -> boolean(). +all_members_stable(RaName, QNodes) -> + Result = erpc:multicall(QNodes, ets, lookup, [ra_state, RaName], ?RA_MEMBERS_TIMEOUT), + lists:all(fun({ok, [{_RaName, _RaState, Membership}]}) + when Membership =:= voter orelse + Membership =:= non_voter -> + true; + (_) -> + false + end, Result). + +matches_strategy(all, _Members) -> + true; +matches_strategy(even, Members) -> + length(Members) rem 2 =:= 0. + +is_match(Subject, RE) -> + match =:= re:run(Subject, RE, [{capture, none}]). + +get_resource_name(#resource{name = Name}) -> + Name. diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 0a6b8a806809..98f74a28ed70 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -9,6 +9,7 @@ -feature(maybe_expr, enable). -behaviour(rabbit_queue_type). +-behaviour(rabbit_queue_type_ra). -behaviour(rabbit_policy_validator). -behaviour(rabbit_policy_merge_strategy). @@ -30,7 +31,7 @@ -export([supports_stateful_delivery/0, deliver/3]). -export([dead_letter_publish/5]). --export([cluster_state/1, status/2]). +-export([cluster_state/1, status/1, status/2]). -export([update_consumer_handler/8, update_consumer/9]). -export([cancel_consumer_handler/2, cancel_consumer/3]). -export([become_leader/2, handle_tick/3, spawn_deleter/1]). @@ -119,7 +120,6 @@ -type msg_id() :: non_neg_integer(). -type qmsg() :: {rabbit_types:r('queue'), pid(), msg_id(), boolean(), mc:state()}. --type membership() :: voter | non_voter | promotable. %% see ra_membership() in Ra. -type replica_states() :: #{atom() => replica_state()}. -type replica_state() :: leader | follower | non_voter | promotable. @@ -1249,69 +1249,74 @@ status(Vhost, QueueName) -> {ok, Q} when ?amqqueue_is_classic(Q) -> {error, classic_queue_not_supported}; {ok, Q} when ?amqqueue_is_quorum(Q) -> - {RName, _} = amqqueue:get_pid(Q), - Nodes = lists:sort(get_nodes(Q)), - [begin - ServerId = {RName, N}, - case erpc_call(N, ?MODULE, key_metrics_rpc, [ServerId], ?RPC_TIMEOUT) of - #{state := RaftState, - membership := Membership, - commit_index := Commit, - term := Term, - last_index := Last, - last_applied := LastApplied, - last_written_index := LastWritten, - snapshot_index := SnapIdx, - machine_version := MacVer} -> - [{<<"Node Name">>, N}, - {<<"Raft State">>, RaftState}, - {<<"Membership">>, Membership}, - {<<"Last Log Index">>, Last}, - {<<"Last Written">>, LastWritten}, - {<<"Last Applied">>, LastApplied}, - {<<"Commit Index">>, Commit}, - {<<"Snapshot Index">>, SnapIdx}, - {<<"Term">>, Term}, - {<<"Machine Version">>, MacVer} - ]; - #{state := noproc, - membership := unknown, - machine_version := MacVer} -> - [{<<"Node Name">>, N}, - {<<"Raft State">>, noproc}, - {<<"Membership">>, <<>>}, - {<<"Last Log Index">>, <<>>}, - {<<"Last Written">>, <<>>}, - {<<"Last Applied">>, <<>>}, - {<<"Commit Index">>, <<>>}, - {<<"Snapshot Index">>, <<>>}, - {<<"Term">>, <<>>}, - {<<"Machine Version">>, MacVer} - ]; - {error, Reason} -> - State = case is_atom(Reason) of - true -> Reason; - false -> unknown - end, - [{<<"Node Name">>, N}, - {<<"Raft State">>, State}, - {<<"Membership">>, <<>>}, - {<<"Last Log Index">>, <<>>}, - {<<"Last Written">>, <<>>}, - {<<"Last Applied">>, <<>>}, - {<<"Commit Index">>, <<>>}, - {<<"Snapshot Index">>, <<>>}, - {<<"Term">>, <<>>}, - {<<"Machine Version">>, <<>>} - ] - end - end || N <- Nodes]; + status(Q); {ok, _Q} -> {error, not_quorum_queue}; {error, not_found} = E -> E end. +-spec status(amqqueue:amqqueue()) -> + [[{binary(), term()}]]. +status(Q) when ?amqqueue_is_quorum(Q) -> + {RName, _} = amqqueue:get_pid(Q), + Nodes = lists:sort(get_nodes(Q)), + [begin + ServerId = {RName, N}, + case erpc_call(N, ?MODULE, key_metrics_rpc, [ServerId], ?RPC_TIMEOUT) of + #{state := RaftState, + membership := Membership, + commit_index := Commit, + term := Term, + last_index := Last, + last_applied := LastApplied, + last_written_index := LastWritten, + snapshot_index := SnapIdx, + machine_version := MacVer} -> + [{<<"Node Name">>, N}, + {<<"Raft State">>, RaftState}, + {<<"Membership">>, Membership}, + {<<"Last Log Index">>, Last}, + {<<"Last Written">>, LastWritten}, + {<<"Last Applied">>, LastApplied}, + {<<"Commit Index">>, Commit}, + {<<"Snapshot Index">>, SnapIdx}, + {<<"Term">>, Term}, + {<<"Machine Version">>, MacVer} + ]; + #{state := noproc, + membership := unknown, + machine_version := MacVer} -> + [{<<"Node Name">>, N}, + {<<"Raft State">>, noproc}, + {<<"Membership">>, <<>>}, + {<<"Last Log Index">>, <<>>}, + {<<"Last Written">>, <<>>}, + {<<"Last Applied">>, <<>>}, + {<<"Commit Index">>, <<>>}, + {<<"Snapshot Index">>, <<>>}, + {<<"Term">>, <<>>}, + {<<"Machine Version">>, MacVer} + ]; + {error, Reason} -> + State = case is_atom(Reason) of + true -> Reason; + false -> unknown + end, + [{<<"Node Name">>, N}, + {<<"Raft State">>, State}, + {<<"Membership">>, <<>>}, + {<<"Last Log Index">>, <<>>}, + {<<"Last Written">>, <<>>}, + {<<"Last Applied">>, <<>>}, + {<<"Commit Index">>, <<>>}, + {<<"Snapshot Index">>, <<>>}, + {<<"Term">>, <<>>}, + {<<"Machine Version">>, <<>>} + ] + end + end || N <- Nodes]. + add_member(VHost, Name, Node, Membership, Timeout) when is_binary(VHost) andalso is_binary(Name) andalso @@ -1344,6 +1349,8 @@ add_member(VHost, Name, Node, Membership, Timeout) E end. +add_member(Q, Node, Membership, Timeout) when ?amqqueue_is_quorum(Q) -> + do_add_member(Q, Node, Membership, Timeout); add_member(VHost, Name, Node, Timeout) when is_binary(VHost) -> %% NOTE needed to pass mixed cluster tests. add_member(VHost, Name, Node, promotable, Timeout). @@ -1429,7 +1436,8 @@ delete_member(VHost, Name, Node) -> E end. - +-spec delete_member(amqqueue:amqqueue(), node()) -> + ok | {error, term()}. delete_member(Q, Node) when ?amqqueue_is_quorum(Q) -> QName = amqqueue:get_name(Q), {RaName, _} = amqqueue:get_pid(Q), @@ -1519,7 +1527,7 @@ shrink_all(Node) -> grow(Node, VhostSpec, QueueSpec, Strategy) -> grow(Node, VhostSpec, QueueSpec, Strategy, promotable). --spec grow(node() | integer(), binary(), binary(), all | even, membership()) -> +-spec grow(node() | integer(), binary(), binary(), all | even, rabbit_queue_type_ra:ra_membership()) -> [{rabbit_amqqueue:name(), {ok, pos_integer()} | {error, pos_integer(), term()}}]. grow(Node, VhostSpec, QueueSpec, Strategy, Membership) when is_atom(Node) -> @@ -1585,7 +1593,7 @@ maybe_grow(Q, Node, Membership, Size) -> maybe_grow(Q, Node, Membership, Size, QNodes) -> QName = amqqueue:get_name(Q), {ok, RaName} = qname_to_internal_name(QName), - case check_all_memberships(RaName, QNodes, voter) of + case rabbit_queue_type_ra:all_members_stable(RaName, QNodes) of true -> ?LOG_INFO("~ts: adding a new member (replica) on node ~w", [rabbit_misc:rs(QName), Node]), @@ -1606,22 +1614,6 @@ maybe_grow(Q, Node, Membership, Size, QNodes) -> {QName, {error, Size, Err}} end. -%% Compare local membership states of all nodes in parallel. -%% -%% Note a few things: -%% 1. This function intentionally queries local member state and not the leader -%% 2. ra:key_metrics/1 is sequential and not parallel -%% 3. ra:key_metrics/1 is not multicall-friendly because it relies on erlang:node/0 -check_all_memberships(RaName, QNodes, CompareMembership) -> - case rpc:multicall(QNodes, ets, lookup, [ra_state, RaName]) of - {Result, []} -> - lists:all( - fun(M) -> M == CompareMembership end, - [Membership || [{_RaName, _RaState, Membership}] <- Result]); - _ -> - false - end. - -spec transfer_leadership(amqqueue:amqqueue(), node()) -> {migrated, node()} | {not_migrated, atom()}. transfer_leadership(Q, Destination) -> diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index fe41526bb340..3e9e94d25b94 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -1897,20 +1897,6 @@ grow_queue(Config) -> %% grow queues to node 'Server1': non_voter rpc:call(Server0, rabbit_quorum_queue, grow, [Server1, <<"/">>, <<".*">>, all, non_voter]), - assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount), - - %% grow queues to node 'Server2': fail, non_voters found - Result6 = rpc:call(Server0, rabbit_quorum_queue, grow, [Server2, <<"/">>, <<".*">>, all, voter]), - %% [{{resource,<<"/">>,queue,<<"grow_queue">>},{error, 2, {error, non_voters_found}}, - %% {{resource,<<"/">>,queue,<<"grow_queue_alt">>},{error, 2, {error, non_voters_found}},...] - ?assert(lists:all( - fun({_, Err}) -> Err =:= {error, TargetClusterSize_2, {error, non_voters_found}} end, Result6)), - assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount), - - %% grow queues to target quorum cluster size '5': fail, non_voters found - Result7 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_5, <<"/">>, <<".*">>, all]), - ?assert(lists:all( - fun({_, Err}) -> Err =:= {error, TargetClusterSize_2, {error, non_voters_found}} end, Result7)), assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount). assert_grown_queues(Qs, Node, TargetClusterSize, MsgCount) -> diff --git a/deps/rabbit/test/rabbitmq_queues_cli_integration_SUITE.erl b/deps/rabbit/test/rabbitmq_queues_cli_integration_SUITE.erl index c6989c67ead7..f5ab8ebd61be 100644 --- a/deps/rabbit/test/rabbitmq_queues_cli_integration_SUITE.erl +++ b/deps/rabbit/test/rabbitmq_queues_cli_integration_SUITE.erl @@ -22,19 +22,15 @@ groups() -> {tests, [], [ shrink, grow, - grow_invalid_node_filtered + grow_invalid_node_filtered, + status_add_delete_member ]} ]. init_per_suite(Config) -> - case rabbit_ct_helpers:is_mixed_versions() of - false -> - rabbit_ct_helpers:log_environment(), - Config1 = rabbit_ct_helpers:run_setup_steps(Config), - rabbit_ct_helpers:ensure_rabbitmq_queues_cmd(Config1); - _ -> - {skip, "growing and shrinking cannot be done in mixed mode"} - end. + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:run_setup_steps(Config), + rabbit_ct_helpers:ensure_rabbitmq_queues_cmd(Config1). end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). @@ -54,7 +50,16 @@ end_per_group(tests, Config) -> rabbit_ct_broker_helpers:teardown_steps()). init_per_testcase(Testcase, Config0) -> - rabbit_ct_helpers:testcase_started(Config0, Testcase). + case {Testcase, rabbit_ct_helpers:is_mixed_versions()} of + {shrink, true} -> + {skip, "shrink cannot be done in mixed mode"}; + {grow, true} -> + {skip, "grow cannot be done in mixed mode"}; + {grow_invalid_node_filtered, true} -> + {skip, "grow cannot be done in mixed mode"}; + _ -> + rabbit_ct_helpers:testcase_started(Config0, Testcase) + end. end_per_testcase(Testcase, Config0) -> rabbit_ct_helpers:testcase_finished(Config0, Testcase). @@ -124,6 +129,48 @@ grow_invalid_node_filtered(Config) -> {error, _ExitCode, _} = rabbitmq_queues(Config, 0, ["grow", DummyNode, "all"]), ok. +%% Test the following CLI commands: +%% rabbitmq-queues quorum_status +%% rabbitmq-queues add_member +%% rabbitmq-queues delete_member +status_add_delete_member(Config) -> + Nodename0 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Nodename2 = rabbit_ct_broker_helpers:get_node_config(Config, 2, nodename), + QName = "my-qq", + + Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + + Args = [{<<"x-quorum-initial-group-size">>, long, 1}], + #'queue.declare_ok'{} = declare_qq(Ch, QName, Args), + + publish_confirm(Ch, QName), + + %% Target new and old node in mixed version mode. + {ok, StatusOut0} = rabbitmq_queues_json(Config, 0, ["quorum_status", QName]), + {ok, StatusOut1} = rabbitmq_queues_json(Config, 1, ["quorum_status", QName]), + Status0 = parse_json_status(StatusOut0), + Status1 = parse_json_status(StatusOut1), + ?assertEqual([Nodename0], get_members(Status0)), + ?assertEqual([Nodename0], get_members(Status1)), + + {ok, _AddOut} = rabbitmq_queues(Config, 1, ["add_member", QName, Nodename2]), + + publish_confirm(Ch, QName), + + {ok, StatusOut2} = rabbitmq_queues_json(Config, 1, ["quorum_status", QName]), + Status2 = parse_json_status(StatusOut2), + ?assertEqual([Nodename0, Nodename2], + lists:sort(get_members(Status2))), + + {ok, _DeleteOut} = rabbitmq_queues(Config, 1, ["delete_member", QName, Nodename0]), + + publish_confirm(Ch, QName), + + {ok, StatusOut3} = rabbitmq_queues_json(Config, 1, ["quorum_status", QName]), + Status3 = parse_json_status(StatusOut3), + ?assertEqual([Nodename2], get_members(Status3)). + parse_result(S) -> Lines = string:split(S, "\n", all), maps:from_list( @@ -147,6 +194,16 @@ declare_qq(Ch, Q) -> rabbitmq_queues(Config, N, Args) -> rabbit_ct_broker_helpers:rabbitmq_queues(Config, N, ["--silent" | Args]). +rabbitmq_queues_json(Config, N, Args) -> + rabbit_ct_broker_helpers:rabbitmq_queues(Config, N, ["--silent", "--formatter", "json" | Args]). + +parse_json_status(JsonStr) -> + rabbit_json:decode(list_to_binary(JsonStr)). + +get_members(StatusList) when is_list(StatusList) -> + [binary_to_atom(Member) || #{<<"Node Name">> := Member} <- StatusList]. + + publish_confirm(Ch, QName) -> ok = amqp_channel:cast(Ch, #'basic.publish'{routing_key = list_to_binary(QName)}, diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/add_member_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/add_member_command.ex index 1c23fe05cee8..9ee48575ad41 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/add_member_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/add_member_command.ex @@ -2,7 +2,7 @@ ## License, v. 2.0. If a copy of the MPL was not distributed with this ## file, You can obtain one at https://mozilla.org/MPL/2.0/. ## -## Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +## Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. defmodule RabbitMQ.CLI.Queues.Commands.AddMemberCommand do alias RabbitMQ.CLI.Core.{DocGuide, Validators} @@ -73,21 +73,15 @@ defmodule RabbitMQ.CLI.Queues.Commands.AddMemberCommand do [name, node] = _args, %{vhost: vhost, node: node_name, timeout: timeout, membership: membership} ) do - args = [vhost, name, to_atom(node)] - - args = - case to_atom(membership) do - :promotable -> args ++ [timeout] - other -> args ++ [other, timeout] - end - - case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :add_member, args) do - {:error, :classic_queue_not_supported} -> - {:error, "Cannot add members to a classic queue"} - + args = [vhost, name, to_atom(node), to_atom(membership), timeout] + case :rabbit_misc.rpc_call(node_name, :rabbit_queue_type_ra, :add_member, args) do {:error, :not_found} -> {:error, {:not_found, :queue, vhost, name}} + {:badrpc, {:EXIT, {:undef, _}}} -> + # Fallback for mixed version clusters with older nodes + :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :add_member, args) + other -> other end @@ -99,8 +93,8 @@ defmodule RabbitMQ.CLI.Queues.Commands.AddMemberCommand do def usage_additional do [ - ["", "quorum queue name"], - ["", "node to add a new replica on"], + ["", "queue name"], + ["", "node to add a new member on"], ["--membership ", "add a promotable non-voter (default) or full voter"] ] end @@ -113,11 +107,11 @@ defmodule RabbitMQ.CLI.Queues.Commands.AddMemberCommand do def help_section, do: :replication - def description, do: "Adds a quorum queue member (replica) on the given node." + def description, do: "Adds a queue member/replica on the given node." def banner([name, node], _) do [ - "Adding a replica for queue #{name} on node #{node}..." + "Adding a member for queue #{name} on node #{node}..." ] end end diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/delete_member_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/delete_member_command.ex index 443aecd1997d..0c77c6c4e16b 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/delete_member_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/delete_member_command.ex @@ -18,17 +18,15 @@ defmodule RabbitMQ.CLI.Queues.Commands.DeleteMemberCommand do use RabbitMQ.CLI.Core.RequiresRabbitAppRunning def run([name, node] = _args, %{vhost: vhost, node: node_name}) do - case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :delete_member, [ - vhost, - name, - to_atom(node) - ]) do - {:error, :classic_queue_not_supported} -> - {:error, "Cannot delete members from a classic queue"} - + args = [vhost, name, to_atom(node)] + case :rabbit_misc.rpc_call(node_name, :rabbit_queue_type_ra, :delete_member, args) do {:error, :not_found} -> {:error, {:not_found, :queue, vhost, name}} + {:badrpc, {:EXIT, {:undef, _}}} -> + # Fallback for mixed version clusters with older nodes + :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :delete_member, args) + other -> other end @@ -40,7 +38,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.DeleteMemberCommand do def usage_additional do [ - ["", "quorum queue name"], + ["", "queue name"], ["", "node to remove a new replica on"] ] end @@ -53,7 +51,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.DeleteMemberCommand do def help_section, do: :replication - def description, do: "Removes a quorum queue member (replica) on the given node." + def description, do: "Removes a queue member (replica) on the given node." def banner([name, node], _) do "Removing a replica of queue #{name} on node #{node}..." diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex index 5bac1dc84965..f0726a0c16f3 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex @@ -67,18 +67,22 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do membership: membership, errors_only: errors_only }) do - args = [to_atom(node), vhost_pat, queue_pat, to_atom(strategy)] - args = - case to_atom(membership) do - :promotable -> args - other -> args ++ [other] - end - - case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :grow, args) do + args = [vhost_pat, queue_pat, to_atom(node), to_atom(strategy), to_atom(membership)] + case :rabbit_misc.rpc_call(node_name, :rabbit_queue_type_ra, :add_members, args) do {:error, _} = error -> error + {:badrpc, {:EXIT, {:undef, _}}} -> + # Fallback for mixed version clusters with older nodes + :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :grow, [ + to_atom(node), + vhost_pat, + queue_pat, + to_atom(strategy), + to_atom(membership) + ]) + {:badrpc, _} = error -> error @@ -134,10 +138,10 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do def description, do: - "Grows quorum queue clusters by adding a member (replica) on the specified node for all matching queues" + "Grows all matching queues by adding a member (replica) on the specified node" def banner([node, strategy], _) do - "Growing #{strategy} quorum queues on #{node}..." + "Growing #{strategy} queues on #{node}..." end # diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/quorum_status_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/quorum_status_command.ex index 772f48cc2b2f..bc3012dd9e98 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/quorum_status_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/quorum_status_command.ex @@ -16,9 +16,14 @@ defmodule RabbitMQ.CLI.Queues.Commands.QuorumStatusCommand do use RabbitMQ.CLI.Core.RequiresRabbitAppRunning def run([name] = _args, %{node: node_name, vhost: vhost}) do - case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :status, [vhost, name]) do - {:error, :classic_queue_not_supported} -> - {:error, "Cannot get quorum status of a classic queue"} + args = [vhost, name] + case :rabbit_misc.rpc_call(node_name, :rabbit_queue_type_ra, :status, args) do + {:error, :not_found} -> + {:error, {:not_found, :queue, vhost, name}} + + {:badrpc, {:EXIT, {:undef, _}}} -> + # Fallback for mixed version clusters with older nodes + :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :status, args) other -> other @@ -47,8 +52,8 @@ defmodule RabbitMQ.CLI.Queues.Commands.QuorumStatusCommand do def help_section(), do: :observability_and_health_checks - def description(), do: "Displays quorum status of a quorum queue" + def description(), do: "Displays the quorum status of a queue" def banner([name], %{node: node_name}), - do: "Status of quorum queue #{name} on node #{node_name} ..." + do: "Status of queue #{name} on node #{node_name} ..." end diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/shrink_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/shrink_command.ex index e14b1ef47715..d69562ff4875 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/shrink_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/shrink_command.ex @@ -22,10 +22,15 @@ defmodule RabbitMQ.CLI.Queues.Commands.ShrinkCommand do use RabbitMQ.CLI.Core.RequiresRabbitAppRunning def run([node], %{node: node_name, errors_only: errs}) do - case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :shrink_all, [to_atom(node)]) do + args = [".*", ".*", to_atom(node), :all] + case :rabbit_misc.rpc_call(node_name, :rabbit_queue_type_ra, :delete_members, args) do {:error, _} = error -> error + {:badrpc, {:EXIT, {:undef, _}}} -> + # Fallback for mixed version clusters with older nodes + :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :shrink_all, [to_atom(node)]) + {:badrpc, _} = error -> error