Skip to content

Commit

Permalink
Merge pull request #10656 from rabbitmq/loic-remove-cqv1-option
Browse files Browse the repository at this point in the history
4.x: remove availability of CQv1
  • Loading branch information
lhoguin authored May 13, 2024
2 parents c35a0b8 + ecf4600 commit 0942573
Show file tree
Hide file tree
Showing 13 changed files with 116 additions and 823 deletions.
2 changes: 0 additions & 2 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,6 @@ rabbitmq_integration_suite(
rabbitmq_integration_suite(
name = "classic_queue_prop_SUITE",
size = "large",
shard_count = 6,
sharding_method = "case",
deps = [
"@proper//:erlang_app",
],
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit/priv/schema/rabbit.schema
Original file line number Diff line number Diff line change
Expand Up @@ -2538,8 +2538,8 @@ end}.

{translation, "rabbit.classic_queue_default_version",
fun(Conf) ->
case cuttlefish:conf_get("classic_queue.default_version", Conf, 1) of
1 -> 1;
case cuttlefish:conf_get("classic_queue.default_version", Conf, 2) of
1 -> cuttlefish:invalid("Classic queues v1 are no longer supported");
2 -> 2;
_ -> cuttlefish:unset()
end
Expand Down
6 changes: 1 addition & 5 deletions deps/rabbit/src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -477,12 +477,8 @@ init_queue_mode(Mode, State = #q {backing_queue = BQ,

init_queue_version(Version0, State = #q {backing_queue = BQ,
backing_queue_state = BQS}) ->
%% When the version is undefined we use the default version 1.
%% We want to BQ:set_queue_version in all cases because a v2
%% policy might have been deleted, for example, and we want
%% the queue to go back to v1.
Version = case Version0 of
undefined -> rabbit_misc:get_env(rabbit, classic_queue_default_version, 1);
undefined -> 2;
_ -> Version0
end,
BQS1 = BQ:set_queue_version(Version, BQS),
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_classic_queue_index_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1078,7 +1078,7 @@ sync(State0 = #qi{ confirms = Confirms,
end,
State#qi{ confirms = sets:new([{version,2}]) }.

-spec needs_sync(state()) -> 'false'.
-spec needs_sync(state()) -> 'false' | 'confirms'.

needs_sync(State = #qi{ confirms = Confirms }) ->
?DEBUG("~0p", [State]),
Expand Down
89 changes: 7 additions & 82 deletions deps/rabbit/src/rabbit_queue_index.erl
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,9 @@ init_for_conversion(#resource{ virtual_host = VHost } = Name, OnSyncFun, OnSyncM
'undefined' | non_neg_integer(), qistate()}.

recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered,
ContainsCheckFun, OnSyncFun, OnSyncMsgFun, Context) ->
ContainsCheckFun, OnSyncFun, OnSyncMsgFun,
%% We only allow using this module when converting to v2.
convert) ->
#{segment_entry_count := SegmentEntryCount} = rabbit_vhost:read_config(VHost),
put(segment_entry_count, SegmentEntryCount),
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
Expand All @@ -323,10 +325,10 @@ recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered,
CleanShutdown = Terms /= non_clean_shutdown,
case CleanShutdown andalso MsgStoreRecovered of
true -> case proplists:get_value(segments, Terms, non_clean_shutdown) of
non_clean_shutdown -> init_dirty(false, ContainsCheckFun, State1, Context);
non_clean_shutdown -> init_dirty(false, ContainsCheckFun, State1);
RecoveredCounts -> init_clean(RecoveredCounts, State1)
end;
false -> init_dirty(CleanShutdown, ContainsCheckFun, State1, Context)
false -> init_dirty(CleanShutdown, ContainsCheckFun, State1)
end.

-spec terminate(rabbit_types:vhost(), [any()], qistate()) -> qistate().
Expand Down Expand Up @@ -644,7 +646,7 @@ init_clean(RecoveredCounts, State) ->
-define(RECOVER_BYTES, 2).
-define(RECOVER_COUNTER_SIZE, 2).

init_dirty(CleanShutdown, ContainsCheckFun, State, Context) ->
init_dirty(CleanShutdown, ContainsCheckFun, State) ->
%% Recover the journal completely. This will also load segments
%% which have entries in the journal and remove duplicates. The
%% counts will correctly reflect the combination of the segment
Expand Down Expand Up @@ -679,84 +681,7 @@ init_dirty(CleanShutdown, ContainsCheckFun, State, Context) ->
%% recovery fails with a crash.
State2 = flush_journal(State1 #qistate { segments = Segments1,
dirty_count = DirtyCount }),
case Context of
convert ->
{Count, Bytes, State2};
main ->
%% We try to see if there are segment files from the v2 index.
case rabbit_file:wildcard(".*\\.qi", Dir) of
%% We are recovering a dirty queue that was using the v2 index or in
%% the process of converting from v2 to v1.
[_|_] ->
#resource{virtual_host = VHost, name = QName} = State2#qistate.queue_name,
rabbit_log:info("Queue ~ts in vhost ~ts recovered ~b total messages before resuming convert",
[QName, VHost, Count]),
CountersRef = counters:new(?RECOVER_COUNTER_SIZE, []),
State3 = recover_index_v2_dirty(State2, ContainsCheckFun, CountersRef),
{Count + counters:get(CountersRef, ?RECOVER_COUNT),
Bytes + counters:get(CountersRef, ?RECOVER_BYTES),
State3};
%% Otherwise keep default values.
[] ->
{Count, Bytes, State2}
end
end.

recover_index_v2_dirty(State0 = #qistate { queue_name = Name,
on_sync = OnSyncFun,
on_sync_msg = OnSyncMsgFun },
ContainsCheckFun, CountersRef) ->
#resource{virtual_host = VHost, name = QName} = Name,
rabbit_log:info("Converting queue ~ts in vhost ~ts from v2 to v1 after unclean shutdown", [QName, VHost]),
%% We cannot use the counts/bytes because some messages may be in both
%% the v1 and v2 indexes after a crash.
{_, _, V2State} = rabbit_classic_queue_index_v2:recover(Name, non_clean_shutdown, true,
ContainsCheckFun, OnSyncFun, OnSyncMsgFun,
convert),
State = recover_index_v2_common(State0, V2State, CountersRef),
rabbit_log:info("Queue ~ts in vhost ~ts converted ~b total messages from v2 to v1",
[QName, VHost, counters:get(CountersRef, ?RECOVER_COUNT)]),
State.

%% At this point all messages are persistent because transient messages
%% were dropped during the v2 index recovery.
recover_index_v2_common(State0 = #qistate { queue_name = Name, dir = Dir },
V2State, CountersRef) ->
%% Use a temporary per-queue store state to read embedded messages.
StoreState0 = rabbit_classic_queue_store_v2:init(Name),
%% Go through the v2 index and publish messages to v1 index.
{LoSeqId, HiSeqId, _} = rabbit_classic_queue_index_v2:bounds(V2State),
%% When resuming after a crash we need to double check the messages that are both
%% in the v1 and v2 index (effectively the messages below the upper bound of the
%% v1 index that are about to be written to it).
{_, V1HiSeqId, _} = bounds(State0),
SkipFun = fun
(SeqId, FunState0) when SeqId < V1HiSeqId ->
case read(SeqId, SeqId + 1, FunState0) of
%% Message already exists, skip.
{[_], FunState} ->
{skip, FunState};
%% Message doesn't exist, write.
{[], FunState} ->
{write, FunState}
end;
%% Message is out of bounds of the v1 index.
(_, FunState) ->
{write, FunState}
end,
%% We use a common function also used with conversion on policy change.
{State1, _StoreState} = rabbit_variable_queue:convert_from_v2_to_v1_loop(Name, State0, V2State, StoreState0,
{CountersRef, ?RECOVER_COUNT, ?RECOVER_BYTES},
LoSeqId, HiSeqId, SkipFun),
%% Delete any remaining v2 index files.
OldFiles = rabbit_file:wildcard(".*\\.qi", Dir)
++ rabbit_file:wildcard(".*\\.qs", Dir),
_ = [rabbit_file:delete(filename:join(Dir, F)) || F <- OldFiles],
%% Ensure that everything in the v1 index is written to disk.
State = flush(State1),
%% Clean up all the garbage that we have surely been creating.
garbage_collect(),
State.
{Count, Bytes, State2}.

terminate(State = #qistate { journal_handle = JournalHdl,
segments = Segments }) ->
Expand Down
Loading

0 comments on commit 0942573

Please sign in to comment.