Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose AMQP connection metrics #12638

Merged
merged 1 commit into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 76 additions & 32 deletions deps/rabbit/src/rabbit_amqp_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
-include_lib("kernel/include/logger.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("amqp10_common/include/amqp10_types.hrl").
-include("rabbit_amqp_reader.hrl").
-include("rabbit_amqp.hrl").

-export([init/1,
Expand Down Expand Up @@ -79,7 +80,8 @@
pending_recv :: boolean(),
buf :: list(),
buf_len :: non_neg_integer(),
tracked_channels :: #{channel_number() => Session :: pid()}
tracked_channels :: #{channel_number() => Session :: pid()},
stats_timer :: rabbit_event:state()
}).

-type state() :: #v1{}.
Expand All @@ -90,7 +92,7 @@

unpack_from_0_9_1(
{Sock, PendingRecv, SupPid, Buf, BufLen, ProxySocket,
ConnectionName, Host, PeerHost, Port, PeerPort, ConnectedAt},
ConnectionName, Host, PeerHost, Port, PeerPort, ConnectedAt, StatsTimer},
Parent) ->
logger:update_process_metadata(#{connection => ConnectionName}),
#v1{parent = Parent,
Expand All @@ -106,6 +108,7 @@ unpack_from_0_9_1(
tracked_channels = maps:new(),
writer = none,
connection_state = received_amqp3100,
stats_timer = StatsTimer,
connection = #v1_connection{
name = ConnectionName,
container_id = none,
Expand Down Expand Up @@ -201,6 +204,10 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
end
end.

handle_other(emit_stats, State) ->
emit_stats(State);
handle_other(ensure_stats_timer, State) ->
ensure_stats_timer(State);
handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
ReasonString = rabbit_misc:format("broker forced connection closure with reason '~w'",
[Reason]),
Expand Down Expand Up @@ -247,8 +254,16 @@ handle_other({'$gen_call', From, {info, Items}}, State) ->
end,
gen_server:reply(From, Reply),
State;
handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) ->
State;
handle_other({'$gen_cast', {force_event_refresh, Ref}}, State) ->
case ?IS_RUNNING(State) of
true ->
Infos = infos(?CONNECTION_EVENT_KEYS, State),
rabbit_event:notify(connection_created, Infos, Ref),
rabbit_event:init_stats_timer(State, #v1.stats_timer);
false ->
%% Ignore, we will emit a connection_created event once we start running.
State
end;
handle_other(terminate_connection, _State) ->
stop;
handle_other({set_credential, Cred}, State) ->
Expand Down Expand Up @@ -527,6 +542,7 @@ handle_connection_frame(
proplists:get_value(pid, Infos),
Infos),
ok = rabbit_event:notify(connection_created, Infos),
ok = maybe_emit_stats(State),
ok = rabbit_amqp1_0:register_connection(self()),
Caps = [%% https://docs.oasis-open.org/amqp/linkpair/v1.0/cs01/linkpair-v1.0-cs01.html#_Toc51331306
<<"LINK_PAIR_V1_0">>,
Expand Down Expand Up @@ -629,25 +645,26 @@ handle_input(handshake,
switch_callback(State, {frame_header, amqp}, 8);
handle_input({frame_header, Mode},
Header = <<Size:32, DOff:8, Type:8, Channel:16>>,
State) when DOff >= 2 ->
State0) when DOff >= 2 ->
case {Mode, Type} of
{amqp, 0} -> ok;
{sasl, 1} -> ok;
_ -> throw({bad_1_0_header_type, Header, Mode})
_ -> throw({bad_1_0_header_type, Header, Mode})
end,
MaxFrameSize = State#v1.connection#v1_connection.incoming_max_frame_size,
if Size =:= 8 ->
%% heartbeat
State;
Size > MaxFrameSize ->
handle_exception(
State, Channel, error_frame(
?V_1_0_CONNECTION_ERROR_FRAMING_ERROR,
"frame size (~b bytes) > maximum frame size (~b bytes)",
[Size, MaxFrameSize]));
true ->
switch_callback(State, {frame_body, Mode, DOff, Channel}, Size - 8)
end;
MaxFrameSize = State0#v1.connection#v1_connection.incoming_max_frame_size,
State = if Size =:= 8 ->
%% heartbeat
State0;
Size > MaxFrameSize ->
Err = error_frame(
?V_1_0_CONNECTION_ERROR_FRAMING_ERROR,
"frame size (~b bytes) > maximum frame size (~b bytes)",
[Size, MaxFrameSize]),
handle_exception(State0, Channel, Err);
true ->
switch_callback(State0, {frame_body, Mode, DOff, Channel}, Size - 8)
end,
ensure_stats_timer(State);
handle_input({frame_header, _Mode}, Malformed, _State) ->
throw({bad_1_0_header, Malformed});
handle_input({frame_body, Mode, DOff, Channel},
Expand Down Expand Up @@ -1013,13 +1030,18 @@ i(peer_host, #v1{connection = #v1_connection{peer_host = Val}}) ->
Val;
i(peer_port, #v1{connection = #v1_connection{peer_port = Val}}) ->
Val;
i(SockStat, S) when SockStat =:= recv_oct;
SockStat =:= recv_cnt;
SockStat =:= send_oct;
SockStat =:= send_cnt;
SockStat =:= send_pend ->
socket_info(fun (Sock) -> rabbit_net:getstat(Sock, [SockStat]) end,
fun ([{_, I}]) -> I end, S);
i(SockStat, #v1{sock = Sock})
when SockStat =:= recv_oct;
SockStat =:= recv_cnt;
SockStat =:= send_oct;
SockStat =:= send_cnt;
SockStat =:= send_pend ->
case rabbit_net:getstat(Sock, [SockStat]) of
{ok, [{SockStat, Val}]} ->
Val;
{error, _} ->
''
end;
i(ssl, #v1{sock = Sock}) -> rabbit_net:is_ssl(Sock);
i(SSL, #v1{sock = Sock, proxy_socket = ProxySock})
when SSL =:= ssl_protocol;
Expand All @@ -1045,15 +1067,37 @@ i(channels, #v1{tracked_channels = Channels}) ->
maps:size(Channels);
i(channel_max, #v1{connection = #v1_connection{channel_max = Max}}) ->
Max;
i(reductions = Item, _State) ->
{Item, Reductions} = erlang:process_info(self(), Item),
Reductions;
i(garbage_collection, _State) ->
rabbit_misc:get_gc_info(self());
i(Item, #v1{}) ->
throw({bad_argument, Item}).

%% From rabbit_reader
socket_info(Get, Select, #v1{sock = Sock}) ->
case Get(Sock) of
{ok, T} -> Select(T);
{error, _} -> ''
end.
maybe_emit_stats(State) ->
ok = rabbit_event:if_enabled(
State,
#v1.stats_timer,
fun() -> emit_stats(State) end).

emit_stats(State) ->
[{_, Pid},
{_, RecvOct},
{_, SendOct},
{_, Reductions}] = infos(?SIMPLE_METRICS, State),
Infos = infos(?OTHER_METRICS, State),
rabbit_core_metrics:connection_stats(Pid, Infos),
rabbit_core_metrics:connection_stats(Pid, RecvOct, SendOct, Reductions),
%% NB: Don't call ensure_stats_timer because it becomes expensive
%% if all idle non-hibernating connections emit stats.
rabbit_event:reset_stats_timer(State, #v1.stats_timer).

ensure_stats_timer(State)
when ?IS_RUNNING(State) ->
rabbit_event:ensure_stats_timer(State, #v1.stats_timer, emit_stats);
ensure_stats_timer(State) ->
State.

ignore_maintenance({map, Properties}) ->
lists:member(
Expand Down
17 changes: 17 additions & 0 deletions deps/rabbit/src/rabbit_amqp_reader.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.

-define(SIMPLE_METRICS, [pid,
recv_oct,
send_oct,
reductions]).

-define(OTHER_METRICS, [recv_cnt,
send_cnt,
send_pend,
state,
channels,
garbage_collection]).
21 changes: 14 additions & 7 deletions deps/rabbit/src/rabbit_amqp_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
pending :: iolist(),
%% This field is just an optimisation to minimize the cost of erlang:iolist_size/1
pending_size :: non_neg_integer(),
monitored_sessions :: #{pid() => true}
monitored_sessions :: #{pid() => true},
stats_timer :: rabbit_event:state()
}).

-define(HIBERNATE_AFTER, 6_000).
Expand Down Expand Up @@ -100,7 +101,8 @@ init({Sock, ReaderPid}) ->
reader = ReaderPid,
pending = [],
pending_size = 0,
monitored_sessions = #{}},
monitored_sessions = #{},
stats_timer = rabbit_event:init_stats_timer()},
process_flag(message_queue_data, off_heap),
{ok, State}.

Expand All @@ -123,6 +125,10 @@ handle_call({send_command, ChannelNum, Performative}, _From, State0) ->
State = flush(State1),
{reply, ok, State}.

handle_info(emit_stats, State0 = #state{reader = ReaderPid}) ->
ReaderPid ! ensure_stats_timer,
State = rabbit_event:reset_stats_timer(State0, #state.stats_timer),
no_reply(State);
handle_info(timeout, State0) ->
State = flush(State0),
{noreply, State};
Expand Down Expand Up @@ -223,18 +229,19 @@ tcp_send(Sock, Data) ->

maybe_flush(State = #state{pending_size = PendingSize}) ->
case PendingSize > ?FLUSH_THRESHOLD of
true -> flush(State);
true -> flush(State);
false -> State
end.

flush(State = #state{pending = []}) ->
State;
flush(State = #state{sock = Sock,
pending = Pending}) ->
flush(State0 = #state{sock = Sock,
pending = Pending}) ->
case rabbit_net:send(Sock, lists:reverse(Pending)) of
ok ->
State#state{pending = [],
pending_size = 0};
State = State0#state{pending = [],
pending_size = 0},
rabbit_event:ensure_stats_timer(State, #state.stats_timer, emit_stats);
{error, Reason} ->
exit({writer, send_failed, Reason})
end.
18 changes: 8 additions & 10 deletions deps/rabbit/src/rabbit_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

-include_lib("rabbit_common/include/rabbit_framing.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include("rabbit_amqp_reader.hrl").

-export([start_link/2, info/2, force_event_refresh/2,
shutdown/2]).
Expand Down Expand Up @@ -116,10 +117,6 @@
connection_blocked_message_sent
}).

-define(SIMPLE_METRICS, [pid, recv_oct, send_oct, reductions]).
-define(OTHER_METRICS, [recv_cnt, send_cnt, send_pend, state, channels,
garbage_collection]).

-define(CREATION_EVENT_KEYS,
[pid, name, port, peer_port, host,
peer_host, ssl, peer_cert_subject, peer_cert_issuer,
Expand Down Expand Up @@ -1582,8 +1579,8 @@ i(state, #v1{connection_state = ConnectionState,
end;
i(garbage_collection, _State) ->
rabbit_misc:get_gc_info(self());
i(reductions, _State) ->
{reductions, Reductions} = erlang:process_info(self(), reductions),
i(reductions = Item, _State) ->
{Item, Reductions} = erlang:process_info(self(), Item),
Reductions;
i(Item, #v1{connection = Conn}) -> ic(Item, Conn).

Expand Down Expand Up @@ -1623,12 +1620,12 @@ maybe_emit_stats(State) ->

emit_stats(State) ->
[{_, Pid},
{_, Recv_oct},
{_, Send_oct},
{_, RecvOct},
{_, SendOct},
{_, Reductions}] = infos(?SIMPLE_METRICS, State),
Infos = infos(?OTHER_METRICS, State),
rabbit_core_metrics:connection_stats(Pid, Infos),
rabbit_core_metrics:connection_stats(Pid, Recv_oct, Send_oct, Reductions),
rabbit_core_metrics:connection_stats(Pid, RecvOct, SendOct, Reductions),
State1 = rabbit_event:reset_stats_timer(State, #v1.stats_timer),
ensure_stats_timer(State1).

Expand All @@ -1643,6 +1640,7 @@ pack_for_1_0(Buf, BufLen, #v1{sock = Sock,
pending_recv = PendingRecv,
helper_sup = {_HelperSup091, HelperSup10},
proxy_socket = ProxySocket,
stats_timer = StatsTimer,
connection = #connection{
name = Name,
host = Host,
Expand All @@ -1651,7 +1649,7 @@ pack_for_1_0(Buf, BufLen, #v1{sock = Sock,
peer_port = PeerPort,
connected_at = ConnectedAt}}) ->
{Sock, PendingRecv, HelperSup10, Buf, BufLen, ProxySocket,
Name, Host, PeerHost, Port, PeerPort, ConnectedAt}.
Name, Host, PeerHost, Port, PeerPort, ConnectedAt, StatsTimer}.

respond_and_close(State, Channel, Protocol, Reason, LogErr) ->
log_hard_error(State, Channel, LogErr),
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit_common/src/rabbit_core_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,9 @@ connection_stats(Pid, Infos) ->
ets:insert(connection_metrics, {Pid, Infos}),
ok.

connection_stats(Pid, Recv_oct, Send_oct, Reductions) ->
connection_stats(Pid, RecvOct, SendOct, Reductions) ->
%% Includes delete marker
ets:insert(connection_coarse_metrics, {Pid, Recv_oct, Send_oct, Reductions, 0}),
ets:insert(connection_coarse_metrics, {Pid, RecvOct, SendOct, Reductions, 0}),
ok.

channel_created(Pid, Infos) ->
Expand Down
29 changes: 20 additions & 9 deletions deps/rabbit_common/src/rabbit_event.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
-include("rabbit.hrl").

-export([start_link/0]).
-export([init_stats_timer/2, init_disabled_stats_timer/2,
-export([init_stats_timer/0, init_stats_timer/2, init_disabled_stats_timer/2,
ensure_stats_timer/3, stop_stats_timer/2, reset_stats_timer/2]).
-export([stats_level/2, if_enabled/3]).
-export([notify/2, notify/3, notify_if/3]).
Expand Down Expand Up @@ -89,23 +89,34 @@ start_link() ->
%% Nowadays, instead of sending a message to rabbit_event via notify(stats),
%% some stat-emitting objects update ETS tables directly via module rabbit_core_metrics.

init_stats_timer(C, P) ->
-spec init_stats_timer() -> state().
init_stats_timer() ->
%% If the rabbit app is not loaded - use default none:5000
StatsLevel = application:get_env(rabbit, collect_statistics, none),
Interval = application:get_env(rabbit, collect_statistics_interval, 5000),
setelement(P, C, #state{level = StatsLevel, interval = Interval,
timer = undefined}).
Interval = application:get_env(rabbit, collect_statistics_interval, 5000),
#state{level = StatsLevel,
interval = Interval,
timer = undefined}.

init_stats_timer(C, P) ->
State = init_stats_timer(),
setelement(P, C, State).

init_disabled_stats_timer(C, P) ->
setelement(P, C, #state{level = none, interval = 0, timer = undefined}).
State = #state{level = none,
interval = 0,
timer = undefined},
setelement(P, C, State).

ensure_stats_timer(C, P, Msg) ->
case element(P, C) of
#state{level = Level, interval = Interval, timer = undefined} = State
#state{level = Level,
interval = Interval,
timer = undefined} = State
when Level =/= none ->
TRef = erlang:send_after(Interval, self(), Msg),
setelement(P, C, State#state{timer = TRef});
#state{} ->
_State ->
C
end.

Expand Down Expand Up @@ -156,5 +167,5 @@ event_cons(Type, Props, Ref) ->
#event{type = Type,
props = Props,
reference = Ref,
timestamp = os:system_time(milli_seconds)}.
timestamp = os:system_time(millisecond)}.

Loading
Loading