Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
880b188
Allow configuring classic queue data dir in Cuttlefish config
the-mikedavis Oct 31, 2025
f85e436
rabbit_alarm: Prefer maps to dicts
the-mikedavis Oct 21, 2025
8778bad
rabbit_env: Enable disksup in os_mon but set threshold to 1.0
the-mikedavis Sep 16, 2025
a103ef8
rabbit_disk_monitor: Use disksup to determine available bytes
the-mikedavis Sep 16, 2025
58e41c5
rabbit.schema: Add config options for per-queue-type disk limits
the-mikedavis Oct 16, 2025
2d0ca06
rabbit_disk_monitor: Monitor per-queue-type mounts
the-mikedavis Oct 21, 2025
aa8c142
rabbit_alarm: Add a helper to format resource alarm sources
the-mikedavis Oct 23, 2025
88e7830
Set per-queue-type disk alarms for configured mounts
the-mikedavis Oct 23, 2025
a0a43ce
AMQP 0-9-1: Handle per-queue-type disk alarms
the-mikedavis Oct 23, 2025
4f01a6a
prometheus: Add core metrics per-mount for free space and limit
the-mikedavis Oct 30, 2025
c903920
management: Expose current mount stats in API and UI
the-mikedavis Oct 31, 2025
03df6e9
rabbit_disk_monitor: Add config to tune polling interval
the-mikedavis Oct 31, 2025
56654c8
CLI: Extend set_disk_free_limit to set mount limits
the-mikedavis Oct 31, 2025
336aef0
rabbit_stream_reader: Block during stream queue-type disk alarm
the-mikedavis Nov 4, 2025
df8552d
MQTT: Handle per-queue-type disk alarms
the-mikedavis Nov 10, 2025
c690460
AMQP 1.0: Handle per-queue-type disk alarms
the-mikedavis Nov 10, 2025
8a3ee94
rabbit_disk_monitor: Document NaN alarm behavior
lukebakken Feb 4, 2026
e467887
unit_disk_monitor_SUITE: Test that duplicate mount paths are allowed
lukebakken Mar 4, 2026
09eee96
rabbit.schema: Validate that disk_free_limits entries have all requir…
lukebakken Mar 9, 2026
8b5860c
rabbit_alarm: Clarify log message for per-queue-type disk alarms
lukebakken Mar 9, 2026
e64b19b
rabbit_disk_monitor: Handle multiple disks in resolve_data_dir/0
lukebakken Mar 9, 2026
fece613
amqp_client_SUITE: Test per-queue-type disk alarm blocking for direct…
lukebakken Mar 9, 2026
79c4fa3
amqp_client: Per-channel queue_types_published tracking for direct co…
lukebakken Mar 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion deps/amqp_client/src/amqp_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,11 @@ start_link(Driver, Connection, ChannelNumber, Consumer, Identity) ->
gen_server:start_link(
?MODULE, [Driver, Connection, ChannelNumber, Consumer, Identity], []).

set_writer(Pid, Writer) ->
%% Tells the channel process `Pid' which writer process to use.
%%
%% The writer may be passed bare, or wrapped as `{network, Writer}' or
%% `{direct, Writer}'. The tag is stripped and the bare writer pid is cast to
%% the channel as `{set_writer, Writer}'. Arguments that do not resolve to a
%% pair of pids match no clause and fail with `function_clause'.
set_writer(Pid, {Tag, Writer}) when Tag =:= network; Tag =:= direct ->
    %% Both tags are handled identically here: only the pid is forwarded.
    set_writer(Pid, Writer);
set_writer(Pid, Writer) when is_pid(Pid), is_pid(Writer) ->
    gen_server:cast(Pid, {set_writer, Writer}).

enable_delivery_flow_control(Pid) ->
Expand Down
19 changes: 11 additions & 8 deletions deps/amqp_client/src/amqp_channel_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ start_link(Type, Connection, ConnName, InfraArgs, ChNumber,
modules => [amqp_channel]},
{ok, ChPid} = supervisor:start_child(Sup, ChildSpec),
case start_writer(Sup, Type, InfraArgs, ConnName, ChNumber, ChPid) of
{ok, Writer} ->
amqp_channel:set_writer(ChPid, Writer),
{ok, TaggedWriter} ->
amqp_channel:set_writer(ChPid, TaggedWriter),
{ok, AState} = init_command_assembler(Type),
{ok, Sup, {ChPid, AState}};
{ok, Sup, {ChPid, TaggedWriter, AState}};
{error, _}=Error ->
Error
end.
Expand All @@ -58,12 +58,11 @@ start_writer(_Sup, direct, [ConnPid, Node, User, VHost, Collector, AmqpParams],
case rpc:call(Node, rabbit_direct, start_channel,
[ChNumber, ChPid, ConnPid, ConnName, ?PROTOCOL, User,
VHost, ?CLIENT_CAPABILITIES, Collector, AmqpParams], ?DIRECT_OPERATION_TIMEOUT) of
{ok, _Writer} = Reply ->
Reply;
{ok, Writer} ->
{ok, {direct, Writer}};
{badrpc, Reason} ->
{error, {Reason, Node}};
Error ->
Error
Error -> Error
end;
start_writer(Sup, network, [Sock, FrameMax], ConnName, ChNumber, ChPid) ->
GCThreshold = application:get_env(amqp_client, writer_gc_threshold, ?DEFAULT_GC_THRESHOLD),
Expand All @@ -76,7 +75,11 @@ start_writer(Sup, network, [Sock, FrameMax], ConnName, ChNumber, ChPid) ->
shutdown => ?WORKER_WAIT,
type => worker,
modules => [rabbit_writer]},
supervisor:start_child(Sup, ChildSpec).
case supervisor:start_child(Sup, ChildSpec) of
{ok, Writer} ->
{ok, {network, Writer}};
Error -> Error
end.

%% Returns `{ok, AssemblerState}' for a newly started channel of the given
%% connection type. Network channels obtain their state from
%% rabbit_command_assembler:init/0; direct channels use the placeholder `none'.
init_command_assembler(network) ->
    rabbit_command_assembler:init();
init_command_assembler(direct) ->
    {ok, none}.
Expand Down
30 changes: 20 additions & 10 deletions deps/amqp_client/src/amqp_channels_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
channel_sup_sup,
map_num_pa = gb_trees:empty(), %% Number -> {Pid, AState}
map_pid_num = #{}, %% Pid -> Number
map_pid_writer = #{}, %% Pid -> {direct|network, WriterPid}
channel_max = ?MAX_CHANNEL_NUMBER,
closing = false}).

Expand Down Expand Up @@ -105,10 +106,10 @@ handle_open_channel(ProposedNumber, Consumer, InfraArgs,
State = #state{channel_sup_sup = ChSupSup}) ->
case new_number(ProposedNumber, State) of
{ok, Number} ->
{ok, _ChSup, {Ch, AState}} =
{ok, _ChSup, {Ch, TaggedWriter, AState}} =
amqp_channel_sup_sup:start_channel_sup(ChSupSup, InfraArgs,
Number, Consumer),
NewState = internal_register(Number, Ch, AState, State),
NewState = internal_register(Number, Ch, TaggedWriter, AState, State),
erlang:monitor(process, Ch),
{reply, {ok, Ch}, NewState};
{error, _} = Error ->
Expand Down Expand Up @@ -155,12 +156,15 @@ handle_down(Pid, Reason, State) ->
Number -> handle_channel_down(Pid, Number, Reason, State)
end.

handle_channel_down(Pid, Number, Reason, State) ->
%% Handles the termination of a registered channel process.
%%
%% Steps, in order:
%%   1. report the exit via maybe_report_down/3, unwrapping `{shutdown, R}'
%%      to `R' first;
%%   2. look up the channel's tagged writer (must be read before the channel
%%      is unregistered, since unregistering drops it from map_pid_writer);
%%   3. unregister the channel;
%%   4. notify the connection through amqp_gen_connection:channel_closed/2;
%%   5. check whether all channels have now terminated.
%% Returns `{noreply, NewState}'.
handle_channel_down(Pid, Number, Reason,
                    #state{connection = Connection,
                           map_pid_writer = MapPW} = State) ->
    ReportedReason = case Reason of
                         {shutdown, Inner} -> Inner;
                         _ -> Reason
                     end,
    maybe_report_down(Pid, ReportedReason, State),
    TaggedWriter = maps:get(Pid, MapPW),
    NewState = internal_unregister(Number, Pid, State),
    amqp_gen_connection:channel_closed(Connection, TaggedWriter),
    check_all_channels_terminated(NewState),
    {noreply, NewState}.

Expand Down Expand Up @@ -211,19 +215,25 @@ internal_pass_frame(Number, Frame, State) ->
internal_update_npa(Number, ChPid, NewAState, State)
end.

%% Records a newly started channel in the manager state.
%% NOTE(review): this span interleaves the previous 4-argument clause with the
%% revised 5-argument one. Both store Number -> {Pid, AState} in map_num_pa
%% (gb_trees:enter/3) and Pid -> Number in map_pid_num; the revised clause
%% additionally stores Pid -> TaggedWriter in map_pid_writer.
internal_register(Number, Pid, AState,
State = #state{map_num_pa = MapNPA, map_pid_num = MapPN}) ->
internal_register(Number, Pid, TaggedWriter, AState,
State = #state{map_num_pa = MapNPA, map_pid_num = MapPN,
map_pid_writer = MapPW}) ->
MapNPA1 = gb_trees:enter(Number, {Pid, AState}, MapNPA),
MapPN1 = maps:put(Pid, Number, MapPN),
State#state{map_num_pa = MapNPA1,
map_pid_num = MapPN1}.
MapPW1 = maps:put(Pid, TaggedWriter, MapPW),
State#state{map_num_pa = MapNPA1,
map_pid_num = MapPN1,
map_pid_writer = MapPW1}.

%% Removes a channel from the manager state.
%% NOTE(review): this span interleaves the previous and the revised version of
%% the function. Both delete Number from map_num_pa (gb_trees:delete/2, which
%% requires the key to be present) and Pid from map_pid_num; the revised
%% version additionally removes Pid from map_pid_writer.
internal_unregister(Number, Pid,
State = #state{map_num_pa = MapNPA, map_pid_num = MapPN}) ->
State = #state{map_num_pa = MapNPA, map_pid_num = MapPN,
map_pid_writer = MapPW}) ->
MapNPA1 = gb_trees:delete(Number, MapNPA),
MapPN1 = maps:remove(Pid, MapPN),
State#state{map_num_pa = MapNPA1,
map_pid_num = MapPN1}.
MapPW1 = maps:remove(Pid, MapPW),
State#state{map_num_pa = MapNPA1,
map_pid_num = MapPN1,
map_pid_writer = MapPW1}.

%% Returns true when no channel is registered, i.e. when the
%% number-to-channel tree (map_num_pa) holds no entries.
internal_is_empty(#state{map_num_pa = ChannelsByNumber}) ->
    gb_trees:is_empty(ChannelsByNumber).
Expand Down
57 changes: 45 additions & 12 deletions deps/amqp_client/src/amqp_gen_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
-behaviour(gen_server).

-export([start_link/2, connect/1, open_channel/3, hard_error_in_channel/3,
channel_internal_error/3, server_misbehaved/2, channels_terminated/1,
close/3, server_close/2, info/2, info_keys/0, info_keys/1,
register_blocked_handler/2, update_secret/2]).
channel_internal_error/3, channel_closed/2, server_misbehaved/2,
channels_terminated/1, close/3, server_close/2, info/2, info_keys/0,
info_keys/1, register_blocked_handler/2, update_secret/2]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).

Expand All @@ -32,6 +32,7 @@
%% connection.block, connection.unblock handler
block_handler,
blocked_by = sets:new([{version, 2}]),
queue_types_published = #{},
closing = false %% #closing{} | false
}).

Expand Down Expand Up @@ -96,6 +97,9 @@ hard_error_in_channel(Pid, ChannelPid, Reason) ->
%% Asynchronously notifies the connection process `Pid' that the channel
%% `ChannelPid' hit an internal error with the given `Reason'.
%% Fire-and-forget: returns `ok' without waiting for the connection.
channel_internal_error(Pid, ChannelPid, Reason) ->
    Notification = {channel_internal_error, ChannelPid, Reason},
    gen_server:cast(Pid, Notification).

%% Asynchronously notifies the connection process `Pid' that a channel has
%% gone away. `TaggedWriter' is the channel's writer wrapped as
%% `{direct, WriterPid}' or `{network, WriterPid}'; it is forwarded unchanged
%% in a `{channel_closed, TaggedWriter}' cast.
channel_closed(Pid, TaggedWriter) ->
    Notification = {channel_closed, TaggedWriter},
    gen_server:cast(Pid, Notification).

%% Asynchronously reports `AmqpError' to the connection process `Pid' as a
%% `{server_misbehaved, AmqpError}' cast. Returns `ok' immediately.
server_misbehaved(Pid, AmqpError) ->
    Notification = {server_misbehaved, AmqpError},
    gen_server:cast(Pid, Notification).

Expand Down Expand Up @@ -214,22 +218,31 @@ handle_cast({register_blocked_handler, HandlerPid},
{noreply, State1};
handle_cast({conserve_resources, Source, Conserve},
#state{blocked_by = BlockedBy} = State) ->
WasNotBlocked = sets:is_empty(BlockedBy),
BlockedBy1 = case Conserve of
true ->
sets:add_element(Source, BlockedBy);
false ->
sets:del_element(Source, BlockedBy)
end,
State1 = State#state{blocked_by = BlockedBy1},
case sets:is_empty(BlockedBy1) of
true ->
handle_method(#'connection.unblocked'{}, State1);
false when WasNotBlocked ->
handle_method(#'connection.blocked'{}, State1);
false ->
{noreply, State1}
end.
maybe_block(State, State1);
handle_cast({channel_published_to_queue_type, ServerChPid, QT},
#state{queue_types_published = QTs} = State) ->
%% ServerChPid is the server-side channel pid. We key by {direct, ServerChPid}
%% so channel_closed can remove the entry using the tagged writer.
Key = {direct, ServerChPid},
ChQTs = maps:get(Key, QTs, sets:new([{version, 2}])),
State1 = State#state{queue_types_published =
QTs#{Key => sets:add_element(QT, ChQTs)}},
maybe_block(State, State1);
handle_cast({channel_closed, {direct, ServerChPid}},
#state{queue_types_published = QTs} = State) ->
State1 = State#state{queue_types_published =
maps:remove({direct, ServerChPid}, QTs)},
maybe_block(State, State1);
handle_cast({channel_closed, {network, _WriterPid}}, State) ->
%% Network connections are handled by rabbit_reader, not here.
{noreply, State}.

%% @private
handle_info({'DOWN', _, process, BlockHandler, Reason},
Expand Down Expand Up @@ -274,6 +287,26 @@ i(Item, #state{module = Mod, module_state = MState}) -> Mod:i(Item, MState).
%% Asks the connection process `Pid' to register `HandlerPid' as its
%% blocked-notification handler. Sent as a cast, so it returns `ok' without
%% confirming that the registration took place.
register_blocked_handler(Pid, HandlerPid) ->
    Request = {register_blocked_handler, HandlerPid},
    gen_server:cast(Pid, Request).

%% Compares the blocking decision before (`State0') and after (`State1') a
%% state change and reacts only to a transition:
%%   not blocked -> blocked    : process a `connection.blocked' method
%%   blocked     -> not blocked: process a `connection.unblocked' method
%%   no change                 : `{noreply, State1}'
%% In every case the state carried forward is `State1'.
maybe_block(State0, State1) ->
    case {should_block(State0), should_block(State1)} of
        {false, true} ->
            handle_method(#'connection.blocked'{}, State1);
        {true, false} ->
            handle_method(#'connection.unblocked'{}, State1);
        {_Unchanged, _} ->
            {noreply, State1}
    end.

%% Decides whether the connection should currently be considered blocked.
%%
%% Returns true when at least one source in `blocked_by' applies:
%%   * a `{disk, QueueType}' source applies only if some channel tracked in
%%     `queue_types_published' has that queue type in its set;
%%   * every other source applies unconditionally.
%% An empty `blocked_by' set therefore yields false.
should_block(#state{blocked_by = Sources, queue_types_published = PublishedByChannel}) ->
    AnyChannelPublishedTo =
        fun(QueueType) ->
                lists:any(fun(ChannelQTs) ->
                                  sets:is_element(QueueType, ChannelQTs)
                          end,
                          maps:values(PublishedByChannel))
        end,
    Applies = fun ({disk, QueueType}) -> AnyChannelPublishedTo(QueueType);
                  (_OtherSource) -> true
              end,
    lists:any(Applies, sets:to_list(Sources)).

%%---------------------------------------------------------------------------
%% Command handling
%%---------------------------------------------------------------------------
Expand Down
110 changes: 110 additions & 0 deletions deps/rabbit/priv/schema/rabbit.schema
Original file line number Diff line number Diff line change
Expand Up @@ -1288,6 +1288,102 @@ fun(Conf) ->
end
end}.

%% Tuning of disk monitor polling parameters.
%% Each mapping below accepts an integer and targets the `rabbit' application
%% key named in its second string.
%% NOTE(review): defaults and the exact way rabbit_disk_monitor combines these
%% three values are not visible here -- confirm against that module.
{mapping, "disk_monitor.fast_rate", "rabbit.disk_monitor_fast_rate", [
%% Unit: KB/second, for example 250_000 for 250MB/sec.
{datatype, [integer]}
]}.
%% Lower bound for the polling interval.
{mapping, "disk_monitor.min_interval", "rabbit.disk_monitor_min_interval", [
%% Unit: milliseconds.
{datatype, [integer]}
]}.
%% Upper bound for the polling interval.
{mapping, "disk_monitor.max_interval", "rabbit.disk_monitor_max_interval", [
%% Unit: milliseconds.
{datatype, [integer]}
]}.

%% Per-queue-type / per-mount disk alarms.
%% All four `disk_free_limits.$num.*' keys target the same
%% `rabbit.disk_free_limits' setting; the translation for that setting groups
%% them by `$num' into one entry per index.

%% Name of the limit entry (binary). The translation rejects configurations in
%% which two entries share a name.
{mapping, "disk_free_limits.$num.name", "rabbit.disk_free_limits", [
{datatype, [binary]}
]}.
%% Mount path the entry refers to (string).
{mapping, "disk_free_limits.$num.mount", "rabbit.disk_free_limits", [
{datatype, [string]}
]}.
%% Free-space limit: an integer, or a string checked by the
%% "is_supported_information_unit" validator.
{mapping, "disk_free_limits.$num.limit", "rabbit.disk_free_limits", [
{datatype, [integer, string]},
{validators, ["is_supported_information_unit"]}
]}.
%% Queue types the entry applies to, given as a single binary that the
%% translation splits on ",".
{mapping, "disk_free_limits.$num.queue_types", "rabbit.disk_free_limits", [
{datatype, [binary]}
]}.

%% Assembles the flat `disk_free_limits.$num.<field>' settings into a map of
%% Index => #{name, mount, limit, queue_types}, or unsets the key when no such
%% settings are present.
%%
%% Validation, in order:
%%   1. `$num' must parse as an integer and `<field>' must be one of name,
%%      mount, limit or queue_types;
%%   2. every entry must define all four fields;
%%   3. no two entries may share a name.
%% Step 2 deliberately runs before step 3: the uniqueness fold matches on
%% `name', so an entry without one has to be rejected with the "missing
%% required fields" message first instead of failing the fold with
%% function_clause.
%%
%% When the same field is supplied more than once for an index, a warning is
%% emitted and the value seen later in the fold replaces the earlier one.
{translation, "rabbit.disk_free_limits",
 fun(Conf) ->
     case cuttlefish_variable:filter_by_prefix("disk_free_limits", Conf) of
         [] ->
             cuttlefish:unset();
         Settings ->
             Ls = lists:foldl(
                    fun ({["disk_free_limits", Num, Key0], Value0}, Acc) ->
                            Idx = case string:to_integer(Num) of
                                      {N, []} -> N;
                                      _ -> cuttlefish:invalid(lists:flatten(io_lib:format("~p could not be parsed as a number", [Num])))
                                  end,
                            Key = case Key0 of
                                      "name" -> name;
                                      "mount" -> mount;
                                      "limit" -> limit;
                                      "queue_types" -> queue_types;
                                      _ -> cuttlefish:invalid(lists:flatten(io_lib:format("~p is invalid", [Key0])))
                                  end,
                            Value = case Key of
                                        %% string:split/2 only splits at the
                                        %% first separator; `all' is required
                                        %% so that three or more queue types
                                        %% each become their own element.
                                        queue_types -> string:split(Value0, ",", all);
                                        _ -> Value0
                                    end,
                            maps:update_with(
                              Idx,
                              fun (#{Key := ExistingValue} = Limit) ->
                                      cuttlefish:warn(
                                        io_lib:format("Disk limit ~b has duplicate setting ~ts, "
                                                      "using ~tp instead of ~tp",
                                                      [Idx, Key, Value, ExistingValue])),
                                      Limit#{Key := Value};
                                  (Limit) ->
                                      Limit#{Key => Value}
                              end, #{Key => Value}, Acc);
                        (Other, _Acc) ->
                            cuttlefish:invalid(
                              lists:flatten(io_lib:format("~p is invalid", [Other])))
                    end, #{}, Settings),
             %% Required-field check first (see header comment).
             maps:foreach(
               fun(Idx, Entry) ->
                       %% All four fields are required by the pattern match in
                       %% rabbit_disk_monitor:mounts/0.
                       Required = [name, mount, limit, queue_types],
                       Missing = [K || K <- Required, not maps:is_key(K, Entry)],
                       case Missing of
                           [] -> ok;
                           _ ->
                               cuttlefish:invalid(
                                 lists:flatten(io_lib:format(
                                   "disk_free_limits.~b is missing required fields: ~p",
                                   [Idx, Missing])))
                       end
               end, Ls),
             %% Every entry is now known to carry a name, so this match is safe.
             maps:fold(
               fun(_Idx, #{name := Name}, Names) ->
                       case sets:is_element(Name, Names) of
                           true ->
                               cuttlefish:invalid(
                                 lists:flatten(io_lib:format("name ~ts is used by multiple mounts", [Name])));
                           false ->
                               sets:add_element(Name, Names)
                       end
               end, sets:new([{version, 2}]), Ls),
             Ls
     end
 end}.

%%
%% Clustering
%% =====================
Expand Down Expand Up @@ -2718,6 +2814,20 @@ end}.
{datatype, {enum, [true, false]}}
]}.

%% Classic queue data directory.
%% Accepts a string and targets the `rabbit.classic_queue_data_dir' setting.
%% NOTE(review): how the broker resolves or validates this path is not visible
%% here -- confirm against the code that reads classic_queue_data_dir.
{mapping, "classic_queue.data_dir", "rabbit.classic_queue_data_dir", [
{datatype, string}
]}.

%% Passes `classic_queue.data_dir' through unchanged when it is configured and
%% leaves `rabbit.classic_queue_data_dir' unset when it is not.
{translation, "rabbit.classic_queue_data_dir",
 fun(Conf) ->
     DataDir = cuttlefish:conf_get("classic_queue.data_dir", Conf, undefined),
     case DataDir =:= undefined of
         true -> cuttlefish:unset();
         false -> DataDir
     end
 end}.

%%
%% Backing queue version
%%
Expand Down
Loading
Loading