diff --git a/deps/rabbit/src/rabbit_queue_type_ra.erl b/deps/rabbit/src/rabbit_queue_type_ra.erl index beb8a5b2f6eb..19014d64b485 100644 --- a/deps/rabbit/src/rabbit_queue_type_ra.erl +++ b/deps/rabbit/src/rabbit_queue_type_ra.erl @@ -33,21 +33,39 @@ -spec status(rabbit_types:vhost(), rabbit_misc:resource_name()) -> [[{binary(), term()}]] | {error, term()}. status(VHost, Name) -> - with_ra_queue(VHost, Name, - fun(Mod, Q) -> Mod:status(Q) end). + with_ra_queue(VHost, Name, fun(Mod, Q) -> Mod:status(Q) end). -spec add_member(rabbit_types:vhost(), rabbit_misc:resource_name(), node(), ra_membership(), timeout()) -> ok | {error, term()}. add_member(VHost, Name, Node, Membership, Timeout) -> - with_ra_queue(VHost, Name, - fun(Mod, Q) -> Mod:add_member(Q, Node, Membership, Timeout) end). + Fun = fun(Mod, Q) -> + case is_queue_member(Q, Node) of + true -> + %% idempotent by design + ok; + false -> + Mod:add_member(Q, Node, Membership, Timeout) + end + end, + with_ra_queue(VHost, Name, Fun). -spec delete_member(rabbit_types:vhost(), rabbit_misc:resource_name(), node()) -> ok | {error, term()}. delete_member(VHost, Name, Node) -> - with_ra_queue(VHost, Name, - fun(Mod, Q) -> Mod:delete_member(Q, Node) end). + Fun = fun(Mod, Q) -> + case is_queue_member(Q, Node) of + true -> + Mod:delete_member(Q, Node); + false -> + %% idempotent by design + ok + end + end, + with_ra_queue(VHost, Name, Fun). + +is_queue_member(Q, Member) -> + lists:member(Member, amqqueue:get_nodes(Q)). with_ra_queue(VHost, Name, Fun) -> QName = rabbit_misc:queue_resource(VHost, Name), diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index befc457baef5..f47fc634464a 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -1371,48 +1371,54 @@ do_add_member(Q, Node, Membership, Timeout) ServerId = {RaName, Node}, Members = members(Q), - MachineVersion = erpc_call(Node, rabbit_fifo, version, [], infinity), - Conf = make_ra_conf(Q, ServerId, Membership, MachineVersion), - case ra:start_server(?RA_SYSTEM, Conf) of - ok -> - ServerIdSpec = maps:with([id, uid, membership], Conf), - case ra:add_member(Members, ServerIdSpec, Timeout) of - {ok, {RaIndex, RaTerm}, Leader} -> - Fun = fun(Q1) -> - Q2 = update_type_state( - Q1, fun(#{nodes := Nodes} = Ts) -> - Ts#{nodes => lists:usort([Node | Nodes])} - end), - amqqueue:set_pid(Q2, Leader) - end, - %% The `ra:member_add/3` call above returns before the - %% change is committed. This is ok for that addition but - %% any follow-up changes to the cluster might be rejected - %% with the `cluster_change_not_permitted` error. - %% - %% Instead of changing other places to wait or retry their - %% cluster membership change, we wait for the current add - %% to be applied using a conditional leader query before - %% proceeding and returning. - {ok, _, _} = ra:leader_query( - Leader, - {erlang, is_list, []}, - #{condition => {applied, {RaIndex, RaTerm}}}), - _ = rabbit_amqqueue:update(QName, Fun), - ?LOG_INFO("Added a replica of quorum ~ts on node ~ts", [rabbit_misc:rs(QName), Node]), - ok; - {timeout, _} -> - _ = ra:force_delete_server(?RA_SYSTEM, ServerId), - _ = ra:remove_member(Members, ServerId), - {error, timeout}; + case erpc_call(Node, rabbit_fifo, version, [], infinity) of + {error, _} = Err -> + Err; + MachineVersion -> + Conf = make_ra_conf(Q, ServerId, Membership, MachineVersion), + case ra:start_server(?RA_SYSTEM, Conf) of + ok -> + ServerIdSpec = maps:with([id, uid, membership], Conf), + case ra:add_member(Members, ServerIdSpec, Timeout) of + {ok, {RaIndex, RaTerm}, Leader} -> + Fun = fun(Q1) -> + Q2 = update_type_state( + Q1, fun(#{nodes := Nodes} = Ts) -> + Ts#{nodes => lists:usort( + [Node | Nodes])} + end), + amqqueue:set_pid(Q2, Leader) + end, + %% The `ra:member_add/3` call above returns before the + %% change is committed. This is ok for that addition but + %% any follow-up changes to the cluster might be rejected + %% with the `cluster_change_not_permitted` error. + %% + %% Instead of changing other places to wait or retry their + %% cluster membership change, we wait for the current add + %% to be applied using a conditional leader query before + %% proceeding and returning. + {ok, _, _} = ra:leader_query( + Leader, + {erlang, is_list, []}, + #{condition => {applied, {RaIndex, RaTerm}}}), + _ = rabbit_amqqueue:update(QName, Fun), + ?LOG_INFO("Added a replica of quorum ~ts on node ~ts", + [rabbit_misc:rs(QName), Node]), + ok; + {timeout, _} -> + _ = ra:force_delete_server(?RA_SYSTEM, ServerId), + _ = ra:remove_member(Members, ServerId), + {error, timeout}; + E -> + _ = ra:force_delete_server(?RA_SYSTEM, ServerId), + E + end; E -> - _ = ra:force_delete_server(?RA_SYSTEM, ServerId), + ?LOG_WARNING("Could not add a replica of quorum ~ts on node ~ts: ~p", + [rabbit_misc:rs(QName), Node, E]), E - end; - E -> - ?LOG_WARNING("Could not add a replica of quorum ~ts on node ~ts: ~p", - [rabbit_misc:rs(QName), Node, E]), - E + end end. delete_member(VHost, Name, Node) -> diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 3e9e94d25b94..88c72aa09087 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -83,7 +83,7 @@ groups() -> consume_in_minority, get_in_minority, reject_after_leader_transfer, - shrink_all, + delete_members, rebalance, node_removal_is_not_quorum_critical, leader_locator_client_local, @@ -340,9 +340,9 @@ init_per_testcase(Testcase, Config) -> recover_from_single_failure when IsMixed -> %% In a 3.8/3.9 cluster this will pass only if the failure occurs on the 3.8 node {skip, "recover_from_single_failure isn't mixed versions compatible"}; - shrink_all when IsMixed -> + delete_members when IsMixed -> %% In a 3.8/3.9 cluster only the first shrink will work as expected - {skip, "skrink_all isn't mixed versions compatible"}; + {skip, "delete_members isn't mixed versions compatible"}; delete_immediately_by_resource when IsMixed andalso ClusterSize == 3 -> {skip, "delete_immediately_by_resource isn't mixed versions compatible"}; queue_ttl when IsMixed andalso ClusterSize == 3 -> @@ -2064,7 +2064,7 @@ reject_after_leader_transfer(Config) -> requeue = true}), ok. -shrink_all(Config) -> +delete_members(Config) -> [Server0, Server1, Server2] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -2077,14 +2077,17 @@ shrink_all(Config) -> declare(Ch, AQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), ?awaitMatch([{_, {ok, 2}}, {_, {ok, 2}}], - rpc:call(Server0, rabbit_quorum_queue, shrink_all, [Server2]), + rpc:call(Server0, rabbit_queue_type_ra, delete_members, + [<<".*">>, <<".*">>, Server2, all]), ?DEFAULT_AWAIT), ?awaitMatch([{_, {ok, 1}}, {_, {ok, 1}}], - rpc:call(Server0, rabbit_quorum_queue, shrink_all, [Server1]), + rpc:call(Server0, rabbit_queue_type_ra, delete_members, + [<<".*">>, <<".*">>, Server1, all]), ?DEFAULT_AWAIT), ?awaitMatch([{_, {error, 1, last_node}}, {_, {error, 1, last_node}}], - rpc:call(Server0, rabbit_quorum_queue, shrink_all, [Server0]), + rpc:call(Server0, rabbit_queue_type_ra, delete_members, + [<<".*">>, <<".*">>, Server0, all]), ?DEFAULT_AWAIT), ok. @@ -3082,8 +3085,8 @@ add_member_not_running(Config) -> QQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - ?assertEqual({error, node_not_running}, - rpc:call(Server, rabbit_quorum_queue, add_member, + ?assertEqual({error, noconnection}, + rpc:call(Server, rabbit_queue_type_ra, add_member, [<<"/">>, QQ, 'rabbit@burrow', voter, 5000])). add_member_classic(Config) -> @@ -3091,8 +3094,8 @@ add_member_classic(Config) -> Ch = rabbit_ct_client_helpers:open_channel(Config, Server), CQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])), - ?assertEqual({error, classic_queue_not_supported}, - rpc:call(Server, rabbit_quorum_queue, add_member, + ?assertEqual({error, {unsupported, rabbit_classic_queue}}, + rpc:call(Server, rabbit_queue_type_ra, add_member, [<<"/">>, CQ, Server, voter, 5000])). add_member_wrong_type(Config) -> @@ -3101,8 +3104,8 @@ add_member_wrong_type(Config) -> SQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', SQ, 0, 0}, declare(Ch, SQ, [{<<"x-queue-type">>, longstr, <<"stream">>}])), - ?assertEqual({error, not_quorum_queue}, - rpc:call(Server, rabbit_quorum_queue, add_member, + ?assertEqual({error, {unsupported, rabbit_stream_queue}}, + rpc:call(Server, rabbit_queue_type_ra, add_member, [<<"/">>, SQ, Server, voter, 5000])). add_member_already_a_member(Config) -> @@ -3113,14 +3116,14 @@ add_member_already_a_member(Config) -> declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), %% idempotent by design ?assertEqual(ok, - rpc:call(Server, rabbit_quorum_queue, add_member, + rpc:call(Server, rabbit_queue_type_ra, add_member, [<<"/">>, QQ, Server, voter, 5000])). add_member_not_found(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), QQ = ?config(queue_name, Config), ?assertEqual({error, not_found}, - rpc:call(Server, rabbit_quorum_queue, add_member, + rpc:call(Server, rabbit_queue_type_ra, add_member, [<<"/">>, QQ, Server, voter, 5000])). add_member(Config) -> @@ -3130,13 +3133,13 @@ add_member(Config) -> QQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - ?assertEqual({error, node_not_running}, - rpc:call(Server0, rabbit_quorum_queue, add_member, + ?assertEqual({error, noconnection}, + rpc:call(Server0, rabbit_queue_type_ra, add_member, [<<"/">>, QQ, Server1, voter, 5000])), ok = rabbit_control_helper:command(stop_app, Server1), ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []), rabbit_control_helper:command(start_app, Server1), - ?assertEqual(ok, rpc:call(Server1, rabbit_quorum_queue, add_member, + ?assertEqual(ok, rpc:call(Server1, rabbit_queue_type_ra, add_member, [<<"/">>, QQ, Server1, voter, 5000])), Info = rpc:call(Server0, rabbit_quorum_queue, infos, [rabbit_misc:r(<<"/">>, queue, QQ)]), @@ -3154,8 +3157,8 @@ add_member_2(Config) -> ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, {<<"x-quorum-initial-group-size">>, long, 1}])), - ?assertEqual(ok, rpc:call(Server0, rabbit_quorum_queue, add_member, - [<<"/">>, QQ, Server0, 5000])), + ?assertEqual(ok, rpc:call(Server0, rabbit_queue_type_ra, add_member, + [<<"/">>, QQ, Server0, voter, 5000])), Info = rpc:call(Server0, rabbit_quorum_queue, infos, [rabbit_misc:r(<<"/">>, queue, QQ)]), Servers = lists:sort([Server0, Server1]), @@ -3170,7 +3173,7 @@ delete_member_not_running(Config) -> declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), %% it should be possible to delete members that are not online (e.g. decomissioned) ?assertEqual(ok, - rpc:call(Server, rabbit_quorum_queue, delete_member, + rpc:call(Server, rabbit_queue_type_ra, delete_member, [<<"/">>, QQ, 'rabbit@burrow'])). delete_member_classic(Config) -> @@ -3178,8 +3181,8 @@ delete_member_classic(Config) -> Ch = rabbit_ct_client_helpers:open_channel(Config, Server), CQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])), - ?assertEqual({error, classic_queue_not_supported}, - rpc:call(Server, rabbit_quorum_queue, delete_member, + ?assertEqual({error, {unsupported, rabbit_classic_queue}}, + rpc:call(Server, rabbit_queue_type_ra, delete_member, [<<"/">>, CQ, Server])). delete_member_wrong_type(Config) -> @@ -3188,15 +3191,15 @@ delete_member_wrong_type(Config) -> SQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', SQ, 0, 0}, declare(Ch, SQ, [{<<"x-queue-type">>, longstr, <<"stream">>}])), - ?assertEqual({error, not_quorum_queue}, - rpc:call(Server, rabbit_quorum_queue, delete_member, + ?assertEqual({error, {unsupported, rabbit_stream_queue}}, + rpc:call(Server, rabbit_queue_type_ra, delete_member, [<<"/">>, SQ, Server])). delete_member_queue_not_found(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), QQ = ?config(queue_name, Config), ?assertEqual({error, not_found}, - rpc:call(Server, rabbit_quorum_queue, delete_member, + rpc:call(Server, rabbit_queue_type_ra, delete_member, [<<"/">>, QQ, Server])). delete_member(Config) -> @@ -3208,7 +3211,7 @@ delete_member(Config) -> declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), ?awaitMatch(NServers, count_online_nodes(Server, <<"/">>, QQ), ?DEFAULT_AWAIT), ?assertEqual(ok, - rpc:call(Server, rabbit_quorum_queue, delete_member, + rpc:call(Server, rabbit_queue_type_ra, delete_member, [<<"/">>, QQ, Server])). delete_member_not_a_member(Config) -> @@ -3220,11 +3223,11 @@ delete_member_not_a_member(Config) -> declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), ?awaitMatch(NServers, count_online_nodes(Server, <<"/">>, QQ), ?DEFAULT_AWAIT), ?assertEqual(ok, - rpc:call(Server, rabbit_quorum_queue, delete_member, + rpc:call(Server, rabbit_queue_type_ra, delete_member, [<<"/">>, QQ, Server])), %% idempotent by design ?assertEqual(ok, - rpc:call(Server, rabbit_quorum_queue, delete_member, + rpc:call(Server, rabbit_queue_type_ra, delete_member, [<<"/">>, QQ, Server])). delete_member_member_already_deleted(Config) -> @@ -3246,7 +3249,7 @@ delete_member_member_already_deleted(Config) -> %% idempotent by design ?assertEqual(ok, - rpc:call(Server, rabbit_quorum_queue, delete_member, + rpc:call(Server, rabbit_queue_type_ra, delete_member, [<<"/">>, QQ, Server2])), queue_utils:assert_number_of_replicas( Config, Server, <<"/">>, QQ, 1), @@ -4194,8 +4197,8 @@ status(Config) -> ] when T1 /= <<>> andalso T2 /= <<>> andalso T3 /= <<>>, - rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, - status, [<<"/">>, QQ])), + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_queue_type_ra, + status, [<<"/">>, QQ])), wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]), ok.