From f37be52ca2d4b4dac9635b5382aff4536cd54cc4 Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Mon, 27 Oct 2025 15:56:04 +0100 Subject: [PATCH 1/6] Remove rabbit_stream:kill_connection/1 This function was not called from anywhere and was the only place that expected stream connections to be registered in pg_local. --- deps/rabbitmq_stream/src/rabbit_stream.erl | 17 +---------------- .../src/rabbit_stream_reader.erl | 4 ---- 2 files changed, 1 insertion(+), 20 deletions(-) diff --git a/deps/rabbitmq_stream/src/rabbit_stream.erl b/deps/rabbitmq_stream/src/rabbit_stream.erl index e01774c62ac4..71f643727d57 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream.erl @@ -22,8 +22,7 @@ host/0, tls_host/0, port/0, - tls_port/0, - kill_connection/1]). + tls_port/0]). -export([stop/1]). -export([emit_connection_info_local/3, emit_connection_info_all/4, @@ -132,20 +131,6 @@ tls_port_from_listener() -> stop(_State) -> ok. -kill_connection(ConnectionName) -> - ConnectionNameBin = rabbit_data_coercion:to_binary(ConnectionName), - lists:foreach(fun(ConnectionPid) -> - ConnectionPid ! {infos, self()}, - receive - {ConnectionPid, - #{<<"connection_name">> := ConnectionNameBin}} -> - exit(ConnectionPid, kill); - {ConnectionPid, _ClientProperties} -> ok - after 1000 -> ok - end - end, - pg_local:get_members(rabbit_stream_connections)). - emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) -> Pids = [spawn_link(Node, diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 888c0add7fda..d296a42ea374 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -505,10 +505,6 @@ transition_to_opened(Transport, Configuration, NewConnection, NewConnectionState) -> - % TODO remove registration to rabbit_stream_connections - % just meant to be able to close the connection remotely - % should be possible once the connections are available in ctl list_connections - pg_local:join(rabbit_stream_connections, self()), Connection1 = rabbit_event:init_stats_timer(NewConnection, #stream_connection.stats_timer), From add71f3401745327feb52fdf3055c731670923b3 Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Thu, 30 Apr 2026 15:08:08 +0200 Subject: [PATCH 2/6] Switch from pg_local to pg pg_local is based on the pg2 module, which got removed from Erlang/OTP years ago. It was replaced by the more efficient pg module, so use it directly for AMQP 0-9-1 channels, TCP and non-AMQP connection tracking, and related tests. We use node-local scopes and single-item groups so that (de)registration is fast, but we can list all connections and channels by listing all groups in the scope. rabbit_direct still uses pg_local for one more commit; the next commit moves it to a local ETS table because pg cannot represent those peers in all deployment topologies (see that commit message). --- deps/rabbit/src/rabbit.erl | 55 +++++++++++++++++++++-- deps/rabbit/src/rabbit_channel.erl | 12 +++-- deps/rabbit/src/rabbit_networking.erl | 17 ++++--- deps/rabbit/src/rabbit_volatile_queue.erl | 2 +- deps/rabbit/test/proxy_protocol_SUITE.erl | 18 ++++++-- deps/rabbit/test/quorum_queue_SUITE.erl | 17 +++++-- 6 files changed, 100 insertions(+), 21 deletions(-) diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl index e2907f8d4965..a35dbd1a7042 100644 --- a/deps/rabbit/src/rabbit.erl +++ b/deps/rabbit/src/rabbit.erl @@ -30,7 +30,10 @@ base_product_version/0, motd_file/0, motd/0, - pg_local_scope/1]). + pg_local_scope/1, + pg_scope_amqp091_channel/0, + pg_scope_amqp091_connection/0, + pg_scope_non_amqp_connection/0]). %% For CLI, testing and mgmt-agent. -export([set_log_level/1, log_locations/0, config_files/0]). -export([is_booted/1, is_booted/0, is_booting/1, is_booting/0]). @@ -39,7 +42,11 @@ %% Boot steps. -export([update_cluster_tags/0, maybe_insert_default_data/0, boot_delegate/0, recover/0, pg_local_amqp_session/0, - pg_local_amqp_connection/0, prevent_startup_if_node_was_reset/0]). + pg_local_amqp_connection/0, + pg_local_amqp091_channel/0, + pg_local_amqp091_connection/0, + pg_local_non_amqp_connection/0, + prevent_startup_if_node_was_reset/0]). -rabbit_boot_step({pre_boot, [{description, "rabbit boot start"}]}). @@ -286,11 +293,29 @@ {enables, core_initialized}]}). -rabbit_boot_step({pg_local_amqp_connection, - [{description, "local-only pg scope for AMQP connections"}, + [{description, "local-only pg scope for AMQP 1.0 connections"}, {mfa, {rabbit, pg_local_amqp_connection, []}}, {requires, kernel_ready}, {enables, core_initialized}]}). +-rabbit_boot_step({pg_local_amqp091_channel, + [{description, "local-only pg scope for AMQP 0-9-1 channels"}, + {mfa, {rabbit, pg_local_amqp091_channel, []}}, + {requires, kernel_ready}, + {enables, core_initialized}]}). + +-rabbit_boot_step({pg_local_amqp091_connection, + [{description, "local-only pg scope for AMQP 0-9-1 connections"}, + {mfa, {rabbit, pg_local_amqp091_connection, []}}, + {requires, kernel_ready}, + {enables, core_initialized}]}). + +-rabbit_boot_step({pg_local_non_amqp_connection, + [{description, "local-only pg scope for non-AMQP connections"}, + {mfa, {rabbit, pg_local_non_amqp_connection, []}}, + {requires, kernel_ready}, + {enables, core_initialized}]}). + %%--------------------------------------------------------------------------- -include_lib("rabbit_common/include/rabbit.hrl"). @@ -1122,9 +1147,33 @@ pg_local_amqp_connection() -> PgScope = pg_local_scope(amqp_connection), rabbit_sup:start_child(pg_amqp_connection, pg, [PgScope]). +pg_local_amqp091_channel() -> + PgScope = pg_local_scope(amqp091_channel), + persistent_term:put(pg_scope_amqp091_channel, PgScope), + rabbit_sup:start_child(pg_amqp091_channel, pg, [PgScope]). + +pg_local_amqp091_connection() -> + PgScope = pg_local_scope(amqp091_connection), + persistent_term:put(pg_scope_amqp091_connection, PgScope), + rabbit_sup:start_child(pg_amqp091_connection, pg, [PgScope]). + +pg_local_non_amqp_connection() -> + PgScope = pg_local_scope(non_amqp_connection), + persistent_term:put(pg_scope_non_amqp_connection, PgScope), + rabbit_sup:start_child(pg_non_amqp_connection, pg, [PgScope]). + pg_local_scope(Prefix) -> list_to_atom(io_lib:format("~s_~s", [Prefix, node()])). +pg_scope_amqp091_channel() -> + persistent_term:get(pg_scope_amqp091_channel). + +pg_scope_amqp091_connection() -> + persistent_term:get(pg_scope_amqp091_connection). + +pg_scope_non_amqp_connection() -> + persistent_term:get(pg_scope_non_amqp_connection). + -spec update_cluster_tags() -> 'ok'. update_cluster_tags() -> diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 858040f068ed..9b1905f9a017 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -307,7 +307,9 @@ list() -> -spec list_local() -> [pid()]. list_local() -> - pg_local:get_members(rabbit_channels). + try pg:which_groups(pg_scope()) + catch error:badarg -> [] + end. -spec info_keys() -> rabbit_types:info_keys(). @@ -425,6 +427,10 @@ update_user_state(Pid, UserState) when is_pid(Pid) -> %%--------------------------------------------------------------------------- +-spec pg_scope() -> atom(). +pg_scope() -> + rabbit:pg_scope_amqp091_channel(). + init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, User, VHost, Capabilities, CollectorPid, LimiterPid, AmqpParams]) -> process_flag(trap_exit, true), @@ -433,7 +439,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, User, VHost, ?LG_PROCESS_TYPE(channel), ?store_proc_name({ConnName, Channel}), - ok = pg_local:join(rabbit_channels, self()), + ok = pg:join(pg_scope(), self(), self()), Flow = case rabbit_misc:get_env(rabbit, classic_queue_flow_control, true) of true -> flow; false -> noflow @@ -732,7 +738,7 @@ terminate(_Reason, queue_states = QueueCtxs}) -> rabbit_queue_type:close(QueueCtxs), {_Res, _State1} = notify_queues(State), - pg_local:leave(rabbit_channels, self()), + pg:leave(pg_scope(), self(), self()), rabbit_event:if_enabled(State, #ch.stats_timer, fun() -> emit_stats(State) end), [delete_stats(Tag) || {Tag, _} <- get()], diff --git a/deps/rabbit/src/rabbit_networking.erl b/deps/rabbit/src/rabbit_networking.erl index a03293dc46de..dc8813357272 100644 --- a/deps/rabbit/src/rabbit_networking.erl +++ b/deps/rabbit/src/rabbit_networking.erl @@ -468,13 +468,18 @@ node_client_listeners(Node) -> end, Xs) end. +pg_scope_amqp091_connection() -> + rabbit:pg_scope_amqp091_connection(). + -spec register_connection(pid()) -> ok. -register_connection(Pid) -> pg_local:join(rabbit_connections, Pid). +register_connection(Pid) -> + pg:join(pg_scope_amqp091_connection(), Pid, Pid). -spec unregister_connection(pid()) -> ok. -unregister_connection(Pid) -> pg_local:leave(rabbit_connections, Pid). +unregister_connection(Pid) -> + pg:leave(pg_scope_amqp091_connection(), Pid, Pid). -spec connections() -> [rabbit_types:connection()]. connections() -> @@ -483,17 +488,17 @@ connections() -> -spec local_connections() -> [rabbit_types:connection()]. local_connections() -> - Amqp091Pids = pg_local:get_members(rabbit_connections), + Amqp091Pids = pg:which_groups(pg_scope_amqp091_connection()), Amqp10Pids = rabbit_amqp1_0:list_local(), Amqp10Pids ++ Amqp091Pids. -spec register_non_amqp_connection(pid()) -> ok. -register_non_amqp_connection(Pid) -> pg_local:join(rabbit_non_amqp_connections, Pid). +register_non_amqp_connection(Pid) -> pg:join(rabbit:pg_scope_non_amqp_connection(), Pid, Pid). -spec unregister_non_amqp_connection(pid()) -> ok. -unregister_non_amqp_connection(Pid) -> pg_local:leave(rabbit_non_amqp_connections, Pid). +unregister_non_amqp_connection(Pid) -> pg:leave(rabbit:pg_scope_non_amqp_connection(), Pid, Pid). -spec non_amqp_connections() -> [rabbit_types:connection()]. @@ -503,7 +508,7 @@ non_amqp_connections() -> -spec local_non_amqp_connections() -> [rabbit_types:connection()]. local_non_amqp_connections() -> - pg_local:get_members(rabbit_non_amqp_connections). + pg:which_local_groups(rabbit:pg_scope_non_amqp_connection()). -spec connection_info(rabbit_types:connection(), rabbit_types:info_keys()) -> rabbit_types:infos(). diff --git a/deps/rabbit/src/rabbit_volatile_queue.erl b/deps/rabbit/src/rabbit_volatile_queue.erl index ccb176a18785..3e6bc09726be 100644 --- a/deps/rabbit/src/rabbit_volatile_queue.erl +++ b/deps/rabbit/src/rabbit_volatile_queue.erl @@ -208,7 +208,7 @@ local_call(Pid, Request) -> is_local(Pid) -> rabbit_amqp_session:is_local(Pid) orelse - pg_local:in_group(rabbit_channels, Pid). + pg:get_local_members(rabbit:pg_scope_amqp091_channel(), Pid) =/= []. handle_event(QName, {deliver, Msg}, #?STATE{name = QName, ctag = Ctag, diff --git a/deps/rabbit/test/proxy_protocol_SUITE.erl b/deps/rabbit/test/proxy_protocol_SUITE.erl index 52b62630c625..8eacbb69f850 100644 --- a/deps/rabbit/test/proxy_protocol_SUITE.erl +++ b/deps/rabbit/test/proxy_protocol_SUITE.erl @@ -110,8 +110,14 @@ proxy_protocol_v2_local(Config) -> ok. connection_name() -> - ?awaitMatch([_], pg_local:get_members(rabbit_connections), 30000), - [Pid] = pg_local:get_members(rabbit_connections), + Scope = rabbit:pg_scope_amqp091_connection(), + GetGroups = fun() -> + try pg:which_groups(Scope) + catch error:badarg -> [] + end + end, + ?awaitMatch([_], GetGroups(), 30000), + [Pid] = GetGroups(), {dictionary, Dict} = process_info(Pid, dictionary), {process_name, {rabbit_reader, ConnectionName}} = lists:keyfind(process_name, 1, Dict), ConnectionName. @@ -119,6 +125,10 @@ connection_name() -> wait_for_connection_close(Config) -> ?awaitMatch( [], - rabbit_ct_broker_helpers:rpc( - Config, 0, pg_local, get_members, [rabbit_connnections]), + begin + Scope = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit, pg_scope_amqp091_connection, []), + try rabbit_ct_broker_helpers:rpc(Config, 0, pg, which_groups, [Scope]) + catch error:badarg -> [] + end + end, 30000). diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 921238319aee..948486094d1b 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -2983,8 +2983,17 @@ cleanup_queue_state_on_channel_after_publish(Config) -> [NCh1, NCh2] = rpc:call(Server, rabbit_channel, list, []) -- ChsBefore, %% Check the channel state contains the state for the quorum queue on %% channel 1 and 2 - wait_for_cleanup(Server, NCh1, 0), - wait_for_cleanup(Server, NCh2, 1), + %% Note: pg:get_local_members doesn't guarantee order, so we need to identify + %% which channel has queue state + {ChWithoutState, ChWithState} = case length(rpc:call(Server, + rabbit_channel, + list_queue_states, + [NCh1])) of + 0 -> {NCh1, NCh2}; + 1 -> {NCh2, NCh1} + end, + wait_for_cleanup(Server, ChWithoutState, 0), + wait_for_cleanup(Server, ChWithState, 1), %% then delete the queue and wait for the process to terminate ?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})), @@ -2994,8 +3003,8 @@ cleanup_queue_state_on_channel_after_publish(Config) -> [?SUPNAME])) end, 30000), %% Check that all queue states have been cleaned - wait_for_cleanup(Server, NCh2, 0), - wait_for_cleanup(Server, NCh1, 0). + wait_for_cleanup(Server, ChWithState, 0), + wait_for_cleanup(Server, ChWithoutState, 0). cleanup_queue_state_on_channel_after_subscribe(Config) -> %% Declare/delete the queue and publish in one channel, while consuming on a From 5899ab4b5be27a68cc9a0c2d245cb76df8a1e59a Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Thu, 30 Apr 2026 15:08:18 +0200 Subject: [PATCH 3/6] rabbit_direct: track connections in ETS instead of pg pg only allows registering processes that are node local. Direct connections pass the client pid, which can be remote. Therefore, here we switch from pg_local to ETS, instead of pg. --- deps/rabbit/src/rabbit_direct.erl | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/deps/rabbit/src/rabbit_direct.erl b/deps/rabbit/src/rabbit_direct.erl index b5363b9bc6c2..3563495013dc 100644 --- a/deps/rabbit/src/rabbit_direct.erl +++ b/deps/rabbit/src/rabbit_direct.erl @@ -20,14 +20,18 @@ -include_lib("rabbit_common/include/rabbit_misc.hrl"). -include_lib("kernel/include/logger.hrl"). +-define(TABLE, ?MODULE). + %%---------------------------------------------------------------------------- -spec boot() -> 'ok'. -boot() -> rabbit_sup:start_supervisor_child( - rabbit_direct_client_sup, rabbit_client_sup, - [{local, rabbit_direct_client_sup}, - {rabbit_channel_sup, start_link, []}]). +boot() -> + ?TABLE = ets:new(?TABLE, [set, public, named_table]), + rabbit_sup:start_supervisor_child( + rabbit_direct_client_sup, rabbit_client_sup, + [{local, rabbit_direct_client_sup}, + {rabbit_channel_sup, start_link, []}]). -spec force_event_refresh(reference()) -> 'ok'. @@ -38,7 +42,7 @@ force_event_refresh(Ref) -> -spec list_local() -> [pid()]. list_local() -> - pg_local:get_members(rabbit_direct). + [Pid || {Pid} <- ets:tab2list(?TABLE)]. -spec list() -> [pid()]. @@ -186,7 +190,7 @@ connect1(User = #user{username = Username}, VHost, Pid, Infos) -> AuthzContext = proplists:get_value(variable_map, Infos, #{}), try rabbit_access_control:check_vhost_access(User, VHost, {ip, PeerHost}, AuthzContext) of - ok -> ok = pg_local:join(rabbit_direct, Pid), + ok -> ets:insert(?TABLE, {Pid}), rabbit_core_metrics:connection_created(Pid, Infos), rabbit_event:notify(connection_created, Infos), _ = rabbit_alarm:register( @@ -246,7 +250,7 @@ start_channel(Number, ClientChannelPid, ConnPid, ConnName, -spec disconnect(pid(), rabbit_event:event_props()) -> 'ok'. disconnect(Pid, Infos) -> - pg_local:leave(rabbit_direct, Pid), + ets:delete(?TABLE, Pid), rabbit_core_metrics:connection_closed(Pid), rabbit_event:notify(connection_closed, Infos). From 5263fdc6f2e009949303bf879b10a59f75862c5c Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Wed, 18 Feb 2026 11:53:08 +0100 Subject: [PATCH 4/6] Delete pg_local module --- deps/rabbit/Makefile | 2 +- deps/rabbit/src/pg_local.erl | 251 ----------------------- deps/rabbit/test/unit_pg_local_SUITE.erl | 102 --------- 3 files changed, 1 insertion(+), 354 deletions(-) delete mode 100644 deps/rabbit/src/pg_local.erl delete mode 100644 deps/rabbit/test/unit_pg_local_SUITE.erl diff --git a/deps/rabbit/Makefile b/deps/rabbit/Makefile index 192ad3918c1a..dbafc524a72e 100644 --- a/deps/rabbit/Makefile +++ b/deps/rabbit/Makefile @@ -263,7 +263,7 @@ PARALLEL_CT_SET_3_B = list_consumers_sanity_check list_queues_online_and_offline PARALLEL_CT_SET_3_C = cli_forget_cluster_node mc_unit message_size_limit PARALLEL_CT_SET_3_D = metrics mirrored_supervisor proxy_protocol runtime_parameters unit_rabbit_vm unit_stats_and_metrics unit_supervisor2 unit_vm_memory_monitor -PARALLEL_CT_SET_4_A = clustering_events rabbit_local_random_exchange rabbit_msg_interceptor rabbitmq_4_0_deprecations unit_pg_local unit_plugin_directories unit_plugin_versioning unit_policy_validators unit_priority_queue +PARALLEL_CT_SET_4_A = clustering_events rabbit_local_random_exchange rabbit_msg_interceptor rabbitmq_4_0_deprecations unit_plugin_directories unit_plugin_versioning unit_policy_validators unit_priority_queue PARALLEL_CT_SET_4_B = per_user_connection_tracking per_vhost_connection_limit rabbit_fifo_int unit_default_queue_type PARALLEL_CT_SET_4_C = msg_size_metrics unit_msg_size_metrics per_vhost_msg_store per_vhost_queue_limit priority_queue upgrade_preparation vhost PARALLEL_CT_SET_4_D = per_user_connection_channel_tracking product_info queue_type rabbitmq_queues_cli_integration rabbitmq_streams_cli_integration rabbitmqctl_integration rabbitmqctl_shutdown routing rabbit_amqqueue diff --git a/deps/rabbit/src/pg_local.erl b/deps/rabbit/src/pg_local.erl deleted file mode 100644 index 6397ea69a5c0..000000000000 --- a/deps/rabbit/src/pg_local.erl +++ /dev/null @@ -1,251 +0,0 @@ -%% This file is a copy of pg2.erl from the R13B-3 Erlang/OTP -%% distribution, with the following modifications: -%% -%% 1) Process groups are node-local only. -%% -%% 2) Groups are created/deleted implicitly. -%% -%% 3) 'join' and 'leave' are asynchronous. -%% -%% 4) the type specs of the exported non-callback functions have been -%% extracted into a separate, guarded section, and rewritten in -%% old-style spec syntax, for better compatibility with older -%% versions of Erlang/OTP. The remaining type specs have been -%% removed. - -%% All modifications are (C) 2007-2024 Broadcom. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. - -%% %CopyrightBegin% -%% -%% Copyright Ericsson AB 1997-2009. All Rights Reserved. -%% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at https://www.erlang.org/. -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. -%% -%% %CopyrightEnd% -%% --module(pg_local). - --include_lib("kernel/include/logger.hrl"). - --export([join/2, leave/2, get_members/1, in_group/2]). -%% intended for testing only; not part of official API --export([sync/0, clear/0]). --export([start/0, start_link/0, init/1, handle_call/3, handle_cast/2, - handle_info/2, terminate/2]). - -%%---------------------------------------------------------------------------- - --type name() :: term(). - -%%---------------------------------------------------------------------------- - --define(TABLE, pg_local_table). - -%%% -%%% Exported functions -%%% - --spec start_link() -> {'ok', pid()} | {'error', any()}. - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - --spec start() -> {'ok', pid()} | {'error', any()}. - -start() -> - ensure_started(). - --spec join(name(), pid()) -> 'ok'. - -join(Name, Pid) when is_pid(Pid) -> - _ = ensure_started(), - gen_server:cast(?MODULE, {join, Name, Pid}). - --spec leave(name(), pid()) -> 'ok'. - -leave(Name, Pid) when is_pid(Pid) -> - _ = ensure_started(), - gen_server:cast(?MODULE, {leave, Name, Pid}). - --spec get_members(name()) -> [pid()]. - -get_members(Name) -> - _ = ensure_started(), - group_members(Name). - --spec in_group(name(), pid()) -> boolean(). - -in_group(Name, Pid) -> - _ = ensure_started(), - %% The join message is a cast and thus can race, but we want to - %% keep it that way to be fast in the common case. - case member_present(Name, Pid) of - true -> true; - false -> sync(), - member_present(Name, Pid) - end. - --spec sync() -> 'ok'. - -sync() -> - _ = ensure_started(), - gen_server:call(?MODULE, sync, infinity). - -clear() -> - _ = ensure_started(), - gen_server:call(?MODULE, clear, infinity). - -%%% -%%% Callback functions from gen_server -%%% - --record(state, {}). - -init([]) -> - ?TABLE = ets:new(?TABLE, [ordered_set, protected, named_table]), - {ok, #state{}}. - -handle_call(sync, _From, S) -> - {reply, ok, S}; - -handle_call(clear, _From, S) -> - ets:delete_all_objects(?TABLE), - {reply, ok, S}; - -handle_call(Request, From, S) -> - ?LOG_WARNING("The pg_local server received an unexpected message:\n" - "handle_call(~tp, ~tp, _)\n", - [Request, From]), - {noreply, S}. - -handle_cast({join, Name, Pid}, S) -> - _ = join_group(Name, Pid), - {noreply, S}; -handle_cast({leave, Name, Pid}, S) -> - leave_group(Name, Pid), - {noreply, S}; -handle_cast(_, S) -> - {noreply, S}. - -handle_info({'DOWN', MonitorRef, process, Pid, _Info}, S) -> - member_died(MonitorRef, Pid), - {noreply, S}; -handle_info(_, S) -> - {noreply, S}. - -terminate(_Reason, _S) -> - true = ets:delete(?TABLE), - ok. - -%%% -%%% Local functions -%%% - -%%% One ETS table, pg_local_table, is used for bookkeeping. The type of the -%%% table is ordered_set, and the fast matching of partially -%%% instantiated keys is used extensively. -%%% -%%% {{ref, Pid}, MonitorRef, Counter} -%%% {{ref, MonitorRef}, Pid} -%%% Each process has one monitor. Counter is incremented when the -%%% Pid joins some group. -%%% {{member, Name, Pid}, _} -%%% Pid is a member of group Name, GroupCounter is incremented when the -%%% Pid joins the group Name. -%%% {{pid, Pid, Name}} -%%% Pid is a member of group Name. - -member_died(Ref, Pid) -> - _ = case ets:lookup(?TABLE, {ref, Ref}) of - [{{ref, Ref}, Pid}] -> - leave_all_groups(Pid); - %% in case the key has already been removed - %% we can clean up using the value from the DOWN message - _ -> - leave_all_groups(Pid) - end, - ok. - -leave_all_groups(Pid) -> - Names = member_groups(Pid), - _ = [leave_group(Name, P) || - Name <- Names, - P <- member_in_group(Pid, Name)]. - -join_group(Name, Pid) -> - Ref_Pid = {ref, Pid}, - _ = try ets:update_counter(?TABLE, Ref_Pid, {3, +1}) - catch _:_ -> - Ref = erlang:monitor(process, Pid), - true = ets:insert(?TABLE, {Ref_Pid, Ref, 1}), - true = ets:insert(?TABLE, {{ref, Ref}, Pid}) - end, - Member_Name_Pid = {member, Name, Pid}, - try _ = ets:update_counter(?TABLE, Member_Name_Pid, {2, +1}) - catch _:_ -> - true = ets:insert(?TABLE, {Member_Name_Pid, 1}), - true = ets:insert(?TABLE, {{pid, Pid, Name}}) - end. - -leave_group(Name, Pid) -> - Member_Name_Pid = {member, Name, Pid}, - try ets:update_counter(?TABLE, Member_Name_Pid, {2, -1}) of - N -> - if - N =:= 0 -> - true = ets:delete(?TABLE, {pid, Pid, Name}), - true = ets:delete(?TABLE, Member_Name_Pid); - true -> - ok - end, - Ref_Pid = {ref, Pid}, - case ets:update_counter(?TABLE, Ref_Pid, {3, -1}) of - 0 -> - [{Ref_Pid,Ref,0}] = ets:lookup(?TABLE, Ref_Pid), - true = ets:delete(?TABLE, {ref, Ref}), - true = ets:delete(?TABLE, Ref_Pid), - true = erlang:demonitor(Ref, [flush]), - ok; - _ -> - ok - end - catch _:_ -> - ok - end. - -group_members(Name) -> - [P || - [P, N] <- ets:match(?TABLE, {{member, Name, '$1'},'$2'}), - _ <- lists:seq(1, N)]. - -member_in_group(Pid, Name) -> - [{{member, Name, Pid}, N}] = ets:lookup(?TABLE, {member, Name, Pid}), - lists:duplicate(N, Pid). - -member_present(Name, Pid) -> - case ets:lookup(?TABLE, {member, Name, Pid}) of - [_] -> true; - [] -> false - end. - -member_groups(Pid) -> - [Name || [Name] <- ets:match(?TABLE, {{pid, Pid, '$1'}})]. - -ensure_started() -> - case whereis(?MODULE) of - undefined -> - C = {pg_local, {?MODULE, start_link, []}, permanent, - 16#ffffffff, worker, [?MODULE]}, - supervisor:start_child(kernel_safe_sup, C); - PgLocalPid -> - {ok, PgLocalPid} - end. diff --git a/deps/rabbit/test/unit_pg_local_SUITE.erl b/deps/rabbit/test/unit_pg_local_SUITE.erl deleted file mode 100644 index 2b91bbf8308b..000000000000 --- a/deps/rabbit/test/unit_pg_local_SUITE.erl +++ /dev/null @@ -1,102 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. -%% - --module(unit_pg_local_SUITE). - --include_lib("eunit/include/eunit.hrl"). - --compile(export_all). - -all() -> - [ - {group, sequential_tests} - ]. - -groups() -> - [ - {sequential_tests, [], [ - pg_local, - pg_local_with_unexpected_deaths1, - pg_local_with_unexpected_deaths2 - ]} - ]. - - -pg_local(_Config) -> - [P, Q] = [spawn(fun () -> receive X -> X end end) || _ <- lists:seq(0, 1)], - check_pg_local(ok, [], []), - %% P joins group a, then b, then a again - check_pg_local(pg_local:join(a, P), [P], []), - check_pg_local(pg_local:join(b, P), [P], [P]), - check_pg_local(pg_local:join(a, P), [P, P], [P]), - %% Q joins group a, then b, then b again - check_pg_local(pg_local:join(a, Q), [P, P, Q], [P]), - check_pg_local(pg_local:join(b, Q), [P, P, Q], [P, Q]), - check_pg_local(pg_local:join(b, Q), [P, P, Q], [P, Q, Q]), - %% P leaves groups a and a - check_pg_local(pg_local:leave(a, P), [P, Q], [P, Q, Q]), - check_pg_local(pg_local:leave(b, P), [P, Q], [Q, Q]), - %% leave/2 is idempotent - check_pg_local(pg_local:leave(a, P), [Q], [Q, Q]), - check_pg_local(pg_local:leave(a, P), [Q], [Q, Q]), - %% clean up all processes - [begin X ! done, - Ref = erlang:monitor(process, X), - receive {'DOWN', Ref, process, X, _Info} -> ok end - end || X <- [P, Q]], - %% ensure the groups are empty - check_pg_local(ok, [], []), - passed. - -pg_local_with_unexpected_deaths1(_Config) -> - [A, B] = [spawn(fun () -> receive X -> X end end) || _ <- lists:seq(0, 1)], - check_pg_local(ok, [], []), - %% A joins groups a and b - check_pg_local(pg_local:join(a, A), [A], []), - check_pg_local(pg_local:join(b, A), [A], [A]), - %% B joins group b - check_pg_local(pg_local:join(b, B), [A], [A, B]), - - [begin erlang:exit(X, sleep_now_in_a_fire), - Ref = erlang:monitor(process, X), - receive {'DOWN', Ref, process, X, _Info} -> ok end - end || X <- [A, B]], - %% ensure the groups are empty - check_pg_local(ok, [], []), - ?assertNot(erlang:is_process_alive(A)), - ?assertNot(erlang:is_process_alive(B)), - - passed. - -pg_local_with_unexpected_deaths2(_Config) -> - [A, B] = [spawn(fun () -> receive X -> X end end) || _ <- lists:seq(0, 1)], - check_pg_local(ok, [], []), - %% A joins groups a and b - check_pg_local(pg_local:join(a, A), [A], []), - check_pg_local(pg_local:join(b, A), [A], [A]), - %% B joins group b - check_pg_local(pg_local:join(b, B), [A], [A, B]), - - %% something else yanks a record (or all of them) from the pg_local - %% bookkeeping table - ok = pg_local:clear(), - - [begin erlang:exit(X, sleep_now_in_a_fire), - Ref = erlang:monitor(process, X), - receive {'DOWN', Ref, process, X, _Info} -> ok end - end || X <- [A, B]], - %% ensure the groups are empty - check_pg_local(ok, [], []), - ?assertNot(erlang:is_process_alive(A)), - ?assertNot(erlang:is_process_alive(B)), - - passed. - -check_pg_local(ok, APids, BPids) -> - ok = pg_local:sync(), - ?assertEqual([true, true], [lists:sort(Pids) == lists:sort(pg_local:get_members(Key)) || - {Key, Pids} <- [{a, APids}, {b, BPids}]]). From 5490d9251664a8e034c8dec0c8a1b275d19a5770 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 3 Nov 2025 17:30:20 +0100 Subject: [PATCH 5/6] Retry connection attempt in stream test The metadata can take a few milliseconds to reflect a node is down, so the test code must consider a connection failure and test another node. --- .../java/com/rabbitmq/stream/FailureTest.java | 81 ++++++++++++------- 1 file changed, 52 insertions(+), 29 deletions(-) diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java index 016da1f59789..ed270cad6770 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java @@ -429,6 +429,7 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception { @Test void consumerReattachesToOtherReplicaWhenReplicaGoesAway() throws Exception { + LOGGER.info("Stream name is {}", stream); executorService = Executors.newCachedThreadPool(); Client metadataClient = cf.get(new Client.ClientParameters().port(streamPortNode1())); Map metadata = metadataClient.metadata(stream); @@ -514,42 +515,64 @@ void consumerReattachesToOtherReplicaWhenReplicaGoesAway() throws Exception { CountDownLatch reconnectionLatch = new CountDownLatch(1); AtomicReference shutdownListenerReference = new AtomicReference<>(); + Runnable resubscribe = + () -> { + AtomicInteger newReplicaPort = new AtomicInteger(-1); + waitAtMost( + Duration.ofSeconds(5), + () -> { + try { + Client.StreamMetadata m = metadataClient.metadata(stream).get(stream); + newReplicaPort.set(m.getReplicas().get(0).getPort()); + LOGGER.info("Metadata: {}", m); + return true; + } catch (Exception e) { + return false; + } + }); + LOGGER.info("Replica port is {}", newReplicaPort); + + Client newConsumer = + cf.get( + new Client.ClientParameters() + .port(newReplicaPort.get()) + .shutdownListener(shutdownListenerReference.get()) + .chunkListener(credit()) + .messageListener(messageListener)); + + LOGGER.info("Subscribing..."); + newConsumer.subscribe( + (byte) 1, stream, OffsetSpecification.offset(lastProcessedOffset.get() + 1), 10); + LOGGER.info("Subscribed"); + + generation.incrementAndGet(); + reconnectionLatch.countDown(); + LOGGER.info("Shutdown listener done"); + }; Client.ShutdownListener shutdownListener = shutdownContext -> { + LOGGER.info("Shutdown reason: {}", shutdownContext.getShutdownReason()); if (shutdownContext.getShutdownReason() == Client.ShutdownContext.ShutdownReason.UNKNOWN) { // avoid long-running task in the IO thread executorService.submit( () -> { - AtomicInteger newReplicaPort = new AtomicInteger(-1); - waitAtMost( - Duration.ofSeconds(5), - () -> { - try { - Client.StreamMetadata m = metadataClient.metadata(stream).get(stream); - newReplicaPort.set(m.getReplicas().get(0).getPort()); - return true; - } catch (Exception e) { - return false; - } - }); - - Client newConsumer = - cf.get( - new Client.ClientParameters() - .port(newReplicaPort.get()) - .shutdownListener(shutdownListenerReference.get()) - .chunkListener(credit()) - .messageListener(messageListener)); - - newConsumer.subscribe( - (byte) 1, - stream, - OffsetSpecification.offset(lastProcessedOffset.get() + 1), - 10); - - generation.incrementAndGet(); - reconnectionLatch.countDown(); + int attempts = 0; + while (attempts < 3) { + try { + resubscribe.run(); + break; + } catch (RuntimeException e) { + LOGGER.warn("Error while re-subscribing: {}", e.getMessage()); + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + break; + } + attempts++; + } + } }); } }; From af03555d164ed2250ef220099857a0173ed08c64 Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Thu, 30 Apr 2026 15:35:23 +0200 Subject: [PATCH 6/6] Add CT for rabbit_direct list and list_local --- deps/rabbit/Makefile | 2 +- deps/rabbit/test/rabbit_direct_SUITE.erl | 81 ++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 1 deletion(-) create mode 100644 deps/rabbit/test/rabbit_direct_SUITE.erl diff --git a/deps/rabbit/Makefile b/deps/rabbit/Makefile index dbafc524a72e..d14486bbf054 100644 --- a/deps/rabbit/Makefile +++ b/deps/rabbit/Makefile @@ -249,7 +249,7 @@ define ct_master.erl endef PARALLEL_CT_SET_1_A = unit_rabbit_ssl unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking -PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_filter_prop amqp_filter_sql amqp_filter_sql_unit amqp_dotnet server_named_queue_prefix_prop signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management +PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_filter_prop amqp_filter_sql amqp_filter_sql_unit amqp_dotnet server_named_queue_prefix_prop signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management rabbit_direct PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange cluster_limit cluster_minority default_queue_type_prop term_to_binary_compat_prop topic_permission unicode unit_access_control user_tags_count_limit PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit server_named_queue_prefix diff --git a/deps/rabbit/test/rabbit_direct_SUITE.erl b/deps/rabbit/test/rabbit_direct_SUITE.erl new file mode 100644 index 000000000000..0813f5659ad4 --- /dev/null +++ b/deps/rabbit/test/rabbit_direct_SUITE.erl @@ -0,0 +1,81 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(rabbit_direct_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-export([all/0, groups/0]). +-export([init_per_suite/1, end_per_suite/1, + init_per_group/2, end_per_group/2, + init_per_testcase/2, end_per_testcase/2]). +-export([direct_connection_registered/1]). + +all() -> + [{group, tests}]. + +groups() -> + [{tests, [], [direct_connection_registered]}]. + +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(_, Config) -> + Config. + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + Config1 = rabbit_ct_helpers:set_config(Config, [{rmq_nodename_suffix, Testcase}]), + rabbit_ct_helpers:run_setup_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_testcase(Testcase, Config) -> + Config1 = rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase). + +%% ------------------------------------------------------------------- + +direct_connection_registered(Config) -> + Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + BeforeLocal = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_direct, list_local, []), + BeforeList = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_direct, list, []), + ?assertEqual([], BeforeLocal), + ?assertEqual([], BeforeList), + + Params = #amqp_params_direct{node = Node, + virtual_host = <<"/">>, + username = <<"guest">>, + password = <<"guest">>}, + {ok, Conn} = amqp_connection:start(Params), + true = is_pid(Conn), + + AfterLocal = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_direct, list_local, []), + AfterList = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_direct, list, []), + ?assertEqual([Conn], AfterLocal), + ?assertEqual([Conn], AfterList), + + ok = amqp_connection:close(Conn), + + FinalLocal = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_direct, list_local, []), + FinalList = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_direct, list, []), + ?assertEqual([], FinalLocal), + ?assertEqual([], FinalList), + ok.