Skip to content

Commit

Permalink
Merge pull request #12358 from rabbitmq/qq-release-cursor-bug
Browse files Browse the repository at this point in the history
QQ: fix off-by-one bug in release cursor effects.
  • Loading branch information
michaelklishin authored Sep 23, 2024
2 parents bf4d573 + 2ae4dbe commit 474d76f
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 2 deletions.
7 changes: 5 additions & 2 deletions deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1064,7 +1064,7 @@ handle_aux(_RaState, cast, tick, #?AUX{name = Name,
undefined ->
[{release_cursor, ra_aux:last_applied(RaAux)}];
Smallest ->
[{release_cursor, Smallest}]
[{release_cursor, Smallest - 1}]
end,
{no_reply, Aux, RaAux, Effs};
handle_aux(_RaState, cast, eol, #?AUX{name = Name} = Aux, RaAux) ->
Expand Down Expand Up @@ -2915,7 +2915,10 @@ release_cursor(LastSmallest, Smallest)
when is_integer(LastSmallest) andalso
is_integer(Smallest) andalso
Smallest > LastSmallest ->
[{release_cursor, Smallest}];
[{release_cursor, Smallest - 1}];
release_cursor(undefined, Smallest)
when is_integer(Smallest) ->
[{release_cursor, Smallest - 1}];
release_cursor(_, _) ->
[].

Expand Down
25 changes: 25 additions & 0 deletions deps/rabbit/test/rabbit_fifo_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2347,6 +2347,31 @@ aux_test(_) ->
?assert(X > 0.0),
ok.

handle_aux_tick_test(Config) ->
_ = ra_machine_ets:start_link(),
Aux0 = init_aux(aux_test),
LastApplied = 1,
MacState0 = init(#{name => ?FUNCTION_NAME,
queue_resource => rabbit_misc:r("/", queue, ?FUNCTION_NAME_B),
single_active_consumer_on => false}),
State0 = #{machine_state => MacState0,
log => mock_log,
last_applied => LastApplied},
{MacState1, _} = enq(Config, 1, 1, first, MacState0),
State1 = State0#{machine_state => MacState1},
meck:expect(ra_log, last_index_term, fun (_) -> {1, 0} end),
?assertEqual(1, rabbit_fifo:smallest_raft_index(MacState1)),
%% the release cursor should be 1 lower than the smallest raft index
{no_reply, _, _,
[{release_cursor, 0}]} = handle_aux(leader, cast, tick, Aux0, State1),
timer:sleep(10),

persistent_term:put(quorum_queue_checkpoint_config, {1, 0, 1}),
{no_reply, _, _,
[{checkpoint, 1, _},
{release_cursor, 0}]} = handle_aux(follower, cast, force_checkpoint, Aux0, State1),
ok.


%% machine version conversion test

Expand Down

0 comments on commit 474d76f

Please sign in to comment.