From 47ee288e8b0840893c9a0c56bb050e45f6772e07 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Fri, 17 Sep 2021 14:53:33 +0100 Subject: [PATCH 1/3] QQ: fix memory leak when cancelling consumer If the queue is empty when a consumer is cancelled it would leave the consumer id inside the service queue. If an application subscribes/unsubscribes in a loop from an empty queue this would cause the service queue to never be cleared up. NB: whenever we make a change to how the quorum queue state machine is calculated we need to consider how this affects determinism as during an upgrade different members may calculate a different service queue state. In this case it should be ok as they will eventually converge on the same state once all "dead" consumer ids have been removed from the queue. In any case it should not affect how messages are assigned to consumers. (cherry picked from commit 5779059bd5b1e14807cdd9d2bcf2180b1ba5e6e4) --- deps/rabbit/src/rabbit_fifo.erl | 27 +++++++++++++------------- deps/rabbit/test/rabbit_fifo_SUITE.erl | 11 +++++++++++ 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index d932d20ebb68..17ede7b12c81 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -1750,25 +1750,26 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0, messages = Messages0, consumers = Cons0} = InitState) -> case priority_queue:out(SQ0) of - {{value, ConsumerId}, SQ1} -> + {{value, ConsumerId}, SQ1} + when is_map_key(ConsumerId, Cons0) -> case take_next_msg(InitState) of {ConsumerMsg, State0} -> %% there are consumers waiting to be serviced %% process consumer checkout - case maps:find(ConsumerId, Cons0) of - {ok, #consumer{credit = 0}} -> + case maps:get(ConsumerId, Cons0) of + #consumer{credit = 0} -> %% no credit but was still on queue %% can happen when draining %% recurse without consumer on queue checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}); - 
{ok, #consumer{status = cancelled}} -> + #consumer{status = cancelled} -> checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}); - {ok, #consumer{status = suspected_down}} -> + #consumer{status = suspected_down} -> checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}); - {ok, #consumer{checked_out = Checked0, - next_msg_id = Next, - credit = Credit, - delivery_count = DelCnt} = Con0} -> + #consumer{checked_out = Checked0, + next_msg_id = Next, + credit = Credit, + delivery_count = DelCnt} = Con0 -> Checked = maps:put(Next, ConsumerMsg, Checked0), Con = Con0#consumer{checked_out = Checked, next_msg_id = Next + 1, @@ -1795,14 +1796,14 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0, add_bytes_checkout(Header, State1)), M} end, - {success, ConsumerId, Next, Msg, State}; - error -> - %% consumer did not exist but was queued, recurse - checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}) + {success, ConsumerId, Next, Msg, State} end; empty -> {nochange, InitState} end; + {{value, _ConsumerId}, SQ1} -> + %% consumer did not exist but was queued, recurse + checkout_one(Meta, InitState#?MODULE{service_queue = SQ1}); {empty, _} -> case lqueue:len(Messages0) of 0 -> {nochange, InitState}; diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl index 2c5b15295f41..1eed6a0d753f 100644 --- a/deps/rabbit/test/rabbit_fifo_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl @@ -386,6 +386,16 @@ return_auto_checked_out_test(_) -> Effects), ok. +cancelled_checkout_empty_queue_test(_) -> + Cid = {<<"cid">>, self()}, + {State1, _} = check_auto(Cid, 2, test_init(test)), + % cancelled checkout should clear out service_queue also, else we'd get a + % build up of these + {State2, _, _} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1), + ?assertEqual(0, map_size(State2#rabbit_fifo.consumers)), + ?assertEqual(0, priority_queue:len(State2#rabbit_fifo.service_queue)), + ok. 
+ cancelled_checkout_out_test(_) -> Cid = {<<"cid">>, self()}, {State00, [_, _]} = enq(1, 1, first, test_init(test)), @@ -395,6 +405,7 @@ cancelled_checkout_out_test(_) -> {State2, _, _} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1), ?assertEqual(1, lqueue:len(State2#rabbit_fifo.messages)), ?assertEqual(0, lqueue:len(State2#rabbit_fifo.returns)), + ?assertEqual(0, priority_queue:len(State2#rabbit_fifo.service_queue)), {State3, {dequeue, empty}} = apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, settled}, #{}), State2), From a787a9257692c2bbee2af80a39e40380340976be Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Fri, 17 Sep 2021 17:09:30 +0100 Subject: [PATCH 2/3] QQ: emit release cursors after consumer cancel If this is not done apps that consume/cancel from empty queues in a loop will grow the raft log in an unbounded manner. This could also be the case for the garbage_collect command. (cherry picked from commit eaa216da8246a2fc79108e3e0780996ab2bc11ac) --- deps/rabbit/src/rabbit_fifo.erl | 15 +++++++++------ deps/rabbit/test/rabbit_fifo_SUITE.erl | 4 +++- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index 17ede7b12c81..ddc517a9717e 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -340,10 +340,13 @@ apply(#{index := Index, {State, Reply, Effects} end end; -apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) -> - {State, Effects} = cancel_consumer(Meta, ConsumerId, State0, [], - consumer_cancel), - checkout(Meta, State0, State, Effects); +apply(#{index := Idx} = Meta, + #checkout{spec = cancel, + consumer_id = ConsumerId}, State0) -> + {State1, Effects1} = cancel_consumer(Meta, ConsumerId, State0, [], + consumer_cancel), + {State, Reply, Effects} = checkout(Meta, State0, State1, Effects1), + update_smallest_raft_index(Idx, Reply, State, Effects); apply(Meta, #checkout{spec = Spec, meta = 
ConsumerMeta, consumer_id = {_, Pid} = ConsumerId}, State0) -> @@ -372,8 +375,8 @@ apply(#{index := Index}, #purge{}, {State, _, Effects} = evaluate_limit(Index, false, State0, State1, Effects0), update_smallest_raft_index(Index, Reply, State, Effects); -apply(_Meta, #garbage_collection{}, State) -> - {State, ok, [{aux, garbage_collection}]}; +apply(#{index := Idx}, #garbage_collection{}, State) -> + update_smallest_raft_index(Idx, ok, State, [{aux, garbage_collection}]); apply(#{system_time := Ts} = Meta, {down, Pid, noconnection}, #?MODULE{consumers = Cons0, cfg = #cfg{consumer_strategy = single_active}, diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl index 1eed6a0d753f..9cb401fc8eec 100644 --- a/deps/rabbit/test/rabbit_fifo_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl @@ -391,9 +391,11 @@ cancelled_checkout_empty_queue_test(_) -> {State1, _} = check_auto(Cid, 2, test_init(test)), % cancelled checkout should clear out service_queue also, else we'd get a % build up of these - {State2, _, _} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1), + {State2, _, Effects} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1), ?assertEqual(0, map_size(State2#rabbit_fifo.consumers)), ?assertEqual(0, priority_queue:len(State2#rabbit_fifo.service_queue)), + ct:pal("Effs: ~p", [Effects]), + ?ASSERT_EFF({release_cursor, _, _}, Effects), ok. cancelled_checkout_out_test(_) -> From dc43970dd665fbebffc90fbdcd534d6126f9db85 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Mon, 20 Sep 2021 12:19:22 +0100 Subject: [PATCH 3/3] Emit release cursor for more commands It should be rare that repeated use of these commands would grow the Raft log excessively but just in case we evaluate the release cursors here anyway so that if the queue is empty we may trigger a snapshot anyway. 
(cherry picked from commit ee6ef35873e7e9d08ca29700d965c90cb4cc3f98) --- deps/rabbit/src/rabbit_fifo.erl | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index ddc517a9717e..71172fce7075 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -509,13 +509,14 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0, checkout(Meta, State0, State, Effects); apply(_, {nodedown, _Node}, State) -> {State, ok}; -apply(Meta, #purge_nodes{nodes = Nodes}, State0) -> +apply(#{index := Idx} = Meta, #purge_nodes{nodes = Nodes}, State0) -> {State, Effects} = lists:foldl(fun(Node, {S, E}) -> purge_node(Meta, Node, S, E) end, {State0, []}, Nodes), - {State, ok, Effects}; -apply(Meta, #update_config{config = Conf}, State) -> - checkout(Meta, State, update_config(Conf, State), []); + update_smallest_raft_index(Idx, ok, State, Effects); +apply(#{index := Idx} = Meta, #update_config{config = Conf}, State0) -> + {State, Reply, Effects} = checkout(Meta, State0, update_config(Conf, State0), []), + update_smallest_raft_index(Idx, Reply, State, Effects); apply(_Meta, {machine_version, 0, 1}, V0State) -> State = convert_v0_to_v1(V0State), {State, ok, []};