diff --git a/deps/rabbit/Makefile b/deps/rabbit/Makefile index 192ad3918c1a..d14486bbf054 100644 --- a/deps/rabbit/Makefile +++ b/deps/rabbit/Makefile @@ -249,7 +249,7 @@ define ct_master.erl endef PARALLEL_CT_SET_1_A = unit_rabbit_ssl unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking -PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_filter_prop amqp_filter_sql amqp_filter_sql_unit amqp_dotnet server_named_queue_prefix_prop signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management +PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_filter_prop amqp_filter_sql amqp_filter_sql_unit amqp_dotnet server_named_queue_prefix_prop signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management rabbit_direct PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange cluster_limit cluster_minority default_queue_type_prop term_to_binary_compat_prop topic_permission unicode unit_access_control user_tags_count_limit PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit server_named_queue_prefix @@ -263,7 +263,7 @@ PARALLEL_CT_SET_3_B = list_consumers_sanity_check list_queues_online_and_offline PARALLEL_CT_SET_3_C = cli_forget_cluster_node mc_unit message_size_limit PARALLEL_CT_SET_3_D = metrics mirrored_supervisor proxy_protocol runtime_parameters unit_rabbit_vm unit_stats_and_metrics unit_supervisor2 
unit_vm_memory_monitor -PARALLEL_CT_SET_4_A = clustering_events rabbit_local_random_exchange rabbit_msg_interceptor rabbitmq_4_0_deprecations unit_pg_local unit_plugin_directories unit_plugin_versioning unit_policy_validators unit_priority_queue +PARALLEL_CT_SET_4_A = clustering_events rabbit_local_random_exchange rabbit_msg_interceptor rabbitmq_4_0_deprecations unit_plugin_directories unit_plugin_versioning unit_policy_validators unit_priority_queue PARALLEL_CT_SET_4_B = per_user_connection_tracking per_vhost_connection_limit rabbit_fifo_int unit_default_queue_type PARALLEL_CT_SET_4_C = msg_size_metrics unit_msg_size_metrics per_vhost_msg_store per_vhost_queue_limit priority_queue upgrade_preparation vhost PARALLEL_CT_SET_4_D = per_user_connection_channel_tracking product_info queue_type rabbitmq_queues_cli_integration rabbitmq_streams_cli_integration rabbitmqctl_integration rabbitmqctl_shutdown routing rabbit_amqqueue diff --git a/deps/rabbit/src/pg_local.erl b/deps/rabbit/src/pg_local.erl deleted file mode 100644 index 6397ea69a5c0..000000000000 --- a/deps/rabbit/src/pg_local.erl +++ /dev/null @@ -1,251 +0,0 @@ -%% This file is a copy of pg2.erl from the R13B-3 Erlang/OTP -%% distribution, with the following modifications: -%% -%% 1) Process groups are node-local only. -%% -%% 2) Groups are created/deleted implicitly. -%% -%% 3) 'join' and 'leave' are asynchronous. -%% -%% 4) the type specs of the exported non-callback functions have been -%% extracted into a separate, guarded section, and rewritten in -%% old-style spec syntax, for better compatibility with older -%% versions of Erlang/OTP. The remaining type specs have been -%% removed. - -%% All modifications are (C) 2007-2024 Broadcom. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. - -%% %CopyrightBegin% -%% -%% Copyright Ericsson AB 1997-2009. All Rights Reserved. 
-%% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at https://www.erlang.org/. -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. -%% -%% %CopyrightEnd% -%% --module(pg_local). - --include_lib("kernel/include/logger.hrl"). - --export([join/2, leave/2, get_members/1, in_group/2]). -%% intended for testing only; not part of official API --export([sync/0, clear/0]). --export([start/0, start_link/0, init/1, handle_call/3, handle_cast/2, - handle_info/2, terminate/2]). - -%%---------------------------------------------------------------------------- - --type name() :: term(). - -%%---------------------------------------------------------------------------- - --define(TABLE, pg_local_table). - -%%% -%%% Exported functions -%%% - --spec start_link() -> {'ok', pid()} | {'error', any()}. - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - --spec start() -> {'ok', pid()} | {'error', any()}. - -start() -> - ensure_started(). - --spec join(name(), pid()) -> 'ok'. - -join(Name, Pid) when is_pid(Pid) -> - _ = ensure_started(), - gen_server:cast(?MODULE, {join, Name, Pid}). - --spec leave(name(), pid()) -> 'ok'. - -leave(Name, Pid) when is_pid(Pid) -> - _ = ensure_started(), - gen_server:cast(?MODULE, {leave, Name, Pid}). - --spec get_members(name()) -> [pid()]. - -get_members(Name) -> - _ = ensure_started(), - group_members(Name). - --spec in_group(name(), pid()) -> boolean(). 
- -in_group(Name, Pid) -> - _ = ensure_started(), - %% The join message is a cast and thus can race, but we want to - %% keep it that way to be fast in the common case. - case member_present(Name, Pid) of - true -> true; - false -> sync(), - member_present(Name, Pid) - end. - --spec sync() -> 'ok'. - -sync() -> - _ = ensure_started(), - gen_server:call(?MODULE, sync, infinity). - -clear() -> - _ = ensure_started(), - gen_server:call(?MODULE, clear, infinity). - -%%% -%%% Callback functions from gen_server -%%% - --record(state, {}). - -init([]) -> - ?TABLE = ets:new(?TABLE, [ordered_set, protected, named_table]), - {ok, #state{}}. - -handle_call(sync, _From, S) -> - {reply, ok, S}; - -handle_call(clear, _From, S) -> - ets:delete_all_objects(?TABLE), - {reply, ok, S}; - -handle_call(Request, From, S) -> - ?LOG_WARNING("The pg_local server received an unexpected message:\n" - "handle_call(~tp, ~tp, _)\n", - [Request, From]), - {noreply, S}. - -handle_cast({join, Name, Pid}, S) -> - _ = join_group(Name, Pid), - {noreply, S}; -handle_cast({leave, Name, Pid}, S) -> - leave_group(Name, Pid), - {noreply, S}; -handle_cast(_, S) -> - {noreply, S}. - -handle_info({'DOWN', MonitorRef, process, Pid, _Info}, S) -> - member_died(MonitorRef, Pid), - {noreply, S}; -handle_info(_, S) -> - {noreply, S}. - -terminate(_Reason, _S) -> - true = ets:delete(?TABLE), - ok. - -%%% -%%% Local functions -%%% - -%%% One ETS table, pg_local_table, is used for bookkeeping. The type of the -%%% table is ordered_set, and the fast matching of partially -%%% instantiated keys is used extensively. -%%% -%%% {{ref, Pid}, MonitorRef, Counter} -%%% {{ref, MonitorRef}, Pid} -%%% Each process has one monitor. Counter is incremented when the -%%% Pid joins some group. -%%% {{member, Name, Pid}, _} -%%% Pid is a member of group Name, GroupCounter is incremented when the -%%% Pid joins the group Name. -%%% {{pid, Pid, Name}} -%%% Pid is a member of group Name. 
- -member_died(Ref, Pid) -> - _ = case ets:lookup(?TABLE, {ref, Ref}) of - [{{ref, Ref}, Pid}] -> - leave_all_groups(Pid); - %% in case the key has already been removed - %% we can clean up using the value from the DOWN message - _ -> - leave_all_groups(Pid) - end, - ok. - -leave_all_groups(Pid) -> - Names = member_groups(Pid), - _ = [leave_group(Name, P) || - Name <- Names, - P <- member_in_group(Pid, Name)]. - -join_group(Name, Pid) -> - Ref_Pid = {ref, Pid}, - _ = try ets:update_counter(?TABLE, Ref_Pid, {3, +1}) - catch _:_ -> - Ref = erlang:monitor(process, Pid), - true = ets:insert(?TABLE, {Ref_Pid, Ref, 1}), - true = ets:insert(?TABLE, {{ref, Ref}, Pid}) - end, - Member_Name_Pid = {member, Name, Pid}, - try _ = ets:update_counter(?TABLE, Member_Name_Pid, {2, +1}) - catch _:_ -> - true = ets:insert(?TABLE, {Member_Name_Pid, 1}), - true = ets:insert(?TABLE, {{pid, Pid, Name}}) - end. - -leave_group(Name, Pid) -> - Member_Name_Pid = {member, Name, Pid}, - try ets:update_counter(?TABLE, Member_Name_Pid, {2, -1}) of - N -> - if - N =:= 0 -> - true = ets:delete(?TABLE, {pid, Pid, Name}), - true = ets:delete(?TABLE, Member_Name_Pid); - true -> - ok - end, - Ref_Pid = {ref, Pid}, - case ets:update_counter(?TABLE, Ref_Pid, {3, -1}) of - 0 -> - [{Ref_Pid,Ref,0}] = ets:lookup(?TABLE, Ref_Pid), - true = ets:delete(?TABLE, {ref, Ref}), - true = ets:delete(?TABLE, Ref_Pid), - true = erlang:demonitor(Ref, [flush]), - ok; - _ -> - ok - end - catch _:_ -> - ok - end. - -group_members(Name) -> - [P || - [P, N] <- ets:match(?TABLE, {{member, Name, '$1'},'$2'}), - _ <- lists:seq(1, N)]. - -member_in_group(Pid, Name) -> - [{{member, Name, Pid}, N}] = ets:lookup(?TABLE, {member, Name, Pid}), - lists:duplicate(N, Pid). - -member_present(Name, Pid) -> - case ets:lookup(?TABLE, {member, Name, Pid}) of - [_] -> true; - [] -> false - end. - -member_groups(Pid) -> - [Name || [Name] <- ets:match(?TABLE, {{pid, Pid, '$1'}})]. 
- -ensure_started() -> - case whereis(?MODULE) of - undefined -> - C = {pg_local, {?MODULE, start_link, []}, permanent, - 16#ffffffff, worker, [?MODULE]}, - supervisor:start_child(kernel_safe_sup, C); - PgLocalPid -> - {ok, PgLocalPid} - end. diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl index e2907f8d4965..a35dbd1a7042 100644 --- a/deps/rabbit/src/rabbit.erl +++ b/deps/rabbit/src/rabbit.erl @@ -30,7 +30,10 @@ base_product_version/0, motd_file/0, motd/0, - pg_local_scope/1]). + pg_local_scope/1, + pg_scope_amqp091_channel/0, + pg_scope_amqp091_connection/0, + pg_scope_non_amqp_connection/0]). %% For CLI, testing and mgmt-agent. -export([set_log_level/1, log_locations/0, config_files/0]). -export([is_booted/1, is_booted/0, is_booting/1, is_booting/0]). @@ -39,7 +42,11 @@ %% Boot steps. -export([update_cluster_tags/0, maybe_insert_default_data/0, boot_delegate/0, recover/0, pg_local_amqp_session/0, - pg_local_amqp_connection/0, prevent_startup_if_node_was_reset/0]). + pg_local_amqp_connection/0, + pg_local_amqp091_channel/0, + pg_local_amqp091_connection/0, + pg_local_non_amqp_connection/0, + prevent_startup_if_node_was_reset/0]). -rabbit_boot_step({pre_boot, [{description, "rabbit boot start"}]}). @@ -286,11 +293,29 @@ {enables, core_initialized}]}). -rabbit_boot_step({pg_local_amqp_connection, - [{description, "local-only pg scope for AMQP connections"}, + [{description, "local-only pg scope for AMQP 1.0 connections"}, {mfa, {rabbit, pg_local_amqp_connection, []}}, {requires, kernel_ready}, {enables, core_initialized}]}). +-rabbit_boot_step({pg_local_amqp091_channel, + [{description, "local-only pg scope for AMQP 0-9-1 channels"}, + {mfa, {rabbit, pg_local_amqp091_channel, []}}, + {requires, kernel_ready}, + {enables, core_initialized}]}). 
+ +-rabbit_boot_step({pg_local_amqp091_connection, + [{description, "local-only pg scope for AMQP 0-9-1 connections"}, + {mfa, {rabbit, pg_local_amqp091_connection, []}}, + {requires, kernel_ready}, + {enables, core_initialized}]}). + +-rabbit_boot_step({pg_local_non_amqp_connection, + [{description, "local-only pg scope for non-AMQP connections"}, + {mfa, {rabbit, pg_local_non_amqp_connection, []}}, + {requires, kernel_ready}, + {enables, core_initialized}]}). + %%--------------------------------------------------------------------------- -include_lib("rabbit_common/include/rabbit.hrl"). @@ -1122,9 +1147,33 @@ pg_local_amqp_connection() -> PgScope = pg_local_scope(amqp_connection), rabbit_sup:start_child(pg_amqp_connection, pg, [PgScope]). +pg_local_amqp091_channel() -> + PgScope = pg_local_scope(amqp091_channel), + persistent_term:put(pg_scope_amqp091_channel, PgScope), + rabbit_sup:start_child(pg_amqp091_channel, pg, [PgScope]). + +pg_local_amqp091_connection() -> + PgScope = pg_local_scope(amqp091_connection), + persistent_term:put(pg_scope_amqp091_connection, PgScope), + rabbit_sup:start_child(pg_amqp091_connection, pg, [PgScope]). + +pg_local_non_amqp_connection() -> + PgScope = pg_local_scope(non_amqp_connection), + persistent_term:put(pg_scope_non_amqp_connection, PgScope), + rabbit_sup:start_child(pg_non_amqp_connection, pg, [PgScope]). + pg_local_scope(Prefix) -> list_to_atom(io_lib:format("~s_~s", [Prefix, node()])). +pg_scope_amqp091_channel() -> + persistent_term:get(pg_scope_amqp091_channel). + +pg_scope_amqp091_connection() -> + persistent_term:get(pg_scope_amqp091_connection). + +pg_scope_non_amqp_connection() -> + persistent_term:get(pg_scope_non_amqp_connection). + -spec update_cluster_tags() -> 'ok'. 
update_cluster_tags() -> diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 858040f068ed..9b1905f9a017 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -307,7 +307,9 @@ list() -> -spec list_local() -> [pid()]. list_local() -> - pg_local:get_members(rabbit_channels). + try pg:which_groups(pg_scope()) + catch error:badarg -> [] + end. -spec info_keys() -> rabbit_types:info_keys(). @@ -425,6 +427,10 @@ update_user_state(Pid, UserState) when is_pid(Pid) -> %%--------------------------------------------------------------------------- +-spec pg_scope() -> atom(). +pg_scope() -> + rabbit:pg_scope_amqp091_channel(). + init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, User, VHost, Capabilities, CollectorPid, LimiterPid, AmqpParams]) -> process_flag(trap_exit, true), @@ -433,7 +439,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, User, VHost, ?LG_PROCESS_TYPE(channel), ?store_proc_name({ConnName, Channel}), - ok = pg_local:join(rabbit_channels, self()), + ok = pg:join(pg_scope(), self(), self()), Flow = case rabbit_misc:get_env(rabbit, classic_queue_flow_control, true) of true -> flow; false -> noflow @@ -732,7 +738,7 @@ terminate(_Reason, queue_states = QueueCtxs}) -> rabbit_queue_type:close(QueueCtxs), {_Res, _State1} = notify_queues(State), - pg_local:leave(rabbit_channels, self()), + pg:leave(pg_scope(), self(), self()), rabbit_event:if_enabled(State, #ch.stats_timer, fun() -> emit_stats(State) end), [delete_stats(Tag) || {Tag, _} <- get()], diff --git a/deps/rabbit/src/rabbit_direct.erl b/deps/rabbit/src/rabbit_direct.erl index b5363b9bc6c2..3563495013dc 100644 --- a/deps/rabbit/src/rabbit_direct.erl +++ b/deps/rabbit/src/rabbit_direct.erl @@ -20,14 +20,18 @@ -include_lib("rabbit_common/include/rabbit_misc.hrl"). -include_lib("kernel/include/logger.hrl"). +-define(TABLE, ?MODULE). 
+ %%---------------------------------------------------------------------------- -spec boot() -> 'ok'. -boot() -> rabbit_sup:start_supervisor_child( - rabbit_direct_client_sup, rabbit_client_sup, - [{local, rabbit_direct_client_sup}, - {rabbit_channel_sup, start_link, []}]). +boot() -> + ?TABLE = ets:new(?TABLE, [set, public, named_table]), + rabbit_sup:start_supervisor_child( + rabbit_direct_client_sup, rabbit_client_sup, + [{local, rabbit_direct_client_sup}, + {rabbit_channel_sup, start_link, []}]). -spec force_event_refresh(reference()) -> 'ok'. @@ -38,7 +42,7 @@ force_event_refresh(Ref) -> -spec list_local() -> [pid()]. list_local() -> - pg_local:get_members(rabbit_direct). + [Pid || {Pid} <- ets:tab2list(?TABLE)]. -spec list() -> [pid()]. @@ -186,7 +190,7 @@ connect1(User = #user{username = Username}, VHost, Pid, Infos) -> AuthzContext = proplists:get_value(variable_map, Infos, #{}), try rabbit_access_control:check_vhost_access(User, VHost, {ip, PeerHost}, AuthzContext) of - ok -> ok = pg_local:join(rabbit_direct, Pid), + ok -> ets:insert(?TABLE, {Pid}), rabbit_core_metrics:connection_created(Pid, Infos), rabbit_event:notify(connection_created, Infos), _ = rabbit_alarm:register( @@ -246,7 +250,7 @@ start_channel(Number, ClientChannelPid, ConnPid, ConnName, -spec disconnect(pid(), rabbit_event:event_props()) -> 'ok'. disconnect(Pid, Infos) -> - pg_local:leave(rabbit_direct, Pid), + ets:delete(?TABLE, Pid), rabbit_core_metrics:connection_closed(Pid), rabbit_event:notify(connection_closed, Infos). diff --git a/deps/rabbit/src/rabbit_networking.erl b/deps/rabbit/src/rabbit_networking.erl index a03293dc46de..dc8813357272 100644 --- a/deps/rabbit/src/rabbit_networking.erl +++ b/deps/rabbit/src/rabbit_networking.erl @@ -468,13 +468,18 @@ node_client_listeners(Node) -> end, Xs) end. +pg_scope_amqp091_connection() -> + rabbit:pg_scope_amqp091_connection(). + -spec register_connection(pid()) -> ok. -register_connection(Pid) -> pg_local:join(rabbit_connections, Pid). 
+register_connection(Pid) -> + pg:join(pg_scope_amqp091_connection(), Pid, Pid). -spec unregister_connection(pid()) -> ok. -unregister_connection(Pid) -> pg_local:leave(rabbit_connections, Pid). +unregister_connection(Pid) -> + _ = pg:leave(pg_scope_amqp091_connection(), Pid, Pid), ok. -spec connections() -> [rabbit_types:connection()]. connections() -> @@ -483,17 +488,17 @@ connections() -> -spec local_connections() -> [rabbit_types:connection()]. local_connections() -> - Amqp091Pids = pg_local:get_members(rabbit_connections), + Amqp091Pids = try pg:which_groups(pg_scope_amqp091_connection()) catch error:badarg -> [] end, Amqp10Pids = rabbit_amqp1_0:list_local(), Amqp10Pids ++ Amqp091Pids. -spec register_non_amqp_connection(pid()) -> ok. -register_non_amqp_connection(Pid) -> pg_local:join(rabbit_non_amqp_connections, Pid). +register_non_amqp_connection(Pid) -> pg:join(rabbit:pg_scope_non_amqp_connection(), Pid, Pid). -spec unregister_non_amqp_connection(pid()) -> ok. -unregister_non_amqp_connection(Pid) -> pg_local:leave(rabbit_non_amqp_connections, Pid). +unregister_non_amqp_connection(Pid) -> _ = pg:leave(rabbit:pg_scope_non_amqp_connection(), Pid, Pid), ok. -spec non_amqp_connections() -> [rabbit_types:connection()]. @@ -503,7 +508,7 @@ non_amqp_connections() -> -spec local_non_amqp_connections() -> [rabbit_types:connection()]. local_non_amqp_connections() -> - pg_local:get_members(rabbit_non_amqp_connections). + try pg:which_groups(rabbit:pg_scope_non_amqp_connection()) catch error:badarg -> [] end. -spec connection_info(rabbit_types:connection(), rabbit_types:info_keys()) -> rabbit_types:infos(). diff --git a/deps/rabbit/src/rabbit_volatile_queue.erl b/deps/rabbit/src/rabbit_volatile_queue.erl index ccb176a18785..3e6bc09726be 100644 --- a/deps/rabbit/src/rabbit_volatile_queue.erl +++ b/deps/rabbit/src/rabbit_volatile_queue.erl @@ -208,7 +208,7 @@ local_call(Pid, Request) -> is_local(Pid) -> rabbit_amqp_session:is_local(Pid) orelse - pg_local:in_group(rabbit_channels, Pid).
+ pg:get_local_members(rabbit:pg_scope_amqp091_channel(), Pid) =/= []. handle_event(QName, {deliver, Msg}, #?STATE{name = QName, ctag = Ctag, diff --git a/deps/rabbit/test/proxy_protocol_SUITE.erl b/deps/rabbit/test/proxy_protocol_SUITE.erl index 52b62630c625..8eacbb69f850 100644 --- a/deps/rabbit/test/proxy_protocol_SUITE.erl +++ b/deps/rabbit/test/proxy_protocol_SUITE.erl @@ -110,8 +110,14 @@ proxy_protocol_v2_local(Config) -> ok. connection_name() -> - ?awaitMatch([_], pg_local:get_members(rabbit_connections), 30000), - [Pid] = pg_local:get_members(rabbit_connections), + Scope = rabbit:pg_scope_amqp091_connection(), + GetGroups = fun() -> + try pg:which_groups(Scope) + catch error:badarg -> [] + end + end, + ?awaitMatch([_], GetGroups(), 30000), + [Pid] = GetGroups(), {dictionary, Dict} = process_info(Pid, dictionary), {process_name, {rabbit_reader, ConnectionName}} = lists:keyfind(process_name, 1, Dict), ConnectionName. @@ -119,6 +125,10 @@ connection_name() -> wait_for_connection_close(Config) -> ?awaitMatch( [], - rabbit_ct_broker_helpers:rpc( - Config, 0, pg_local, get_members, [rabbit_connnections]), + begin + Scope = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit, pg_scope_amqp091_connection, []), + try rabbit_ct_broker_helpers:rpc(Config, 0, pg, which_groups, [Scope]) + catch error:badarg -> [] + end + end, 30000). 
diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 921238319aee..948486094d1b 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -2983,8 +2983,17 @@ cleanup_queue_state_on_channel_after_publish(Config) -> [NCh1, NCh2] = rpc:call(Server, rabbit_channel, list, []) -- ChsBefore, %% Check the channel state contains the state for the quorum queue on %% channel 1 and 2 - wait_for_cleanup(Server, NCh1, 0), - wait_for_cleanup(Server, NCh2, 1), + %% Note: pg:get_local_members doesn't guarantee order, so we need to identify + %% which channel has queue state + {ChWithoutState, ChWithState} = case length(rpc:call(Server, + rabbit_channel, + list_queue_states, + [NCh1])) of + 0 -> {NCh1, NCh2}; + 1 -> {NCh2, NCh1} + end, + wait_for_cleanup(Server, ChWithoutState, 0), + wait_for_cleanup(Server, ChWithState, 1), %% then delete the queue and wait for the process to terminate ?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})), @@ -2994,8 +3003,8 @@ cleanup_queue_state_on_channel_after_publish(Config) -> [?SUPNAME])) end, 30000), %% Check that all queue states have been cleaned - wait_for_cleanup(Server, NCh2, 0), - wait_for_cleanup(Server, NCh1, 0). + wait_for_cleanup(Server, ChWithState, 0), + wait_for_cleanup(Server, ChWithoutState, 0). cleanup_queue_state_on_channel_after_subscribe(Config) -> %% Declare/delete the queue and publish in one channel, while consuming on a diff --git a/deps/rabbit/test/rabbit_direct_SUITE.erl b/deps/rabbit/test/rabbit_direct_SUITE.erl new file mode 100644 index 000000000000..0813f5659ad4 --- /dev/null +++ b/deps/rabbit/test/rabbit_direct_SUITE.erl @@ -0,0 +1,81 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2026 Broadcom. 
All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(rabbit_direct_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-export([all/0, groups/0]). +-export([init_per_suite/1, end_per_suite/1, + init_per_group/2, end_per_group/2, + init_per_testcase/2, end_per_testcase/2]). +-export([direct_connection_registered/1]). + +all() -> + [{group, tests}]. + +groups() -> + [{tests, [], [direct_connection_registered]}]. + +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(_, Config) -> + Config. + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + Config1 = rabbit_ct_helpers:set_config(Config, [{rmq_nodename_suffix, Testcase}]), + rabbit_ct_helpers:run_setup_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_testcase(Testcase, Config) -> + Config1 = rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase). 
+ +%% ------------------------------------------------------------------- + +direct_connection_registered(Config) -> + Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + BeforeLocal = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_direct, list_local, []), + BeforeList = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_direct, list, []), + ?assertEqual([], BeforeLocal), + ?assertEqual([], BeforeList), + + Params = #amqp_params_direct{node = Node, + virtual_host = <<"/">>, + username = <<"guest">>, + password = <<"guest">>}, + {ok, Conn} = amqp_connection:start(Params), + true = is_pid(Conn), + + AfterLocal = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_direct, list_local, []), + AfterList = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_direct, list, []), + ?assertEqual([Conn], AfterLocal), + ?assertEqual([Conn], AfterList), + + ok = amqp_connection:close(Conn), + + FinalLocal = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_direct, list_local, []), + FinalList = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_direct, list, []), + ?assertEqual([], FinalLocal), + ?assertEqual([], FinalList), + ok. diff --git a/deps/rabbit/test/unit_pg_local_SUITE.erl b/deps/rabbit/test/unit_pg_local_SUITE.erl deleted file mode 100644 index 2b91bbf8308b..000000000000 --- a/deps/rabbit/test/unit_pg_local_SUITE.erl +++ /dev/null @@ -1,102 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. -%% - --module(unit_pg_local_SUITE). - --include_lib("eunit/include/eunit.hrl"). - --compile(export_all). - -all() -> - [ - {group, sequential_tests} - ]. 
- -groups() -> - [ - {sequential_tests, [], [ - pg_local, - pg_local_with_unexpected_deaths1, - pg_local_with_unexpected_deaths2 - ]} - ]. - - -pg_local(_Config) -> - [P, Q] = [spawn(fun () -> receive X -> X end end) || _ <- lists:seq(0, 1)], - check_pg_local(ok, [], []), - %% P joins group a, then b, then a again - check_pg_local(pg_local:join(a, P), [P], []), - check_pg_local(pg_local:join(b, P), [P], [P]), - check_pg_local(pg_local:join(a, P), [P, P], [P]), - %% Q joins group a, then b, then b again - check_pg_local(pg_local:join(a, Q), [P, P, Q], [P]), - check_pg_local(pg_local:join(b, Q), [P, P, Q], [P, Q]), - check_pg_local(pg_local:join(b, Q), [P, P, Q], [P, Q, Q]), - %% P leaves groups a and a - check_pg_local(pg_local:leave(a, P), [P, Q], [P, Q, Q]), - check_pg_local(pg_local:leave(b, P), [P, Q], [Q, Q]), - %% leave/2 is idempotent - check_pg_local(pg_local:leave(a, P), [Q], [Q, Q]), - check_pg_local(pg_local:leave(a, P), [Q], [Q, Q]), - %% clean up all processes - [begin X ! done, - Ref = erlang:monitor(process, X), - receive {'DOWN', Ref, process, X, _Info} -> ok end - end || X <- [P, Q]], - %% ensure the groups are empty - check_pg_local(ok, [], []), - passed. - -pg_local_with_unexpected_deaths1(_Config) -> - [A, B] = [spawn(fun () -> receive X -> X end end) || _ <- lists:seq(0, 1)], - check_pg_local(ok, [], []), - %% A joins groups a and b - check_pg_local(pg_local:join(a, A), [A], []), - check_pg_local(pg_local:join(b, A), [A], [A]), - %% B joins group b - check_pg_local(pg_local:join(b, B), [A], [A, B]), - - [begin erlang:exit(X, sleep_now_in_a_fire), - Ref = erlang:monitor(process, X), - receive {'DOWN', Ref, process, X, _Info} -> ok end - end || X <- [A, B]], - %% ensure the groups are empty - check_pg_local(ok, [], []), - ?assertNot(erlang:is_process_alive(A)), - ?assertNot(erlang:is_process_alive(B)), - - passed. 
- -pg_local_with_unexpected_deaths2(_Config) -> - [A, B] = [spawn(fun () -> receive X -> X end end) || _ <- lists:seq(0, 1)], - check_pg_local(ok, [], []), - %% A joins groups a and b - check_pg_local(pg_local:join(a, A), [A], []), - check_pg_local(pg_local:join(b, A), [A], [A]), - %% B joins group b - check_pg_local(pg_local:join(b, B), [A], [A, B]), - - %% something else yanks a record (or all of them) from the pg_local - %% bookkeeping table - ok = pg_local:clear(), - - [begin erlang:exit(X, sleep_now_in_a_fire), - Ref = erlang:monitor(process, X), - receive {'DOWN', Ref, process, X, _Info} -> ok end - end || X <- [A, B]], - %% ensure the groups are empty - check_pg_local(ok, [], []), - ?assertNot(erlang:is_process_alive(A)), - ?assertNot(erlang:is_process_alive(B)), - - passed. - -check_pg_local(ok, APids, BPids) -> - ok = pg_local:sync(), - ?assertEqual([true, true], [lists:sort(Pids) == lists:sort(pg_local:get_members(Key)) || - {Key, Pids} <- [{a, APids}, {b, BPids}]]). diff --git a/deps/rabbitmq_stream/src/rabbit_stream.erl b/deps/rabbitmq_stream/src/rabbit_stream.erl index e01774c62ac4..71f643727d57 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream.erl @@ -22,8 +22,7 @@ host/0, tls_host/0, port/0, - tls_port/0, - kill_connection/1]). + tls_port/0]). -export([stop/1]). -export([emit_connection_info_local/3, emit_connection_info_all/4, @@ -132,20 +131,6 @@ tls_port_from_listener() -> stop(_State) -> ok. -kill_connection(ConnectionName) -> - ConnectionNameBin = rabbit_data_coercion:to_binary(ConnectionName), - lists:foreach(fun(ConnectionPid) -> - ConnectionPid ! {infos, self()}, - receive - {ConnectionPid, - #{<<"connection_name">> := ConnectionNameBin}} -> - exit(ConnectionPid, kill); - {ConnectionPid, _ClientProperties} -> ok - after 1000 -> ok - end - end, - pg_local:get_members(rabbit_stream_connections)). 
- emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) -> Pids = [spawn_link(Node, diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 888c0add7fda..d296a42ea374 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -505,10 +505,6 @@ transition_to_opened(Transport, Configuration, NewConnection, NewConnectionState) -> - % TODO remove registration to rabbit_stream_connections - % just meant to be able to close the connection remotely - % should be possible once the connections are available in ctl list_connections - pg_local:join(rabbit_stream_connections, self()), Connection1 = rabbit_event:init_stats_timer(NewConnection, #stream_connection.stats_timer), diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java index 016da1f59789..ed270cad6770 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java @@ -429,6 +429,7 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception { @Test void consumerReattachesToOtherReplicaWhenReplicaGoesAway() throws Exception { + LOGGER.info("Stream name is {}", stream); executorService = Executors.newCachedThreadPool(); Client metadataClient = cf.get(new Client.ClientParameters().port(streamPortNode1())); Map metadata = metadataClient.metadata(stream); @@ -514,42 +515,64 @@ void consumerReattachesToOtherReplicaWhenReplicaGoesAway() throws Exception { CountDownLatch reconnectionLatch = new CountDownLatch(1); AtomicReference shutdownListenerReference = new AtomicReference<>(); + Runnable resubscribe = + () -> { + AtomicInteger newReplicaPort = new AtomicInteger(-1); 
+ waitAtMost( + Duration.ofSeconds(5), + () -> { + try { + Client.StreamMetadata m = metadataClient.metadata(stream).get(stream); + newReplicaPort.set(m.getReplicas().get(0).getPort()); + LOGGER.info("Metadata: {}", m); + return true; + } catch (Exception e) { + return false; + } + }); + LOGGER.info("Replica port is {}", newReplicaPort); + + Client newConsumer = + cf.get( + new Client.ClientParameters() + .port(newReplicaPort.get()) + .shutdownListener(shutdownListenerReference.get()) + .chunkListener(credit()) + .messageListener(messageListener)); + + LOGGER.info("Subscribing..."); + newConsumer.subscribe( + (byte) 1, stream, OffsetSpecification.offset(lastProcessedOffset.get() + 1), 10); + LOGGER.info("Subscribed"); + + generation.incrementAndGet(); + reconnectionLatch.countDown(); + LOGGER.info("Shutdown listener done"); + }; Client.ShutdownListener shutdownListener = shutdownContext -> { + LOGGER.info("Shutdown reason: {}", shutdownContext.getShutdownReason()); if (shutdownContext.getShutdownReason() == Client.ShutdownContext.ShutdownReason.UNKNOWN) { // avoid long-running task in the IO thread executorService.submit( () -> { - AtomicInteger newReplicaPort = new AtomicInteger(-1); - waitAtMost( - Duration.ofSeconds(5), - () -> { - try { - Client.StreamMetadata m = metadataClient.metadata(stream).get(stream); - newReplicaPort.set(m.getReplicas().get(0).getPort()); - return true; - } catch (Exception e) { - return false; - } - }); - - Client newConsumer = - cf.get( - new Client.ClientParameters() - .port(newReplicaPort.get()) - .shutdownListener(shutdownListenerReference.get()) - .chunkListener(credit()) - .messageListener(messageListener)); - - newConsumer.subscribe( - (byte) 1, - stream, - OffsetSpecification.offset(lastProcessedOffset.get() + 1), - 10); - - generation.incrementAndGet(); - reconnectionLatch.countDown(); + int attempts = 0; + while (attempts < 3) { + try { + resubscribe.run(); + break; + } catch (RuntimeException e) { + LOGGER.warn("Error while 
re-subscribing: {}", e.getMessage()); + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + break; + } + attempts++; + } + } }); } };