diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 63ebda9a0fb3..ea4fb5d46647 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -713,59 +713,56 @@ restart_server({_, _} = Ref) -> -spec delete(amqqueue:amqqueue(), boolean(), boolean(), rabbit_types:username()) -> - {ok, QLen :: non_neg_integer()} | - {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. -delete(Q, true, _IfEmpty, _ActingUser) when ?amqqueue_is_quorum(Q) -> - {protocol_error, not_implemented, - "cannot delete ~ts. queue.delete operations with if-unused flag set are not supported by quorum queues", - [rabbit_misc:rs(amqqueue:get_name(Q))]}; -delete(Q, _IfUnused, true, _ActingUser) when ?amqqueue_is_quorum(Q) -> - {protocol_error, not_implemented, - "cannot delete ~ts. queue.delete operations with if-empty flag set are not supported by quorum queues", - [rabbit_misc:rs(amqqueue:get_name(Q))]}; -delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) -> + {ok, QLen :: non_neg_integer()}. +delete(Q, IfUnused, IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) -> {Name, _} = amqqueue:get_pid(Q), QName = amqqueue:get_name(Q), QNodes = get_nodes(Q), - %% TODO Quorum queue needs to support consumer tracking for IfUnused - Timeout = ?DELETE_TIMEOUT, - {ok, ReadyMsgs, _} = stat(Q), - Servers = [{Name, Node} || Node <- QNodes], - case ra:delete_cluster(Servers, Timeout) of - {ok, {_, LeaderNode} = Leader} -> - MRef = erlang:monitor(process, Leader), - receive - {'DOWN', MRef, process, _, _} -> + + {ok, ReadyMsgs, Consumers} = stat(Q), + IsEmpty = ReadyMsgs == 0, + IsUnused = Consumers == 0, + if IfEmpty and not(IsEmpty) -> {error, not_empty}; + IfUnused and not(IsUnused) -> {error, in_use}; + true -> + Timeout = ?DELETE_TIMEOUT, + Servers = [{Name, Node} || Node <- QNodes], + case ra:delete_cluster(Servers, Timeout) of + {ok, {_, LeaderNode} = Leader} -> + MRef = erlang:monitor(process, Leader), + receive + {'DOWN', MRef, process, _, _} -> ok - after Timeout -> - erlang:demonitor(MRef, [flush]), - ok = force_delete_queue(Servers) - end, - notify_decorators(QName, shutdown), - ok = delete_queue_data(Q, ActingUser), - _ = erpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName], - ?RPC_TIMEOUT), - {ok, ReadyMsgs}; - {error, {no_more_servers_to_try, Errs}} -> - case lists:all(fun({{error, noproc}, _}) -> true; - (_) -> false - end, Errs) of - true -> - %% If all ra nodes were already down, the delete - %% has succeed - delete_queue_data(Q, ActingUser), + after Timeout -> + erlang:demonitor(MRef, [flush]), + ok = force_delete_queue(Servers) + end, + notify_decorators(QName, shutdown), + ok = delete_queue_data(Q, ActingUser), + _ = erpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName], + ?RPC_TIMEOUT), {ok, ReadyMsgs}; - false -> - %% attempt forced deletion of all servers - rabbit_log:warning( - "Could not delete quorum '~ts', not enough nodes " - " online to reach a quorum: ~255p." + {error, {no_more_servers_to_try, Errs}} -> + case lists:all(fun({{error, noproc}, _}) -> true; + (_) -> false + end, Errs) of + true -> + %% If all ra nodes were already down, the delete + %% has succeed + delete_queue_data(Q, ActingUser), + {ok, ReadyMsgs}; + false -> + %% attempt forced deletion of all servers + rabbit_log:warning( + "Could not delete quorum '~ts', not enough nodes " + " online to reach a quorum: ~255p." " Attempting force delete.", - [rabbit_misc:rs(QName), Errs]), - ok = force_delete_queue(Servers), - notify_decorators(QName, shutdown), - delete_queue_data(Q, ActingUser), - {ok, ReadyMsgs} + [rabbit_misc:rs(QName), Errs]), + ok = force_delete_queue(Servers), + notify_decorators(QName, shutdown), + delete_queue_data(Q, ActingUser), + {ok, ReadyMsgs} + end end end.