diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index 80891cf7382c..8571bc0b8caf 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -319,8 +319,6 @@ rabbitmq_integration_suite( rabbitmq_integration_suite( name = "classic_queue_prop_SUITE", size = "large", - shard_count = 6, - sharding_method = "case", deps = [ "@proper//:erlang_app", ], diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index 226a86e96531..53d4556094cf 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -2538,8 +2538,8 @@ end}. {translation, "rabbit.classic_queue_default_version", fun(Conf) -> - case cuttlefish:conf_get("classic_queue.default_version", Conf, 1) of - 1 -> 1; + case cuttlefish:conf_get("classic_queue.default_version", Conf, 2) of + 1 -> cuttlefish:invalid("Classic queues v1 are no longer supported"); 2 -> 2; _ -> cuttlefish:unset() end diff --git a/deps/rabbit/src/rabbit_amqqueue_process.erl b/deps/rabbit/src/rabbit_amqqueue_process.erl index f645faef88b9..cd74ccc393b3 100644 --- a/deps/rabbit/src/rabbit_amqqueue_process.erl +++ b/deps/rabbit/src/rabbit_amqqueue_process.erl @@ -477,12 +477,8 @@ init_queue_mode(Mode, State = #q {backing_queue = BQ, init_queue_version(Version0, State = #q {backing_queue = BQ, backing_queue_state = BQS}) -> - %% When the version is undefined we use the default version 1. - %% We want to BQ:set_queue_version in all cases because a v2 - %% policy might have been deleted, for example, and we want - %% the queue to go back to v1. Version = case Version0 of - undefined -> rabbit_misc:get_env(rabbit, classic_queue_default_version, 1); + undefined -> 2; _ -> Version0 end, BQS1 = BQ:set_queue_version(Version, BQS), diff --git a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl index 4f8a2403e121..ef0f042c6214 100644 --- a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl +++ b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl @@ -1078,7 +1078,7 @@ sync(State0 = #qi{ confirms = Confirms, end, State#qi{ confirms = sets:new([{version,2}]) }. --spec needs_sync(state()) -> 'false'. +-spec needs_sync(state()) -> 'false' | 'confirms'. needs_sync(State = #qi{ confirms = Confirms }) -> ?DEBUG("~0p", [State]), diff --git a/deps/rabbit/src/rabbit_queue_index.erl b/deps/rabbit/src/rabbit_queue_index.erl index 346b39abec78..249e870af775 100644 --- a/deps/rabbit/src/rabbit_queue_index.erl +++ b/deps/rabbit/src/rabbit_queue_index.erl @@ -313,7 +313,9 @@ init_for_conversion(#resource{ virtual_host = VHost } = Name, OnSyncFun, OnSyncM 'undefined' | non_neg_integer(), qistate()}. recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered, - ContainsCheckFun, OnSyncFun, OnSyncMsgFun, Context) -> + ContainsCheckFun, OnSyncFun, OnSyncMsgFun, + %% We only allow using this module when converting to v2. + convert) -> #{segment_entry_count := SegmentEntryCount} = rabbit_vhost:read_config(VHost), put(segment_entry_count, SegmentEntryCount), VHostDir = rabbit_vhost:msg_store_dir_path(VHost), @@ -323,10 +325,10 @@ recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered, CleanShutdown = Terms /= non_clean_shutdown, case CleanShutdown andalso MsgStoreRecovered of true -> case proplists:get_value(segments, Terms, non_clean_shutdown) of - non_clean_shutdown -> init_dirty(false, ContainsCheckFun, State1, Context); + non_clean_shutdown -> init_dirty(false, ContainsCheckFun, State1); RecoveredCounts -> init_clean(RecoveredCounts, State1) end; - false -> init_dirty(CleanShutdown, ContainsCheckFun, State1, Context) + false -> init_dirty(CleanShutdown, ContainsCheckFun, State1) end. -spec terminate(rabbit_types:vhost(), [any()], qistate()) -> qistate(). @@ -644,7 +646,7 @@ init_clean(RecoveredCounts, State) -> -define(RECOVER_BYTES, 2). -define(RECOVER_COUNTER_SIZE, 2). -init_dirty(CleanShutdown, ContainsCheckFun, State, Context) -> +init_dirty(CleanShutdown, ContainsCheckFun, State) -> %% Recover the journal completely. This will also load segments %% which have entries in the journal and remove duplicates. The %% counts will correctly reflect the combination of the segment @@ -679,84 +681,7 @@ init_dirty(CleanShutdown, ContainsCheckFun, State, Context) -> %% recovery fails with a crash. State2 = flush_journal(State1 #qistate { segments = Segments1, dirty_count = DirtyCount }), - case Context of - convert -> - {Count, Bytes, State2}; - main -> - %% We try to see if there are segment files from the v2 index. - case rabbit_file:wildcard(".*\\.qi", Dir) of - %% We are recovering a dirty queue that was using the v2 index or in - %% the process of converting from v2 to v1. - [_|_] -> - #resource{virtual_host = VHost, name = QName} = State2#qistate.queue_name, - rabbit_log:info("Queue ~ts in vhost ~ts recovered ~b total messages before resuming convert", - [QName, VHost, Count]), - CountersRef = counters:new(?RECOVER_COUNTER_SIZE, []), - State3 = recover_index_v2_dirty(State2, ContainsCheckFun, CountersRef), - {Count + counters:get(CountersRef, ?RECOVER_COUNT), - Bytes + counters:get(CountersRef, ?RECOVER_BYTES), - State3}; - %% Otherwise keep default values. - [] -> - {Count, Bytes, State2} - end - end. - -recover_index_v2_dirty(State0 = #qistate { queue_name = Name, - on_sync = OnSyncFun, - on_sync_msg = OnSyncMsgFun }, - ContainsCheckFun, CountersRef) -> - #resource{virtual_host = VHost, name = QName} = Name, - rabbit_log:info("Converting queue ~ts in vhost ~ts from v2 to v1 after unclean shutdown", [QName, VHost]), - %% We cannot use the counts/bytes because some messages may be in both - %% the v1 and v2 indexes after a crash. - {_, _, V2State} = rabbit_classic_queue_index_v2:recover(Name, non_clean_shutdown, true, - ContainsCheckFun, OnSyncFun, OnSyncMsgFun, - convert), - State = recover_index_v2_common(State0, V2State, CountersRef), - rabbit_log:info("Queue ~ts in vhost ~ts converted ~b total messages from v2 to v1", - [QName, VHost, counters:get(CountersRef, ?RECOVER_COUNT)]), - State. - -%% At this point all messages are persistent because transient messages -%% were dropped during the v2 index recovery. -recover_index_v2_common(State0 = #qistate { queue_name = Name, dir = Dir }, - V2State, CountersRef) -> - %% Use a temporary per-queue store state to read embedded messages. - StoreState0 = rabbit_classic_queue_store_v2:init(Name), - %% Go through the v2 index and publish messages to v1 index. - {LoSeqId, HiSeqId, _} = rabbit_classic_queue_index_v2:bounds(V2State), - %% When resuming after a crash we need to double check the messages that are both - %% in the v1 and v2 index (effectively the messages below the upper bound of the - %% v1 index that are about to be written to it). - {_, V1HiSeqId, _} = bounds(State0), - SkipFun = fun - (SeqId, FunState0) when SeqId < V1HiSeqId -> - case read(SeqId, SeqId + 1, FunState0) of - %% Message already exists, skip. - {[_], FunState} -> - {skip, FunState}; - %% Message doesn't exist, write. - {[], FunState} -> - {write, FunState} - end; - %% Message is out of bounds of the v1 index. - (_, FunState) -> - {write, FunState} - end, - %% We use a common function also used with conversion on policy change. - {State1, _StoreState} = rabbit_variable_queue:convert_from_v2_to_v1_loop(Name, State0, V2State, StoreState0, - {CountersRef, ?RECOVER_COUNT, ?RECOVER_BYTES}, - LoSeqId, HiSeqId, SkipFun), - %% Delete any remaining v2 index files. - OldFiles = rabbit_file:wildcard(".*\\.qi", Dir) - ++ rabbit_file:wildcard(".*\\.qs", Dir), - _ = [rabbit_file:delete(filename:join(Dir, F)) || F <- OldFiles], - %% Ensure that everything in the v1 index is written to disk. - State = flush(State1), - %% Clean up all the garbage that we have surely been creating. - garbage_collect(), - State. + {Count, Bytes, State2}. terminate(State = #qistate { journal_handle = JournalHdl, segments = Segments }) -> diff --git a/deps/rabbit/src/rabbit_variable_queue.erl b/deps/rabbit/src/rabbit_variable_queue.erl index 9e54844c1940..0c5e59b73c91 100644 --- a/deps/rabbit/src/rabbit_variable_queue.erl +++ b/deps/rabbit/src/rabbit_variable_queue.erl @@ -21,9 +21,9 @@ -export([start/2, stop/1]). -%% Used during dirty recovery to resume conversion between versions. +%% This function is used by rabbit_classic_queue_index_v2 +%% to convert v1 queues to v2 after an upgrade to 4.0. -export([convert_from_v1_to_v2_loop/8]). --export([convert_from_v2_to_v1_loop/8]). %% exported for testing only -export([start_msg_store/3, stop_msg_store/1, init/5]). @@ -52,20 +52,13 @@ %% %% Messages are persisted using a queue index and a message store. %% A few different scenarios may play out depending on the message -%% size and the queue-version argument. +%% size: %% -%% - queue-version=1, size < qi_msgs_embed_below: both the message -%% metadata and content are stored in rabbit_queue_index -%% -%% - queue-version=1, size >= qi_msgs_embed_below: the metadata -%% is stored in rabbit_queue_index, while the content is stored -%% in the per-vhost shared rabbit_msg_store -%% -%% - queue-version=2, size < qi_msgs_embed_below: the metadata +%% - size < qi_msgs_embed_below: the metadata %% is stored in rabbit_classic_queue_index_v2, while the content %% is stored in the per-queue rabbit_classic_queue_store_v2 %% -%% - queue-version=2, size >= qi_msgs_embed_below: the metadata +%% - size >= qi_msgs_embed_below: the metadata %% is stored in rabbit_classic_queue_index_v2, while the content %% is stored in the per-vhost shared rabbit_msg_store %% @@ -179,7 +172,7 @@ ram_pending_ack, %% msgs still in RAM disk_pending_ack, %% msgs in store, paged out qi_pending_ack, %% Unused. - index_mod, + index_mod, %% Unused. index_state, store_state, msg_store_clients, @@ -222,7 +215,7 @@ %% default queue or lazy queue mode, %% Unused. - version = 1, + version = 2, %% Unused. %% Fast path for confirms handling. Instead of having %% index/store keep track of confirms separately and %% doing intersect/subtract/union we just put the messages @@ -311,7 +304,6 @@ ram_pending_ack :: map(), disk_pending_ack :: map(), qi_pending_ack :: undefined, - index_mod :: rabbit_queue_index | rabbit_classic_queue_index_v2, index_state :: any(), store_state :: any(), msg_store_clients :: 'undefined' | {{any(), binary()}, @@ -346,7 +338,7 @@ io_batch_size :: pos_integer(), mode :: 'default' | 'lazy', - version :: 1 | 2, + version :: 2, unconfirmed_simple :: sets:set()}. -define(BLANK_DELTA, #delta { start_seq_id = undefined, @@ -431,14 +423,11 @@ init(Queue, Recover, Callback) -> init(Q, new, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) when ?is_amqqueue(Q) -> QueueName = amqqueue:get_name(Q), IsDurable = amqqueue:is_durable(Q), - %% We resolve the queue version immediately to avoid converting - %% between queue versions unnecessarily. - IndexMod = index_mod(Q), - IndexState = IndexMod:init(QueueName, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), + IndexState = rabbit_classic_queue_index_v2:init(QueueName, + MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), StoreState = rabbit_classic_queue_store_v2:init(QueueName), VHost = QueueName#resource.virtual_host, - init(queue_version(Q), - IsDurable, IndexMod, IndexState, StoreState, 0, 0, [], + init(IsDurable, IndexState, StoreState, 0, 0, [], case IsDurable of true -> msg_store_client_init(?PERSISTENT_MSG_STORE, MsgOnDiskFun, VHost); @@ -466,10 +455,8 @@ init(Q, Terms, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) when ?is_amqqu end, TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, VHost), - %% We MUST resolve the queue version immediately in order to recover. - IndexMod = index_mod(Q), {DeltaCount, DeltaBytes, IndexState} = - IndexMod:recover( + rabbit_classic_queue_index_v2:recover( QueueName, RecoveryTerms, rabbit_vhost_msg_store:successfully_recovered_state( VHost, @@ -477,8 +464,8 @@ init(Q, Terms, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) when ?is_amqqu ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun, main), StoreState = rabbit_classic_queue_store_v2:init(QueueName), - init(queue_version(Q), - IsDurable, IndexMod, IndexState, StoreState, DeltaCount, DeltaBytes, RecoveryTerms, + init(IsDurable, IndexState, StoreState, + DeltaCount, DeltaBytes, RecoveryTerms, PersistentClient, TransientClient, VHost). process_recovery_terms(Terms=non_clean_shutdown) -> @@ -489,27 +476,12 @@ process_recovery_terms(Terms) -> PRef -> {PRef, Terms} end. -queue_version(Q) -> - Resolve = fun(_, ArgVal) -> ArgVal end, - case rabbit_queue_type_util:args_policy_lookup(<<"queue-version">>, Resolve, Q) of - undefined -> rabbit_misc:get_env(rabbit, classic_queue_default_version, 1); - Vsn when is_integer(Vsn) -> Vsn; - Vsn -> binary_to_integer(Vsn) - end. - -index_mod(Q) -> - case queue_version(Q) of - 1 -> rabbit_queue_index; - 2 -> rabbit_classic_queue_index_v2 - end. - terminate(_Reason, State) -> State1 = #vqstate { virtual_host = VHost, next_seq_id = NextSeqId, next_deliver_seq_id = NextDeliverSeqId, persistent_count = PCount, persistent_bytes = PBytes, - index_mod = IndexMod, index_state = IndexState, store_state = StoreState, msg_store_clients = {MSCStateP, MSCStateT} } = @@ -526,7 +498,7 @@ terminate(_Reason, State) -> {persistent_count, PCount}, {persistent_bytes, PBytes}], a(State1#vqstate { - index_state = IndexMod:terminate(VHost, Terms, IndexState), + index_state = rabbit_classic_queue_index_v2:terminate(VHost, Terms, IndexState), store_state = rabbit_classic_queue_store_v2:terminate(StoreState), msg_store_clients = undefined }). @@ -649,8 +621,7 @@ ack([SeqId], State) -> end; ack(AckTags, State) -> {{IndexOnDiskSeqIds, MsgIdsByStore, SeqIdsInStore, AllMsgIds}, - State1 = #vqstate { index_mod = IndexMod, - index_state = IndexState, + State1 = #vqstate { index_state = IndexState, store_state = StoreState0, ack_out_counter = AckOutCount }} = lists:foldl( @@ -664,7 +635,7 @@ ack(AckTags, State) -> {accumulate_ack(MsgStatus, Acc), State3} end end, {accumulate_ack_init(), State}, AckTags), - {DeletedSegments, IndexState1} = IndexMod:ack(IndexOnDiskSeqIds, IndexState), + {DeletedSegments, IndexState1} = rabbit_classic_queue_index_v2:ack(IndexOnDiskSeqIds, IndexState), StoreState1 = rabbit_classic_queue_store_v2:delete_segments(DeletedSegments, StoreState0), StoreState = lists:foldl(fun rabbit_classic_queue_store_v2:remove/2, StoreState1, SeqIdsInStore), State2 = remove_vhost_msgs_by_id(MsgIdsByStore, State1), @@ -762,36 +733,32 @@ ram_duration(State) -> State1 = update_rates(State), {infinity, State1}. -needs_timeout(#vqstate { index_mod = IndexMod, - index_state = IndexState, +needs_timeout(#vqstate { index_state = IndexState, unconfirmed_simple = UCS }) -> - case {IndexMod:needs_sync(IndexState), sets:is_empty(UCS)} of + case {rabbit_classic_queue_index_v2:needs_sync(IndexState), sets:is_empty(UCS)} of {false, false} -> timed; {confirms, _} -> timed; - {other, _} -> idle; {false, true} -> false end. -timeout(State = #vqstate { index_mod = IndexMod, - index_state = IndexState0, +timeout(State = #vqstate { index_state = IndexState0, store_state = StoreState0, unconfirmed_simple = UCS, confirmed = C }) -> - IndexState = IndexMod:sync(IndexState0), + IndexState = rabbit_classic_queue_index_v2:sync(IndexState0), StoreState = rabbit_classic_queue_store_v2:sync(StoreState0), State #vqstate { index_state = IndexState, store_state = StoreState, unconfirmed_simple = sets:new([{version,2}]), confirmed = sets:union(C, UCS) }. -handle_pre_hibernate(State = #vqstate { index_mod = IndexMod, - index_state = IndexState0, +handle_pre_hibernate(State = #vqstate { index_state = IndexState0, store_state = StoreState0, msg_store_clients = MSCState0, unconfirmed_simple = UCS, confirmed = C }) -> MSCState = msg_store_pre_hibernate(MSCState0), - IndexState = IndexMod:flush(IndexState0), + IndexState = rabbit_classic_queue_index_v2:flush(IndexState0), StoreState = rabbit_classic_queue_store_v2:sync(StoreState0), State #vqstate { index_state = IndexState, store_state = StoreState, @@ -843,7 +810,6 @@ info(disk_writes, #vqstate{disk_write_count = Count}) -> info(backing_queue_status, #vqstate { delta = Delta, q3 = Q3, mode = Mode, - version = Version, len = Len, target_ram_count = TargetRamCount, next_seq_id = NextSeqId, @@ -852,7 +818,6 @@ info(backing_queue_status, #vqstate { disk_pending_ack = DPA, unconfirmed = UC, unconfirmed_simple = UCS, - index_mod = IndexMod, index_state = IndexState, store_state = StoreState, rates = #rates { in = AvgIngressRate, @@ -860,7 +825,7 @@ info(backing_queue_status, #vqstate { ack_in = AvgAckIngressRate, ack_out = AvgAckEgressRate }}) -> [ {mode , Mode}, - {version , Version}, + {version , 2}, {q1 , 0}, {q2 , 0}, {delta , Delta}, @@ -876,7 +841,7 @@ info(backing_queue_status, #vqstate { {avg_egress_rate , AvgEgressRate}, {avg_ack_ingress_rate, AvgAckIngressRate}, {avg_ack_egress_rate , AvgAckEgressRate} ] - ++ IndexMod:info(IndexState) + ++ rabbit_classic_queue_index_v2:info(IndexState) ++ rabbit_classic_queue_store_v2:info(StoreState); info(_, _) -> ''. @@ -896,94 +861,12 @@ zip_msgs_and_acks(Msgs, AckTags, Accumulator, _State) -> [{Id, AckTag} | Acc] end, Accumulator, lists:zip(Msgs, AckTags)). -%% No change. -set_queue_version(Version, State = #vqstate { version = Version }) -> - State; -%% v2 -> v1. -set_queue_version(1, State0 = #vqstate { version = 2 }) -> - %% We call timeout/1 so that we sync to disk and get the confirms - %% handled before we do the conversion. This is necessary because - %% v2 now has a simpler confirms code path. - State = timeout(State0), - convert_from_v2_to_v1(State #vqstate { version = 1 }); -%% v1 -> v2. -set_queue_version(2, State0 = #vqstate { version = 1 }) -> - %% We call timeout/1 so that we sync to disk and get the confirms - %% handled before we do the conversion. This is necessary because - %% v2 now has a simpler confirms code path. - State = timeout(State0), - convert_from_v1_to_v2(State #vqstate { version = 2 }). - --define(CONVERT_COUNT, 1). --define(CONVERT_BYTES, 2). %% Unused. --define(CONVERT_COUNTER_SIZE, 2). - -%% We move messages from the v1 index to the v2 index. The message payload -%% is moved to the v2 store if it was embedded, and left in the per-vhost -%% store otherwise. -convert_from_v1_to_v2(State0 = #vqstate{ index_mod = rabbit_queue_index, - index_state = V1Index, - store_state = V2Store0 }) -> - {QueueName, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun} = rabbit_queue_index:init_args(V1Index), - #resource{virtual_host = VHost, name = QName} = QueueName, - rabbit_log:info("Converting running queue ~ts in vhost ~ts from v1 to v2", [QName, VHost]), - State = convert_from_v1_to_v2_in_memory(State0), - V2Index0 = rabbit_classic_queue_index_v2:init_for_conversion(QueueName, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), - %% We do not need to init the v2 per-queue store because we already did so in the queue init. - {LoSeqId, HiSeqId, _} = rabbit_queue_index:bounds(V1Index), - CountersRef = counters:new(?CONVERT_COUNTER_SIZE, []), - {V2Index, V2Store} = convert_from_v1_to_v2_loop(QueueName, V1Index, V2Index0, V2Store0, - {CountersRef, ?CONVERT_COUNT, ?CONVERT_BYTES}, - LoSeqId, HiSeqId, - %% Write all messages. - fun (_, FunState) -> {write, FunState} end), - %% We have already deleted segments files but not the journal. - rabbit_queue_index:delete_journal(V1Index), - rabbit_log:info("Queue ~ts in vhost ~ts converted ~b total messages from v1 to v2", - [QName, VHost, counters:get(CountersRef, ?CONVERT_COUNT)]), - State#vqstate{ index_mod = rabbit_classic_queue_index_v2, - index_state = V2Index, - store_state = V2Store }. - -convert_from_v1_to_v2_in_memory(State = #vqstate{ q1 = Q1b, - q2 = Q2b, - q3 = Q3b, - q4 = Q4b, - ram_pending_ack = RPAb, - disk_pending_ack = DPAb }) -> - Q1 = convert_from_v1_to_v2_queue(Q1b), - Q2 = convert_from_v1_to_v2_queue(Q2b), - Q3 = convert_from_v1_to_v2_queue(Q3b), - Q4 = convert_from_v1_to_v2_queue(Q4b), - %% We also must convert the #msg_status entries in the pending_ack fields. - RPA = convert_from_v1_to_v2_map(RPAb), - DPA = convert_from_v1_to_v2_map(DPAb), - State#vqstate{ q1 = Q1, - q2 = Q2, - q3 = Q3, - q4 = Q4, - ram_pending_ack = RPA, - disk_pending_ack = DPA }. - -%% We change where the message is expected to be persisted to. -%% We do not need to worry about the message location because -%% it will only be in memory or in the per-vhost store. -convert_from_v1_to_v2_queue(Q) -> - List0 = ?QUEUE:to_list(Q), - List = lists:map(fun (MsgStatus) -> convert_from_v1_to_v2_msg_status(MsgStatus) end, List0), - ?QUEUE:from_list(List). - -convert_from_v1_to_v2_map(T) -> - maps:map(fun (_, MsgStatus) -> convert_from_v1_to_v2_msg_status(MsgStatus) end, T). - -convert_from_v1_to_v2_msg_status(MsgStatus) -> - case MsgStatus of - #msg_status{ persist_to = queue_index } -> - MsgStatus#msg_status{ persist_to = queue_store }; - _ -> - MsgStatus - end. +%% Queue version now ignored; only v2 is available. +set_queue_version(_, State) -> + State. +%% This function is used by rabbit_classic_queue_index_v2 +%% to convert v1 queues to v2 after an upgrade to 4.0. convert_from_v1_to_v2_loop(_, _, V2Index, V2Store, _, HiSeqId, HiSeqId, _) -> {V2Index, V2Store}; convert_from_v1_to_v2_loop(QueueName, V1Index0, V2Index0, V2Store0, @@ -1036,159 +919,6 @@ convert_from_v1_to_v2_loop(QueueName, V1Index0, V2Index0, V2Store0, [Name, VHost, length(Messages)]), convert_from_v1_to_v2_loop(QueueName, V1Index, V2Index, V2Store, Counters, UpSeqId, HiSeqId, SkipFun). -%% We move messages from the v1 index to the v2 index. The message payload -%% is moved to the v2 store if it was embedded, and left in the per-vhost -%% store otherwise. -convert_from_v2_to_v1(State0 = #vqstate{ index_mod = rabbit_classic_queue_index_v2, - index_state = V2Index }) -> - {QueueName, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun} = rabbit_classic_queue_index_v2:init_args(V2Index), - #resource{virtual_host = VHost, name = QName} = QueueName, - rabbit_log:info("Converting running queue ~ts in vhost ~ts from v2 to v1", [QName, VHost]), - State = convert_from_v2_to_v1_in_memory(State0), - %% We may have read from the per-queue store state and opened FDs. - #vqstate{ store_state = V2Store0 } = State, - V1Index0 = rabbit_queue_index:init_for_conversion(QueueName, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), - {LoSeqId, HiSeqId, _} = rabbit_classic_queue_index_v2:bounds(V2Index), - CountersRef = counters:new(?CONVERT_COUNTER_SIZE, []), - {V1Index, V2Store} = convert_from_v2_to_v1_loop(QueueName, V1Index0, V2Index, V2Store0, - {CountersRef, ?CONVERT_COUNT, ?CONVERT_BYTES}, - LoSeqId, HiSeqId, - %% Write all messages. - fun (_, FunState) -> {write, FunState} end), - rabbit_log:info("Queue ~ts in vhost ~ts converted ~b total messages from v2 to v1", - [QName, VHost, counters:get(CountersRef, ?CONVERT_COUNT)]), - %% We have already closed the v2 index/store FDs when deleting the files. - State#vqstate{ index_mod = rabbit_queue_index, - index_state = V1Index, - store_state = rabbit_classic_queue_store_v2:terminate(V2Store) }. - -convert_from_v2_to_v1_in_memory(State0 = #vqstate{ q1 = Q1b, - q2 = Q2b, - q3 = Q3b, - q4 = Q4b, - ram_pending_ack = RPAb, - disk_pending_ack = DPAb }) -> - {Q1, State1} = convert_from_v2_to_v1_queue(Q1b, State0), - {Q2, State2} = convert_from_v2_to_v1_queue(Q2b, State1), - {Q3, State3} = convert_from_v2_to_v1_queue(Q3b, State2), - {Q4, State4} = convert_from_v2_to_v1_queue(Q4b, State3), - %% We also must convert the #msg_status entries in the pending_ack fields. - %% We must separate entries in the queue index from other entries as - %% that is what is expected from the v1 index. - {RPA, State5} = convert_from_v2_to_v1_map(RPAb, State4), - {DPA, State6} = convert_from_v2_to_v1_map(DPAb, State5), - State6#vqstate{ q1 = Q1, - q2 = Q2, - q3 = Q3, - q4 = Q4, - ram_pending_ack = RPA, - disk_pending_ack = DPA }. - -%% We fetch the message from the per-queue store if necessary -%% and mark all messages as delivered to make the v1 index happy. -convert_from_v2_to_v1_queue(Q, State0) -> - List0 = ?QUEUE:to_list(Q), - {List, State} = lists:mapfoldl(fun (MsgStatus, State1) -> - convert_from_v2_to_v1_msg_status(MsgStatus, State1, true) - end, State0, List0), - {?QUEUE:from_list(List), State}. - -convert_from_v2_to_v1_map(T, State) -> - convert_from_v2_to_v1_map_loop(maps:iterator(T), #{}, State). - -convert_from_v2_to_v1_map_loop(Iterator0, Acc, State0) -> - case maps:next(Iterator0) of - none -> - {Acc, State0}; - {Key, Value0, Iterator} -> - {Value, State} = convert_from_v2_to_v1_msg_status(Value0, State0, false), - convert_from_v2_to_v1_map_loop(Iterator, maps:put(Key, Value, Acc), State) - end. - -convert_from_v2_to_v1_msg_status(MsgStatus0, State1 = #vqstate{ store_state = StoreState0, - ram_msg_count = RamMsgCount, - ram_bytes = RamBytes }, Ready) -> - case MsgStatus0 of - #msg_status{ seq_id = SeqId, - msg = undefined, - msg_location = MsgLocation = {rabbit_classic_queue_store_v2, _, _} } -> - {Msg, StoreState} = rabbit_classic_queue_store_v2:read(SeqId, MsgLocation, StoreState0), - MsgStatus = MsgStatus0#msg_status{ msg = Msg, - msg_location = memory, - is_delivered = true, - persist_to = queue_index }, - %% We have read the message into memory. We must also update the stats. - {MsgStatus, State1#vqstate{ store_state = StoreState, - ram_msg_count = RamMsgCount + one_if(Ready), - ram_bytes = RamBytes + msg_size(MsgStatus) }}; - #msg_status{ persist_to = queue_store } -> - {MsgStatus0#msg_status{ is_delivered = true, - persist_to = queue_index }, State1}; - _ -> - {MsgStatus0#msg_status{ is_delivered = true }, State1} - end. - -convert_from_v2_to_v1_loop(_, V1Index, _, V2Store, _, HiSeqId, HiSeqId, _) -> - {V1Index, V2Store}; -convert_from_v2_to_v1_loop(QueueName, V1Index0, V2Index0, V2Store0, - Counters = {CountersRef, CountIx, BytesIx}, - LoSeqId, HiSeqId, SkipFun) -> - UpSeqId = lists:min([rabbit_classic_queue_index_v2:next_segment_boundary(LoSeqId), - HiSeqId]), - {Messages, V2Index1} = rabbit_classic_queue_index_v2:read(LoSeqId, UpSeqId, V2Index0), - {V1Index3, V2Store3} = lists:foldl(fun - %% Read per-queue store messages before writing to the index. - ({_MsgId, SeqId, Location = {rabbit_classic_queue_store_v2, _, _}, Props, IsPersistent}, - {V1Index1, V2Store1}) -> - {Msg, V2Store2} = rabbit_classic_queue_store_v2:read(SeqId, Location, V2Store1), - %% When we are resuming the conversion the messages may have already been written to disk. - %% We do NOT want them written again: this is an error that leads to a corrupted index - %% (because it uses a journal it cannot know whether there's been a double write). - %% We therefore check first if the entry exists and if we need to write it. - V1Index2 = case SkipFun(SeqId, V1Index1) of - {skip, V1Index1a} -> - V1Index1a; - {write, V1Index1a} -> - counters:add(CountersRef, CountIx, 1), - counters:add(CountersRef, BytesIx, Props#message_properties.size), - V1Index1b = rabbit_queue_index:publish(Msg, SeqId, rabbit_queue_index, Props, IsPersistent, infinity, V1Index1a), - rabbit_queue_index:deliver([SeqId], V1Index1b) - end, - {V1Index2, V2Store2}; - %% Keep messages in the per-vhost store where they are. - ({MsgId, SeqId, rabbit_msg_store, Props, IsPersistent}, - {V1Index1, V2Store1}) -> - %% See comment in previous clause. - V1Index2 = case SkipFun(SeqId, V1Index1) of - {skip, V1Index1a} -> - V1Index1a; - {write, V1Index1a} -> - counters:add(CountersRef, CountIx, 1), - counters:add(CountersRef, BytesIx, Props#message_properties.size), - V1Index1b = rabbit_queue_index:publish(MsgId, SeqId, rabbit_msg_store, Props, IsPersistent, infinity, V1Index1a), - rabbit_queue_index:deliver([SeqId], V1Index1b) - end, - {V1Index2, V2Store1}; - %% Ignore messages that are in memory and had an entry written in the index. - %% @todo Remove this clause some time after CMQs get removed as this will become dead code. - ({undefined, _, memory, _, _}, {V1Index1, V2Store1}) -> - {V1Index1, V2Store1} - end, {V1Index0, V2Store0}, Messages), - %% Flush to disk to avoid keeping too much in memory between segments. - V1Index = rabbit_queue_index:flush(V1Index3), - %% We do a garbage collect because the old index may have created a lot of garbage. - garbage_collect(), - %% We have written everything to disk. We can delete the old segment file - %% to free up much needed space, to avoid doubling disk usage during the upgrade. - {DeletedSegments, V2Index} = rabbit_classic_queue_index_v2:delete_segment_file_for_seq_id(LoSeqId, V2Index1), - V2Store = rabbit_classic_queue_store_v2:delete_segments(DeletedSegments, V2Store3), - %% Log some progress to keep the user aware of what's going on, as moving - %% embedded messages can take quite some time. - #resource{virtual_host = VHost, name = Name} = QueueName, - rabbit_log:info("Queue ~ts in vhost ~ts converted ~b messages from v2 to v1", - [Name, VHost, length(Messages)]), - convert_from_v2_to_v1_loop(QueueName, V1Index, V2Index, V2Store, Counters, UpSeqId, HiSeqId, SkipFun). - %% Get the Timestamp property of the first msg, if present. This is %% the one with the oldest timestamp among the heads of the pending %% acks and unread queues. We can't check disk_pending_acks as these @@ -1296,7 +1026,7 @@ one_if(false) -> 0. cons_if(true, E, L) -> [E | L]; cons_if(false, _E, L) -> L. -msg_status(Version, IsPersistent, IsDelivered, SeqId, +msg_status(IsPersistent, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize) -> MsgId = mc:get_annotation(id, Msg), #msg_status{seq_id = SeqId, @@ -1308,7 +1038,7 @@ msg_status(Version, IsPersistent, IsDelivered, SeqId, is_delivered = IsDelivered, msg_location = memory, index_on_disk = false, - persist_to = determine_persist_to(Version, Msg, MsgProps, IndexMaxSize), + persist_to = determine_persist_to(Msg, MsgProps, IndexMaxSize), msg_props = MsgProps}. beta_msg_status({MsgId, SeqId, MsgLocation, MsgProps, IsPersistent}) @@ -1464,9 +1194,9 @@ expand_delta(_SeqId, #delta { count = Count, %% Internal major helpers for Public API %%---------------------------------------------------------------------------- -init(QueueVsn, IsDurable, IndexMod, IndexState, StoreState, DeltaCount, DeltaBytes, Terms, +init(IsDurable, IndexState, StoreState, DeltaCount, DeltaBytes, Terms, PersistentClient, TransientClient, VHost) -> - {LowSeqId, HiSeqId, IndexState1} = IndexMod:bounds(IndexState), + {LowSeqId, HiSeqId, IndexState1} = rabbit_classic_queue_index_v2:bounds(IndexState), {NextSeqId, NextDeliverSeqId, DeltaCount1, DeltaBytes1} = case Terms of @@ -1504,7 +1234,6 @@ init(QueueVsn, IsDurable, IndexMod, IndexState, StoreState, DeltaCount, DeltaByt next_deliver_seq_id = NextDeliverSeqId, ram_pending_ack = #{}, disk_pending_ack = #{}, - index_mod = IndexMod, index_state = IndexState1, store_state = StoreState, msg_store_clients = {PersistentClient, TransientClient}, @@ -1540,7 +1269,6 @@ init(QueueVsn, IsDurable, IndexMod, IndexState, StoreState, DeltaCount, DeltaByt io_batch_size = IoBatchSize, mode = default, - version = QueueVsn, virtual_host = VHost}, a(maybe_deltas_to_betas(State)). @@ -1733,13 +1461,12 @@ remove_from_disk(#msg_status { is_persistent = IsPersistent, msg_location = MsgLocation, index_on_disk = IndexOnDisk }, - State = #vqstate {index_mod = IndexMod, - index_state = IndexState1, + State = #vqstate {index_state = IndexState1, store_state = StoreState0, msg_store_clients = MSCState}) -> {DeletedSegments, IndexState2} = case IndexOnDisk of - true -> IndexMod:ack([SeqId], IndexState1); + true -> rabbit_classic_queue_index_v2:ack([SeqId], IndexState1); false -> {[], IndexState1} end, {StoreState1, State1} = case MsgLocation of @@ -1890,11 +1617,10 @@ purge_and_index_reset(State) -> purge1(AfterFun, State) -> a(purge_betas_and_deltas(AfterFun, State)). -reset_qi_state(State = #vqstate{ index_mod = IndexMod, - index_state = IndexState0, +reset_qi_state(State = #vqstate{ index_state = IndexState0, store_state = StoreState0 }) -> StoreState = rabbit_classic_queue_store_v2:terminate(StoreState0), - IndexState = IndexMod:reset_state(IndexState0), + IndexState = rabbit_classic_queue_index_v2:reset_state(IndexState0), State#vqstate{ index_state = IndexState, store_state = StoreState }. @@ -1946,12 +1672,9 @@ remove_queue_entries1( process_delivers_and_acks_fun(deliver_and_ack) -> %% @todo Make a clause for empty Acks list? - fun (NextDeliverSeqId, Acks, State = #vqstate { index_mod = IndexMod, - index_state = IndexState, + fun (NextDeliverSeqId, Acks, State = #vqstate { index_state = IndexState, store_state = StoreState0}) -> - %% We do not send delivers to the v1 index because - %% we've already done so when publishing. - {DeletedSegments, IndexState1} = IndexMod:ack(Acks, IndexState), + {DeletedSegments, IndexState1} = rabbit_classic_queue_index_v2:ack(Acks, IndexState), StoreState = rabbit_classic_queue_store_v2:delete_segments(DeletedSegments, StoreState0), @@ -1976,7 +1699,6 @@ publish1(Msg, IsDelivered, _ChPid, _Flow, PersistFun, State = #vqstate { q3 = Q3, delta = Delta = #delta { count = DeltaCount }, len = Len, - version = Version, qi_embed_msgs_below = IndexMaxSize, next_seq_id = SeqId, next_deliver_seq_id = NextDeliverSeqId, @@ -1988,7 +1710,7 @@ publish1(Msg, MsgId = mc:get_annotation(id, Msg), IsPersistent = mc:is_persistent(Msg), IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = msg_status(Version, IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize), + MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize), %% We allow from 1 to 2048 messages in memory depending on the consume rate. The lower %% limit is at 1 because the queue process will need to access this message to know %% expiration information. @@ -2006,7 +1728,7 @@ publish1(Msg, stats_published_disk(MsgStatus1, State2) end, {UC1, UCS1} = maybe_needs_confirming(NeedsConfirming, persist_to(MsgStatus), - Version, MsgId, UC, UCS), + MsgId, UC, UCS), State3#vqstate{ next_seq_id = SeqId + 1, next_deliver_seq_id = maybe_next_deliver_seq_id(SeqId, NextDeliverSeqId, IsDelivered), in_counter = InCount + 1, @@ -2027,8 +1749,7 @@ publish_delivered1(Msg, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, _ChPid, _Flow, PersistFun, - State = #vqstate { version = Version, - qi_embed_msgs_below = IndexMaxSize, + State = #vqstate { qi_embed_msgs_below = IndexMaxSize, next_seq_id = SeqId, next_deliver_seq_id = NextDeliverSeqId, in_counter = InCount, @@ -2039,11 +1760,11 @@ publish_delivered1(Msg, MsgId = mc:get_annotation(id, Msg), IsPersistent = mc:is_persistent(Msg), IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = msg_status(Version, IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize), + MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize), {MsgStatus1, State1} = PersistFun(false, false, MsgStatus, State), State2 = record_pending_ack(m(MsgStatus1), State1), {UC1, UCS1} = maybe_needs_confirming(NeedsConfirming, persist_to(MsgStatus), - Version, MsgId, UC, UCS), + MsgId, UC, UCS), {SeqId, stats_published_pending_acks(MsgStatus1, State2#vqstate{ next_seq_id = SeqId + 1, @@ -2053,14 +1774,14 @@ publish_delivered1(Msg, unconfirmed = UC1, unconfirmed_simple = UCS1 })}. -maybe_needs_confirming(false, _, _, _, UC, UCS) -> +maybe_needs_confirming(false, _, _, UC, UCS) -> {UC, UCS}; %% When storing to the v2 queue store we take the simple confirms %% path because we don't need to track index and store separately. -maybe_needs_confirming(true, queue_store, 2, MsgId, UC, UCS) -> +maybe_needs_confirming(true, queue_store, MsgId, UC, UCS) -> {UC, sets:add_element(MsgId, UCS)}; %% Otherwise we keep tracking as it used to be. -maybe_needs_confirming(true, _, _, MsgId, UC, UCS) -> +maybe_needs_confirming(true, _, MsgId, UC, UCS) -> {sets:add_element(MsgId, UC), UCS}. batch_publish_delivered1({Msg, MsgProps}, {ChPid, Flow, SeqIds, State}) -> @@ -2115,7 +1836,6 @@ maybe_batch_write_index_to_disk(Force, State = #vqstate { target_ram_count = TargetRamCount, disk_write_count = DiskWriteCount, - index_mod = IndexMod, index_state = IndexState}) when Force orelse IsPersistent -> {MsgOrId, DiskWriteCount1} = @@ -2124,19 +1844,9 @@ maybe_batch_write_index_to_disk(Force, queue_store -> {MsgId, DiskWriteCount}; queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1} end, - IndexState1 = case IndexMod of - %% The old index needs IsDelivered to apply some of its optimisations. - %% But because the deliver tracking is now in the queue we always pass 'true'. - %% It also does not need the location so it is not given here. - rabbit_queue_index -> - IndexMod:pre_publish( - MsgOrId, SeqId, MsgProps, IsPersistent, true, - TargetRamCount, IndexState); - _ -> - IndexMod:pre_publish( - MsgOrId, SeqId, MsgLocation, MsgProps, IsPersistent, - TargetRamCount, IndexState) - end, + IndexState1 = rabbit_classic_queue_index_v2:pre_publish( + MsgOrId, SeqId, MsgLocation, MsgProps, + IsPersistent, TargetRamCount, IndexState), {MsgStatus#msg_status{index_on_disk = true}, State#vqstate{index_state = IndexState1, disk_write_count = DiskWriteCount1}}; @@ -2155,7 +1865,6 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status { msg_props = MsgProps}, State = #vqstate{target_ram_count = TargetRamCount, disk_write_count = DiskWriteCount, - index_mod = IndexMod, index_state = IndexState}) when Force orelse IsPersistent -> {MsgOrId, DiskWriteCount1} = @@ -2164,21 +1873,12 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status { queue_store -> {MsgId, DiskWriteCount}; queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1} end, - IndexState2 = IndexMod:publish( + IndexState2 = rabbit_classic_queue_index_v2:publish( MsgOrId, SeqId, MsgLocation, MsgProps, IsPersistent, persist_to(MsgStatus) =:= msg_store, TargetRamCount, IndexState), - %% We always deliver messages when the old index is used. - %% We are actually tracking message deliveries per-queue - %% but the old index expects delivers to be handled - %% per-message. Always delivering on publish prevents - %% issues related to delivers. - IndexState3 = case IndexMod of - rabbit_queue_index -> IndexMod:deliver([SeqId], IndexState2); - _ -> IndexState2 - end, {MsgStatus#msg_status{index_on_disk = true}, - State#vqstate{index_state = IndexState3, + State#vqstate{index_state = IndexState2, disk_write_count = DiskWriteCount1}}; maybe_write_index_to_disk(_Force, MsgStatus, State) -> @@ -2188,20 +1888,19 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) -> {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State), maybe_write_index_to_disk(ForceIndex, MsgStatus1, State1). -maybe_prepare_write_to_disk(ForceMsg, ForceIndex0, MsgStatus, State = #vqstate{ version = Version }) -> +maybe_prepare_write_to_disk(ForceMsg, ForceIndex0, MsgStatus, State) -> {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State), %% We want messages written to the v2 per-queue store to also %% be written to the index for proper accounting. The situation %% where a message can be in the store but not in the index can %% only occur when going through this function (not via maybe_write_to_disk). - ForceIndex = case {Version, persist_to(MsgStatus)} of - {2, queue_store} -> true; + ForceIndex = case persist_to(MsgStatus) of + queue_store -> true; _ -> ForceIndex0 end, maybe_batch_write_index_to_disk(ForceIndex, MsgStatus1, State1). -determine_persist_to(Version, - Msg, +determine_persist_to(Msg, #message_properties{size = BodySize}, IndexMaxSize) -> %% The >= is so that you can set the env to 0 and never persist @@ -2224,9 +1923,8 @@ determine_persist_to(Version, false -> Est = MetaSize + BodySize, case Est >= IndexMaxSize of - true -> msg_store; - false when Version =:= 1 -> queue_index; - false when Version =:= 2 -> queue_store + true -> msg_store; + false -> queue_store end end. @@ -2283,14 +1981,13 @@ remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA, end. purge_pending_ack(KeepPersistent, - State = #vqstate { index_mod = IndexMod, - index_state = IndexState, + State = #vqstate { index_state = IndexState, store_state = StoreState0 }) -> {IndexOnDiskSeqIds, MsgIdsByStore, SeqIdsInStore, State1} = purge_pending_ack1(State), case KeepPersistent of true -> remove_transient_msgs_by_id(MsgIdsByStore, State1); false -> {DeletedSegments, IndexState1} = - IndexMod:ack(IndexOnDiskSeqIds, IndexState), + rabbit_classic_queue_index_v2:ack(IndexOnDiskSeqIds, IndexState), StoreState1 = lists:foldl(fun rabbit_classic_queue_store_v2:remove/2, StoreState0, SeqIdsInStore), StoreState = rabbit_classic_queue_store_v2:delete_segments(DeletedSegments, StoreState1), State2 = remove_vhost_msgs_by_id(MsgIdsByStore, State1), @@ -2299,12 +1996,11 @@ purge_pending_ack(KeepPersistent, end. purge_pending_ack_delete_and_terminate( - State = #vqstate { index_mod = IndexMod, - index_state = IndexState, + State = #vqstate { index_state = IndexState, store_state = StoreState }) -> {_, MsgIdsByStore, _SeqIdsInStore, State1} = purge_pending_ack1(State), StoreState1 = rabbit_classic_queue_store_v2:terminate(StoreState), - IndexState1 = IndexMod:delete_and_terminate(IndexState), + IndexState1 = rabbit_classic_queue_index_v2:delete_and_terminate(IndexState), State2 = remove_vhost_msgs_by_id(MsgIdsByStore, State1), State2 #vqstate { index_state = IndexState1, store_state = StoreState1 }. @@ -2510,8 +2206,8 @@ next({delta, #delta{start_seq_id = SeqId, end_seq_id = SeqId}, State}, IndexState) -> next(istate(delta, State), IndexState); next({delta, #delta{start_seq_id = SeqId, - end_seq_id = SeqIdEnd} = Delta, State = #vqstate{index_mod = IndexMod}}, IndexState) -> - SeqIdB = IndexMod:next_segment_boundary(SeqId), + end_seq_id = SeqIdEnd} = Delta, State}, IndexState) -> + SeqIdB = rabbit_classic_queue_index_v2:next_segment_boundary(SeqId), %% It may make sense to limit this based on rate. But this %% is not called outside of CMQs so I will leave it alone %% for the time being. @@ -2520,15 +2216,9 @@ next({delta, #delta{start_seq_id = SeqId, %% otherwise the queue will attempt to read up to segment_entry_count() %% messages from the index each time. The value %% chosen here is arbitrary. - %% @todo We have a problem where reduce_memory_usage puts messages back to 0, - %% and then this or the maybe_deltas_to_betas function is called and it - %% fetches 2048 messages again. This is not good. Maybe the reduce_memory_usage - %% function should reduce the number of messages we fetch at once at the - %% same time (start at 2048, divide by 2 every time we reduce, or something). - %% Maybe expiration does that? SeqId + 2048, SeqIdEnd]), - {List, IndexState1} = IndexMod:read(SeqId, SeqId1, IndexState), + {List, IndexState1} = rabbit_classic_queue_index_v2:read(SeqId, SeqId1, IndexState), next({delta, Delta#delta{start_seq_id = SeqId1}, List, State}, IndexState1); next({delta, Delta, [], State}, IndexState) -> next({delta, Delta, State}, IndexState); @@ -2610,7 +2300,6 @@ maybe_deltas_to_betas(DelsAndAcksFun, State = #vqstate { delta = Delta, q3 = Q3, - index_mod = IndexMod, index_state = IndexState, store_state = StoreState, msg_store_clients = {MCStateP, MCStateT}, @@ -2618,30 +2307,20 @@ maybe_deltas_to_betas(DelsAndAcksFun, ram_bytes = RamBytes, disk_read_count = DiskReadCount, delta_transient_bytes = DeltaTransientBytes, - transient_threshold = TransientThreshold, - version = Version }, + transient_threshold = TransientThreshold }, MemoryLimit, WhatToRead) -> #delta { start_seq_id = DeltaSeqId, count = DeltaCount, transient = Transient, end_seq_id = DeltaSeqIdEnd } = Delta, - %% For v1 we always want to read messages up to the next segment boundary. - %% This is because v1 is not optimised for multiple reads from the same - %% segment: every time we read messages from a segment it has to read - %% and parse the entire segment from disk, filtering the messages we - %% requested afterwards. - %% %% For v2 we want to limit the number of messages read at once to lower %% the memory footprint. We use the consume rate to determine how many %% messages we read. - DeltaSeqLimit = case Version of - 1 -> DeltaSeqIdEnd; - 2 -> DeltaSeqId + MemoryLimit - end, + DeltaSeqLimit = DeltaSeqId + MemoryLimit, DeltaSeqId1 = - lists:min([IndexMod:next_segment_boundary(DeltaSeqId), + lists:min([rabbit_classic_queue_index_v2:next_segment_boundary(DeltaSeqId), DeltaSeqLimit, DeltaSeqIdEnd]), - {List0, IndexState1} = IndexMod:read(DeltaSeqId, DeltaSeqId1, IndexState), + {List0, IndexState1} = rabbit_classic_queue_index_v2:read(DeltaSeqId, DeltaSeqId1, IndexState), {List, StoreState3, MCStateP3, MCStateT3} = case WhatToRead of messages -> %% We try to read messages from disk all at once instead of @@ -2769,10 +2448,9 @@ merge_sh_read_msgs(MTail, _Reads) -> MTail. %% Flushes queue index batch caches and updates queue index state. -ui(#vqstate{index_mod = IndexMod, - index_state = IndexState, +ui(#vqstate{index_state = IndexState, target_ram_count = TargetRamCount} = State) -> - IndexState1 = IndexMod:flush_pre_publish_cache( + IndexState1 = rabbit_classic_queue_index_v2:flush_pre_publish_cache( TargetRamCount, IndexState), State#vqstate{index_state = IndexState1}. diff --git a/deps/rabbit/test/backing_queue_SUITE.erl b/deps/rabbit/test/backing_queue_SUITE.erl index d758adf45fc5..ae37a4a366dd 100644 --- a/deps/rabbit/test/backing_queue_SUITE.erl +++ b/deps/rabbit/test/backing_queue_SUITE.erl @@ -67,8 +67,7 @@ groups() -> {backing_queue_tests, [], [ msg_store, msg_store_file_scan, - {backing_queue_v2, [], Common ++ V2Only}, - {backing_queue_v1, [], Common} + {backing_queue_v2, [], Common ++ V2Only} ]} ]. @@ -124,14 +123,6 @@ init_per_group1(backing_queue_tests, Config) -> "Backing queue module not supported by this test group: ~tp~n", [Module])} end; -init_per_group1(backing_queue_v1, Config) -> - ok = rabbit_ct_broker_helpers:rpc(Config, 0, - application, set_env, [rabbit, classic_queue_default_version, 1]), - Config; -init_per_group1(backing_queue_v2, Config) -> - ok = rabbit_ct_broker_helpers:rpc(Config, 0, - application, set_env, [rabbit, classic_queue_default_version, 2]), - Config; init_per_group1(backing_queue_embed_limit_0, Config) -> ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, [rabbit, queue_index_embed_msgs_below, 0]), @@ -176,12 +167,6 @@ end_per_group1(backing_queue_tests, Config) -> rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, teardown_backing_queue_test_group, [Config]); end_per_group1(Group, Config) -when Group =:= backing_queue_v1 -orelse Group =:= backing_queue_v2 -> - ok = rabbit_ct_broker_helpers:rpc(Config, 0, - application, unset_env, [rabbit, classic_queue_default_version]), - Config; -end_per_group1(Group, Config) when Group =:= backing_queue_embed_limit_0 orelse Group =:= backing_queue_embed_limit_1024 -> ok = rabbit_ct_broker_helpers:rpc(Config, 0, @@ -745,10 +730,7 @@ bq_queue_index(Config) -> ?MODULE, bq_queue_index1, [Config]). index_mod() -> - case application:get_env(rabbit, classic_queue_default_version) of - {ok, 1} -> rabbit_queue_index; - {ok, 2} -> rabbit_classic_queue_index_v2 - end. + rabbit_classic_queue_index_v2. bq_queue_index1(_Config) -> init_queue_index(), @@ -761,10 +743,7 @@ bq_queue_index1(_Config) -> SeqIdsC = lists:seq(0, trunc(SegmentSize/2)), SeqIdsD = lists:seq(0, SegmentSize*4), - VerifyReadWithPublishedFun = case IndexMod of - rabbit_queue_index -> fun verify_read_with_published_v1/3; - rabbit_classic_queue_index_v2 -> fun verify_read_with_published_v2/3 - end, + VerifyReadWithPublishedFun = fun verify_read_with_published_v2/3, with_empty_test_queue( fun (Qi0, QName) -> @@ -854,8 +833,7 @@ bq_queue_index1(_Config) -> end), %% d) get messages in all states to a segment, then flush, then do - %% the same again, don't flush and read. CQ v1: this will hit all - %% possibilities in combining the segment with the journal. + %% the same again, don't flush and read. with_empty_test_queue( fun (Qi0, _QName) -> {Qi1, [Seven,Five,Four|_]} = queue_index_publish([0,1,2,4,5,7], @@ -882,8 +860,7 @@ bq_queue_index1(_Config) -> Qi10 end), - %% e) as for (d), but use terminate instead of read, which (CQ v1) will - %% exercise journal_minus_segment, not segment_plus_journal. + %% e) as for (d), but use terminate instead of read. with_empty_test_queue( fun (Qi0, QName) -> {Qi1, _SeqIdsMsgIdsE} = queue_index_publish([0,1,2,4,5,7], @@ -909,15 +886,6 @@ bq_queue_index1(_Config) -> passed. -verify_read_with_published_v1(_Persistent, [], _) -> - ok; -verify_read_with_published_v1(Persistent, - [{MsgId, SeqId, _Location, _Props, Persistent}|Read], - [{SeqId, MsgId}|Published]) -> - verify_read_with_published_v1(Persistent, Read, Published); -verify_read_with_published_v1(_Persistent, _Read, _Published) -> - ko. - %% The v2 index does not store the MsgId unless required. %% We therefore do not check it. verify_read_with_published_v2(_Persistent, [], _) -> diff --git a/deps/rabbit/test/classic_queue_prop_SUITE.erl b/deps/rabbit/test/classic_queue_prop_SUITE.erl index 848f9036bbad..baf103fbc89c 100644 --- a/deps/rabbit/test/classic_queue_prop_SUITE.erl +++ b/deps/rabbit/test/classic_queue_prop_SUITE.erl @@ -23,7 +23,6 @@ -record(cq, { amq = undefined :: amqqueue:amqqueue(), name :: atom(), - version :: 1 | 2, %% We have one queue per way of publishing messages (such as channels). %% We can only confirm the publish order on a per-channel level because @@ -73,19 +72,12 @@ %% Common Test. all() -> - [{group, classic_queue_tests}, {group, classic_queue_regressions}]. + [{group, classic_queue_tests}]. groups() -> [{classic_queue_tests, [], [ % manual%, - classic_queue_v1, classic_queue_v2 - ]}, - {classic_queue_regressions, [], [ - reg_v1_full_recover_only_journal, - reg_v1_no_del_jif, - reg_v1_no_del_idx, - reg_v1_no_del_idx_unclean ]} ]. @@ -136,10 +128,10 @@ instrs_to_manual([Instrs]) -> io:format("~ndo_manual(Config) ->~n~n"), lists:foreach(fun ({init, CQ}) -> - #cq{name=Name, version=Version} = CQ, - io:format(" St0 = #cq{name=~0p, version=~0p,~n" + #cq{name=Name} = CQ, + io:format(" St0 = #cq{name=~0p,~n" " config=minimal_config(Config)},~n~n", - [Name, Version]); + [Name]); ({set, {var,Var}, {call, ?MODULE, cmd_setup_queue, _}}) -> Res = "Res" ++ integer_to_list(Var), PrevSt = "St" ++ integer_to_list(Var - 1), @@ -197,15 +189,6 @@ manual(Config) -> do_manual(Config) -> Config =:= Config. -classic_queue_v1(Config) -> - true = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, do_classic_queue_v1, [Config]). - -do_classic_queue_v1(Config) -> - true = proper:quickcheck(prop_classic_queue_v1(Config), - [{on_output, on_output_fun()}, - {numtests, ?NUM_TESTS}]). - classic_queue_v2(Config) -> true = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, do_classic_queue_v2, [Config]). @@ -225,16 +208,11 @@ on_output_fun() -> %% Properties. -prop_classic_queue_v1(Config) -> - {ok, LimiterPid} = rabbit_limiter:start_link(no_id), - InitialState = #cq{name=?FUNCTION_NAME, version=1, - config=minimal_config(Config), limiter=LimiterPid}, - prop_common(InitialState). - prop_classic_queue_v2(Config) -> {ok, LimiterPid} = rabbit_limiter:start_link(no_id), - InitialState = #cq{name=?FUNCTION_NAME, version=2, - config=minimal_config(Config), limiter=LimiterPid}, + InitialState = #cq{name=?FUNCTION_NAME, + config=minimal_config(Config), + limiter=LimiterPid}, prop_common(InitialState). prop_common(InitialState) -> @@ -343,8 +321,8 @@ next_state(St=#cq{q=Q0, confirmed=Confirmed, uncertain=Uncertain0}, AMQ, {call, St#cq{amq=AMQ, q=Q, restarted=true, crashed=true, uncertain=Uncertain}; next_state(St, _, {call, _, cmd_set_v2_check_crc32, _}) -> St; -next_state(St, _, {call, _, cmd_set_version, [Version]}) -> - St#cq{version=Version}; +next_state(St, _, {call, _, cmd_set_version, _}) -> + St; next_state(St=#cq{q=Q}, Msg, {call, _, cmd_publish_msg, _}) -> IntQ = maps:get(internal, Q, queue:new()), St#cq{q=Q#{internal => queue:in(Msg, IntQ)}}; @@ -530,8 +508,10 @@ postcondition(_, {call, _, Cmd, _}, Q) when element(1, Q) =:= amqqueue; postcondition(_, {call, _, cmd_set_v2_check_crc32, _}, Res) -> Res =:= ok; -postcondition(#cq{amq=AMQ}, {call, _, cmd_set_version, [Version]}, _) -> - do_check_queue_version(AMQ, Version) =:= ok; +postcondition(#cq{amq=AMQ}, {call, _, cmd_set_version, _}, _) -> + %% We cannot use CQv1 anymore so we always + %% expect the queue to use v2. + do_check_queue_version(AMQ, 2) =:= ok; postcondition(_, {call, _, cmd_publish_msg, _}, Msg) -> is_record(Msg, amqp_msg); postcondition(_, {call, _, cmd_purge, _}, Res) -> @@ -698,21 +678,16 @@ crashed_and_previously_received(#cq{crashed=Crashed, received=Received}, Msg) -> %% Helpers. -cmd_setup_queue(St=#cq{name=Name, version=Version}) -> +cmd_setup_queue(St=#cq{name=Name}) -> ?DEBUG("~0p", [St]), IsDurable = true, %% We want to be able to restart the queue process. IsAutoDelete = false, - %% We cannot use args to set the version as the arguments override - %% the policies and we also want to test policy changes. - cmd_set_version(Version), - Args = [ -% {<<"x-queue-version">>, long, Version} - ], + Args = [], QName = rabbit_misc:r(<<"/">>, queue, iolist_to_binary([atom_to_binary(Name, utf8), $_, integer_to_binary(erlang:unique_integer([positive]))])), {new, AMQ} = rabbit_amqqueue:declare(QName, IsDurable, IsAutoDelete, Args, none, <<"acting-user">>), - %% We check that the queue was creating with the right version. - ok = do_check_queue_version(AMQ, Version), + %% We check that the queue was created with the right version. + ok = do_check_queue_version(AMQ, 2), AMQ. cmd_teardown_queue(St=#cq{amq=undefined}) -> @@ -788,7 +763,7 @@ do_check_queue_version(AMQ, Version, N) -> timer:sleep(1), [{backing_queue_status, Status}] = rabbit_amqqueue:info(AMQ, [backing_queue_status]), case proplists:get_value(version, Status) of - Version -> ok; + 2 -> ok; _ -> do_check_queue_version(AMQ, Version, N - 1) end. @@ -1098,243 +1073,6 @@ queue_fold(Fun, Acc0, {R, F}) when is_function(Fun, 2), is_list(R), is_list(F) - queue_fold(Fun, Acc0, Q) -> erlang:error(badarg, [Fun, Acc0, Q]). -%% Regression tests. -%% -%% These tests are hard to reproduce by running the test suite normally -%% because they require a very specific sequence of events. - -reg_v1_full_recover_only_journal(Config) -> - true = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, do_reg_v1_full_recover_only_journal, [Config]). - -do_reg_v1_full_recover_only_journal(Config) -> - - St0 = #cq{name=prop_classic_queue_v1, version=1, - config=minimal_config(Config)}, - - Res1 = cmd_setup_queue(St0), - St3 = St0#cq{amq=Res1}, - - Res4 = cmd_channel_open(St3), - true = postcondition(St3, {call, undefined, cmd_channel_open, [St3]}, Res4), - St7 = next_state(St3, Res4, {call, undefined, cmd_channel_open, [St3]}), - - Res8 = cmd_restart_queue_dirty(St7), - true = postcondition(St7, {call, undefined, cmd_restart_queue_dirty, [St7]}, Res8), - St11 = next_state(St7, Res8, {call, undefined, cmd_restart_queue_dirty, [St7]}), - - Res12 = cmd_channel_publish_many(St11, Res4, 117, 4541, 2, true, undefined), - true = postcondition(St11, {call, undefined, cmd_channel_publish_many, [St11, Res4, 117, 4541, 2, true, undefined]}, Res12), - St14 = next_state(St11, Res12, {call, undefined, cmd_channel_publish_many, [St11, Res4, 117, 4541, 2, true, undefined]}), - - Res15 = cmd_restart_vhost_clean(St14), - true = postcondition(St14, {call, undefined, cmd_restart_vhost_clean, [St14]}, Res15), - St15 = next_state(St14, Res15, {call, undefined, cmd_restart_vhost_clean, [St14]}), - - cmd_teardown_queue(St15), - - true. - -%% The following reg_v1_no_del_* cases test when a classic queue has a -%% published message before an upgrade to 3.10. In that case there is -%% no delivery marker in the v1 queue index. - -%% After upgrade to 3.10 there is a published message in the journal file. -%% Consuming and acknowledging the message should work fine. -reg_v1_no_del_jif(Config) -> - try - true = rabbit_ct_broker_helpers:rpc( - Config, 0, ?MODULE, do_reg_v1_no_del_jif, [Config]) - catch exit:{exception, Reason} -> - exit(Reason) - end. - -do_reg_v1_no_del_jif(Config) -> - St0 = #cq{name=prop_classic_queue_v1, version=1, - config=minimal_config(Config)}, - - Res1 = cmd_setup_queue(St0), - St3 = St0#cq{amq=Res1}, - - {St4, Ch} = cmd(cmd_channel_open, St3, []), - - %% Simulate pre-3.10.0 behaviour by making deliver a noop - ok = meck:new(rabbit_queue_index, [passthrough]), - ok = meck:expect(rabbit_queue_index, deliver, fun(_, State) -> State end), - - {St5, _Res5} = cmd(cmd_channel_publish, St4, [Ch, 4, _Persistent = 2, _NotMandatory = false, _NoExpiration = undefined]), - - %% Enforce syncing journal to disk - %% (Not strictly necessary as vhost restart also triggers a sync) - %% At this point there should be a publish entry in the journal and no segment files - rabbit_amqqueue:pid_of(St5#cq.amq) ! timeout, - - {SyncTime, ok} = timer:tc(fun() -> meck:wait(rabbit_queue_index, sync, '_', 1000) end), - ct:pal("wait for sync took ~p ms", [SyncTime div 1000]), - - %% Simulate RabbitMQ version upgrade by a clean vhost restart - %% (also reset delivery to normal operation) - ok = meck:delete(rabbit_queue_index, deliver, 2), - {St10, _} = cmd(cmd_restart_vhost_clean, St5, []), - - meck:reset(rabbit_queue_index), - - %% Consume the message and acknowledge it - %% The queue index should not crash when finding a pub+ack but no_del in the journal - %% (It used to crash in `action_to_entry/3' with a case_clause) - {St6, _Tag} = cmd(cmd_channel_consume, St10, [Ch]), - receive SomeMsg -> self() ! SomeMsg - after 5000 -> ct:fail(no_message_consumed) - end, - {St7, _Msg = #amqp_msg{}} = cmd(cmd_channel_receive_and_ack, St6, [Ch]), - - %% enforce syncing journal to disk - rabbit_amqqueue:pid_of(St7#cq.amq) ! timeout, - - {SyncTime2, ok} = timer:tc(fun() -> meck:wait(rabbit_queue_index, sync, '_', 1000) end), - ct:pal("wait for sync took ~p ms", [SyncTime2 div 1000]), - - validate_and_teaddown(St7). - -%% After upgrade to 3.10 there is a published message in a segment file. -%% Consuming and acknowledging the message inserts an ack entry in the journal file. -%% A subsequent restart (of the queue/vhost/node) should work fine. -reg_v1_no_del_idx(Config) -> - try - true = rabbit_ct_broker_helpers:rpc( - Config, 0, ?MODULE, do_reg_v1_no_del_idx, [Config]) - catch exit:{exception, Reason} -> - exit(Reason) - end. - -do_reg_v1_no_del_idx(Config) -> - St0 = #cq{name=prop_classic_queue_v1, version=1, - config=minimal_config(Config)}, - - Res1 = cmd_setup_queue(St0), - St3 = St0#cq{amq=Res1}, - - {St4, Ch} = cmd(cmd_channel_open, St3, []), - - %% Simulate pre-3.10.0 behaviour by making deliver a noop - ok = meck:new(rabbit_queue_index, [passthrough]), - ok = meck:expect(rabbit_queue_index, deliver, fun(_, State) -> State end), - - ok = meck:new(rabbit_variable_queue, [passthrough]), - - {St5, _Res5} = cmd(cmd_channel_publish, St4, [Ch, 4, _Persistent = 2, _NotMandatory = false, _NoExpiration = undefined]), - - %% Wait for the queue process to get hibernated - %% handle_pre_hibernate syncs and flushes the journal - %% At this point there should be a publish entry in the segment file and an empty journal - {Time, ok} = timer:tc(fun() -> meck:wait(rabbit_variable_queue, handle_pre_hibernate, '_', 10000) end), - ct:pal("wait for hibernate took ~p ms", [Time div 1000]), - ok = meck:unload(rabbit_variable_queue), - - %% Simulate RabbitMQ version upgrade by a clean vhost restart - %% (also reset delivery to normal operation) - ok = meck:delete(rabbit_queue_index, deliver, 2), - {St10, _} = cmd(cmd_restart_vhost_clean, St5, []), - - %% Consume the message and acknowledge it - {St6, _Tag} = cmd(cmd_channel_consume, St10, [Ch]), - receive SomeMsg -> self() ! SomeMsg - after 5000 -> ct:fail(no_message_consumed) - end, - {St7, _Msg = #amqp_msg{}} = cmd(cmd_channel_receive_and_ack, St6, [Ch]), - - meck:reset(rabbit_queue_index), - - %% enforce syncing journal to disk - %% At this point there should be a publish entry in the segment file and an ack in the journal - rabbit_amqqueue:pid_of(St7#cq.amq) ! timeout, - {SyncTime, ok} = timer:tc(fun() -> meck:wait(rabbit_queue_index, sync, '_', 1000) end), - ct:pal("wait for sync took ~p ms", [SyncTime div 1000]), - - meck:reset(rabbit_queue_index), - - %% Another clean vhost restart - %% The queue index should not crash when finding a pub in a - %% segment, an ack in the journal, but no_del - %% (It used to crash in `segment_plus_journal1/2' with a function_clause) - catch cmd(cmd_restart_vhost_clean, St7, []), - - {ReadTime, ok} = timer:tc(fun() -> meck:wait(rabbit_queue_index, read, '_', 1000) end), - ct:pal("wait for queue read took ~p ms", [ReadTime div 1000]), - - validate_and_teaddown(St7). - -%% After upgrade to 3.10 there is a published message in a segment file. -%% Consuming and acknowledging the message inserts an ack entry in the journal file. -%% The recovery after a subsequent unclean shutdown (of the queue/vhost/node) should work fine. -reg_v1_no_del_idx_unclean(Config) -> - try - true = rabbit_ct_broker_helpers:rpc( - Config, 0, ?MODULE, do_reg_v1_no_del_idx_unclean, [Config]) - catch exit:{exception, Reason} -> - exit(Reason) - end. - -do_reg_v1_no_del_idx_unclean(Config) -> - St0 = #cq{name=prop_classic_queue_v1, version=1, - config=minimal_config(Config)}, - - Res1 = cmd_setup_queue(St0), - St3 = St0#cq{amq=Res1}, - - {St4, Ch} = cmd(cmd_channel_open, St3, []), - - %% Simulate pre-3.10.0 behaviour by making deliver a noop - ok = meck:new(rabbit_queue_index, [passthrough]), - ok = meck:expect(rabbit_queue_index, deliver, fun(_, State) -> State end), - - ok = meck:new(rabbit_variable_queue, [passthrough]), - - {St5, _Res5} = cmd(cmd_channel_publish, St4, [Ch, 4, _Persistent = 2, _NotMandatory = false, _NoExpiration = undefined]), - - %% Wait for the queue process to get hibernated - %% handle_pre_hibernate syncs and flushes the journal - %% At this point there should be a publish entry in the segment file and an empty journal - {Time, ok} = timer:tc(fun() -> meck:wait(rabbit_variable_queue, handle_pre_hibernate, '_', 10000) end), - ct:pal("wait for hibernate took ~p ms", [Time div 1000]), - ok = meck:unload(rabbit_variable_queue), - - %% Simulate RabbitMQ version upgrade by a clean vhost restart - %% (also reset delivery to normal operation) - ok = meck:delete(rabbit_queue_index, deliver, 2), - {St10, _} = cmd(cmd_restart_vhost_clean, St5, []), - - %% Consume the message and acknowledge it - {St6, _Tag} = cmd(cmd_channel_consume, St10, [Ch]), - receive SomeMsg -> self() ! SomeMsg - after 5000 -> ct:fail(no_message_consumed) - end, - meck:reset(rabbit_queue_index), - {St7, _Msg = #amqp_msg{}} = cmd(cmd_channel_receive_and_ack, St6, [Ch]), - - %% (need to ensure that the queue processed the ack before triggering the sync) - {AckTime, ok} = timer:tc(fun() -> meck:wait(rabbit_queue_index, ack, '_', 1000) end), - ct:pal("wait for ack took ~p ms", [AckTime div 1000]), - - %% enforce syncing journal to disk - %% At this point there should be a publish entry in the segment file and an ack in the journal - rabbit_amqqueue:pid_of(St7#cq.amq) ! timeout, - {SyncTime, ok} = timer:tc(fun() -> meck:wait(rabbit_queue_index, sync, '_', 1000) end), - ct:pal("wait for sync took ~p ms", [SyncTime div 1000]), - - meck:reset(rabbit_queue_index), - - %% Recovery after unclean queue shutdown - %% The queue index should not crash when finding a pub in a - %% segment, an ack in the journal, but no_del - %% (It used to crash in `journal_minus_segment1/2' with a function_clause) - {St20, _} = cmd(cmd_restart_queue_dirty, St7, []), - - {RecoverTime, ok} = timer:tc(fun() -> meck:wait(rabbit_queue_index, recover, '_', 1000) end), - ct:pal("wait for queue recover took ~p ms", [RecoverTime div 1000]), - - validate_and_teaddown(St20). - cmd(CmdName, StIn, ExtraArgs) -> Res0 = apply(?MODULE, CmdName, [StIn | ExtraArgs]), true = postcondition(StIn, {call, undefined, CmdName, [StIn | ExtraArgs]}, Res0), diff --git a/deps/rabbit/test/priority_queue_SUITE.erl b/deps/rabbit/test/priority_queue_SUITE.erl index eb7df79378bd..3c34f5adee79 100644 --- a/deps/rabbit/test/priority_queue_SUITE.erl +++ b/deps/rabbit/test/priority_queue_SUITE.erl @@ -392,6 +392,8 @@ info_head_message_timestamp1(_Config) -> PQ:delete_and_terminate(a_whim, BQS6), passed. +%% Because queue version is now ignored, this test is expected +%% to always get a queue version 2. info_backing_queue_version(Config) -> {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), Q1 = <<"info-priority-queue-v1">>, @@ -402,7 +404,7 @@ info_backing_queue_version(Config) -> {<<"x-queue-version">>, byte, 2}]), try {ok, [{backing_queue_status, BQS1}]} = info(Config, Q1, [backing_queue_status]), - 1 = proplists:get_value(version, BQS1), + 2 = proplists:get_value(version, BQS1), {ok, [{backing_queue_status, BQS2}]} = info(Config, Q2, [backing_queue_status]), 2 = proplists:get_value(version, BQS2) after diff --git a/deps/rabbit/test/unicode_SUITE.erl b/deps/rabbit/test/unicode_SUITE.erl index 65088e613961..4f28d1362c24 100644 --- a/deps/rabbit/test/unicode_SUITE.erl +++ b/deps/rabbit/test/unicode_SUITE.erl @@ -17,7 +17,6 @@ all() -> groups() -> [ {queues, [], [ - classic_queue_v1, classic_queue_v2, quorum_queue, stream @@ -57,14 +56,7 @@ init_per_testcase(Testcase, Config) -> end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). -classic_queue_v1(Config) -> - ok = rabbit_ct_broker_helpers:rpc( - Config, 0, application, set_env, [rabbit, classic_queue_default_version, 1]), - ok = queue(Config, ?FUNCTION_NAME, []). - classic_queue_v2(Config) -> - ok = rabbit_ct_broker_helpers:rpc( - Config, 0, application, set_env, [rabbit, classic_queue_default_version, 2]), ok = queue(Config, ?FUNCTION_NAME, []). quorum_queue(Config) -> diff --git a/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl b/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl index d8738b1de580..07c67b857d9c 100644 --- a/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl +++ b/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl @@ -1135,21 +1135,21 @@ queues_test(Config) -> auto_delete => false, exclusive => false, arguments => #{}, - storage_version => 1}, + storage_version => 2}, #{name => <<"foo">>, vhost => <<"/">>, durable => true, auto_delete => false, exclusive => false, arguments => #{}, - storage_version => 1}], Queues), + storage_version => 2}], Queues), assert_item(#{name => <<"foo">>, vhost => <<"/">>, durable => true, auto_delete => false, exclusive => false, arguments => #{}, - storage_version => 1}, Queue), + storage_version => 2}, Queue), http_delete(Config, "/queues/%2F/foo", {group, '2xx'}), http_delete(Config, "/queues/%2F/baz", {group, '2xx'}), @@ -2339,8 +2339,8 @@ queue_pagination_test(Config) -> ?assertEqual(1, maps:get(page, PageOfTwo)), ?assertEqual(2, maps:get(page_size, PageOfTwo)), ?assertEqual(2, maps:get(page_count, PageOfTwo)), - assert_list([#{name => <<"test0">>, vhost => <<"/">>, storage_version => 1}, - #{name => <<"test2_reg">>, vhost => <<"/">>, storage_version => 1} + assert_list([#{name => <<"test0">>, vhost => <<"/">>, storage_version => 2}, + #{name => <<"test2_reg">>, vhost => <<"/">>, storage_version => 2} ], maps:get(items, PageOfTwo)), SortedByName = http_get(Config, "/queues?sort=name&page=1&page_size=2", ?OK), diff --git a/deps/rabbitmq_mqtt/test/shared_SUITE.erl b/deps/rabbitmq_mqtt/test/shared_SUITE.erl index acfadfc270d6..e89d25dfcd33 100644 --- a/deps/rabbitmq_mqtt/test/shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/shared_SUITE.erl @@ -201,11 +201,8 @@ init_per_group(Group, Config0) -> Config1, [{rmq_nodes_count, Nodes}, {rmq_nodename_suffix, Suffix}]), - Config3 = rabbit_ct_helpers:merge_app_env( - Config2, - {rabbit, [{classic_queue_default_version, 2}]}), Config = rabbit_ct_helpers:run_steps( - Config3, + Config2, rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps()), util:maybe_skip_v5(Config). diff --git a/deps/rabbitmq_mqtt/test/v5_SUITE.erl b/deps/rabbitmq_mqtt/test/v5_SUITE.erl index 4ca22953ed58..309e221aabe2 100644 --- a/deps/rabbitmq_mqtt/test/v5_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/v5_SUITE.erl @@ -169,8 +169,7 @@ init_per_group(Group, Config0) -> {rmq_nodename_suffix, Suffix}]), Config2 = rabbit_ct_helpers:merge_app_env( Config1, - {rabbit, [{classic_queue_default_version, 2}, - {quorum_tick_interval, 200}]}), + {rabbit, [{quorum_tick_interval, 200}]}), Config = rabbit_ct_helpers:run_steps( Config2, rabbit_ct_broker_helpers:setup_steps() ++