Skip to content

Commit

Permalink
Close #10543. Add If-Unused and If-Empty support for delete_queue for…
Browse files Browse the repository at this point in the history
… QQs
  • Loading branch information
deadtrickster committed Mar 11, 2024
1 parent 72b2506 commit 1e972bb
Showing 1 changed file with 44 additions and 47 deletions.
91 changes: 44 additions & 47 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -713,59 +713,56 @@ restart_server({_, _} = Ref) ->
-spec delete(amqqueue:amqqueue(),
boolean(), boolean(),
rabbit_types:username()) ->
{ok, QLen :: non_neg_integer()} |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
delete(Q, true, _IfEmpty, _ActingUser) when ?amqqueue_is_quorum(Q) ->
{protocol_error, not_implemented,
"cannot delete ~ts. queue.delete operations with if-unused flag set are not supported by quorum queues",
[rabbit_misc:rs(amqqueue:get_name(Q))]};
delete(Q, _IfUnused, true, _ActingUser) when ?amqqueue_is_quorum(Q) ->
{protocol_error, not_implemented,
"cannot delete ~ts. queue.delete operations with if-empty flag set are not supported by quorum queues",
[rabbit_misc:rs(amqqueue:get_name(Q))]};
delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
{ok, QLen :: non_neg_integer()}.
delete(Q, IfUnused, IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
{Name, _} = amqqueue:get_pid(Q),
QName = amqqueue:get_name(Q),
QNodes = get_nodes(Q),
%% TODO Quorum queue needs to support consumer tracking for IfUnused
Timeout = ?DELETE_TIMEOUT,
{ok, ReadyMsgs, _} = stat(Q),
Servers = [{Name, Node} || Node <- QNodes],
case ra:delete_cluster(Servers, Timeout) of
{ok, {_, LeaderNode} = Leader} ->
MRef = erlang:monitor(process, Leader),
receive
{'DOWN', MRef, process, _, _} ->

{ok, ReadyMsgs, Consumers} = stat(Q),
IsEmpty = ReadyMsgs == 0,
IsUnused = Consumers == 0,
if IfEmpty and not(IsEmpty) -> {error, not_empty};
IfUnused and not(IsUnused) -> {error, in_use};
true ->
Timeout = ?DELETE_TIMEOUT,
Servers = [{Name, Node} || Node <- QNodes],
case ra:delete_cluster(Servers, Timeout) of
{ok, {_, LeaderNode} = Leader} ->
MRef = erlang:monitor(process, Leader),
receive
{'DOWN', MRef, process, _, _} ->
ok
after Timeout ->
erlang:demonitor(MRef, [flush]),
ok = force_delete_queue(Servers)
end,
notify_decorators(QName, shutdown),
ok = delete_queue_data(Q, ActingUser),
_ = erpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
?RPC_TIMEOUT),
{ok, ReadyMsgs};
{error, {no_more_servers_to_try, Errs}} ->
case lists:all(fun({{error, noproc}, _}) -> true;
(_) -> false
end, Errs) of
true ->
%% If all ra nodes were already down, the delete
%% has succeed
delete_queue_data(Q, ActingUser),
after Timeout ->
erlang:demonitor(MRef, [flush]),
ok = force_delete_queue(Servers)
end,
notify_decorators(QName, shutdown),
ok = delete_queue_data(Q, ActingUser),
_ = erpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
?RPC_TIMEOUT),
{ok, ReadyMsgs};
false ->
%% attempt forced deletion of all servers
rabbit_log:warning(
"Could not delete quorum '~ts', not enough nodes "
" online to reach a quorum: ~255p."
{error, {no_more_servers_to_try, Errs}} ->
case lists:all(fun({{error, noproc}, _}) -> true;
(_) -> false
end, Errs) of
true ->
%% If all ra nodes were already down, the delete
%% has succeed
delete_queue_data(Q, ActingUser),
{ok, ReadyMsgs};
false ->
%% attempt forced deletion of all servers
rabbit_log:warning(
"Could not delete quorum '~ts', not enough nodes "
" online to reach a quorum: ~255p."
" Attempting force delete.",
[rabbit_misc:rs(QName), Errs]),
ok = force_delete_queue(Servers),
notify_decorators(QName, shutdown),
delete_queue_data(Q, ActingUser),
{ok, ReadyMsgs}
[rabbit_misc:rs(QName), Errs]),
ok = force_delete_queue(Servers),
notify_decorators(QName, shutdown),
delete_queue_data(Q, ActingUser),
{ok, ReadyMsgs}
end
end
end.

Expand Down

0 comments on commit 1e972bb

Please sign in to comment.