Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4.x: remove availability of CQv1 #10656

Merged
merged 1 commit into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,6 @@ rabbitmq_integration_suite(
rabbitmq_integration_suite(
name = "classic_queue_prop_SUITE",
size = "large",
shard_count = 6,
sharding_method = "case",
deps = [
"@proper//:erlang_app",
],
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit/priv/schema/rabbit.schema
Original file line number Diff line number Diff line change
Expand Up @@ -2538,8 +2538,8 @@ end}.

{translation, "rabbit.classic_queue_default_version",
fun(Conf) ->
case cuttlefish:conf_get("classic_queue.default_version", Conf, 1) of
1 -> 1;
case cuttlefish:conf_get("classic_queue.default_version", Conf, 2) of
1 -> cuttlefish:invalid("Classic queues v1 are no longer supported");
2 -> 2;
_ -> cuttlefish:unset()
end
Expand Down
6 changes: 1 addition & 5 deletions deps/rabbit/src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -477,12 +477,8 @@ init_queue_mode(Mode, State = #q {backing_queue = BQ,

init_queue_version(Version0, State = #q {backing_queue = BQ,
backing_queue_state = BQS}) ->
%% When the version is undefined we use the default version 1.
%% We want to BQ:set_queue_version in all cases because a v2
%% policy might have been deleted, for example, and we want
%% the queue to go back to v1.
Version = case Version0 of
undefined -> rabbit_misc:get_env(rabbit, classic_queue_default_version, 1);
undefined -> 2;
_ -> Version0
end,
BQS1 = BQ:set_queue_version(Version, BQS),
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_classic_queue_index_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1078,7 +1078,7 @@ sync(State0 = #qi{ confirms = Confirms,
end,
State#qi{ confirms = sets:new([{version,2}]) }.

-spec needs_sync(state()) -> 'false'.
-spec needs_sync(state()) -> 'false' | 'confirms'.

needs_sync(State = #qi{ confirms = Confirms }) ->
?DEBUG("~0p", [State]),
Expand Down
89 changes: 7 additions & 82 deletions deps/rabbit/src/rabbit_queue_index.erl
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,9 @@ init_for_conversion(#resource{ virtual_host = VHost } = Name, OnSyncFun, OnSyncM
'undefined' | non_neg_integer(), qistate()}.

recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered,
ContainsCheckFun, OnSyncFun, OnSyncMsgFun, Context) ->
ContainsCheckFun, OnSyncFun, OnSyncMsgFun,
%% We only allow using this module when converting to v2.
convert) ->
#{segment_entry_count := SegmentEntryCount} = rabbit_vhost:read_config(VHost),
put(segment_entry_count, SegmentEntryCount),
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
Expand All @@ -323,10 +325,10 @@ recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered,
CleanShutdown = Terms /= non_clean_shutdown,
case CleanShutdown andalso MsgStoreRecovered of
true -> case proplists:get_value(segments, Terms, non_clean_shutdown) of
non_clean_shutdown -> init_dirty(false, ContainsCheckFun, State1, Context);
non_clean_shutdown -> init_dirty(false, ContainsCheckFun, State1);
RecoveredCounts -> init_clean(RecoveredCounts, State1)
end;
false -> init_dirty(CleanShutdown, ContainsCheckFun, State1, Context)
false -> init_dirty(CleanShutdown, ContainsCheckFun, State1)
end.

-spec terminate(rabbit_types:vhost(), [any()], qistate()) -> qistate().
Expand Down Expand Up @@ -644,7 +646,7 @@ init_clean(RecoveredCounts, State) ->
-define(RECOVER_BYTES, 2).
-define(RECOVER_COUNTER_SIZE, 2).

init_dirty(CleanShutdown, ContainsCheckFun, State, Context) ->
init_dirty(CleanShutdown, ContainsCheckFun, State) ->
%% Recover the journal completely. This will also load segments
%% which have entries in the journal and remove duplicates. The
%% counts will correctly reflect the combination of the segment
Expand Down Expand Up @@ -679,84 +681,7 @@ init_dirty(CleanShutdown, ContainsCheckFun, State, Context) ->
%% recovery fails with a crash.
State2 = flush_journal(State1 #qistate { segments = Segments1,
dirty_count = DirtyCount }),
case Context of
convert ->
{Count, Bytes, State2};
main ->
%% We try to see if there are segment files from the v2 index.
case rabbit_file:wildcard(".*\\.qi", Dir) of
%% We are recovering a dirty queue that was using the v2 index or in
%% the process of converting from v2 to v1.
[_|_] ->
#resource{virtual_host = VHost, name = QName} = State2#qistate.queue_name,
rabbit_log:info("Queue ~ts in vhost ~ts recovered ~b total messages before resuming convert",
[QName, VHost, Count]),
CountersRef = counters:new(?RECOVER_COUNTER_SIZE, []),
State3 = recover_index_v2_dirty(State2, ContainsCheckFun, CountersRef),
{Count + counters:get(CountersRef, ?RECOVER_COUNT),
Bytes + counters:get(CountersRef, ?RECOVER_BYTES),
State3};
%% Otherwise keep default values.
[] ->
{Count, Bytes, State2}
end
end.

recover_index_v2_dirty(State0 = #qistate { queue_name = Name,
on_sync = OnSyncFun,
on_sync_msg = OnSyncMsgFun },
ContainsCheckFun, CountersRef) ->
#resource{virtual_host = VHost, name = QName} = Name,
rabbit_log:info("Converting queue ~ts in vhost ~ts from v2 to v1 after unclean shutdown", [QName, VHost]),
%% We cannot use the counts/bytes because some messages may be in both
%% the v1 and v2 indexes after a crash.
{_, _, V2State} = rabbit_classic_queue_index_v2:recover(Name, non_clean_shutdown, true,
ContainsCheckFun, OnSyncFun, OnSyncMsgFun,
convert),
State = recover_index_v2_common(State0, V2State, CountersRef),
rabbit_log:info("Queue ~ts in vhost ~ts converted ~b total messages from v2 to v1",
[QName, VHost, counters:get(CountersRef, ?RECOVER_COUNT)]),
State.

%% At this point all messages are persistent because transient messages
%% were dropped during the v2 index recovery.
recover_index_v2_common(State0 = #qistate { queue_name = Name, dir = Dir },
V2State, CountersRef) ->
%% Use a temporary per-queue store state to read embedded messages.
StoreState0 = rabbit_classic_queue_store_v2:init(Name),
%% Go through the v2 index and publish messages to v1 index.
{LoSeqId, HiSeqId, _} = rabbit_classic_queue_index_v2:bounds(V2State),
%% When resuming after a crash we need to double check the messages that are both
%% in the v1 and v2 index (effectively the messages below the upper bound of the
%% v1 index that are about to be written to it).
{_, V1HiSeqId, _} = bounds(State0),
SkipFun = fun
(SeqId, FunState0) when SeqId < V1HiSeqId ->
case read(SeqId, SeqId + 1, FunState0) of
%% Message already exists, skip.
{[_], FunState} ->
{skip, FunState};
%% Message doesn't exist, write.
{[], FunState} ->
{write, FunState}
end;
%% Message is out of bounds of the v1 index.
(_, FunState) ->
{write, FunState}
end,
%% We use a common function also used with conversion on policy change.
{State1, _StoreState} = rabbit_variable_queue:convert_from_v2_to_v1_loop(Name, State0, V2State, StoreState0,
{CountersRef, ?RECOVER_COUNT, ?RECOVER_BYTES},
LoSeqId, HiSeqId, SkipFun),
%% Delete any remaining v2 index files.
OldFiles = rabbit_file:wildcard(".*\\.qi", Dir)
++ rabbit_file:wildcard(".*\\.qs", Dir),
_ = [rabbit_file:delete(filename:join(Dir, F)) || F <- OldFiles],
%% Ensure that everything in the v1 index is written to disk.
State = flush(State1),
%% Clean up all the garbage that we have surely been creating.
garbage_collect(),
State.
{Count, Bytes, State2}.

terminate(State = #qistate { journal_handle = JournalHdl,
segments = Segments }) ->
Expand Down
Loading
Loading