Skip to content

Commit

Permalink
Set a floor of zero for incoming-window
Browse files Browse the repository at this point in the history
Prior to this commit, when the sending client overshot RabbitMQ's incoming-window
(which is allowed in the event of a cluster wide memory or disk alarm),
and RabbitMQ sent a FLOW frame to the client, RabbitMQ sent a negative
incoming-window field in the FLOW frame causing the following crash in
the writer proc:
```
crasher:
  initial call: rabbit_amqp_writer:init/1
  pid: <0.19353.0>
  registered_name: []
  exception error: bad argument
    in function  iolist_size/1
       called as iolist_size([<<112,0,0,23,120>>,
                              [82,-15],
                              <<"pÿÿÿü">>,<<"pÿÿÿÿ">>,67,
                              <<112,0,0,23,120>>,
                              "Rª",64,64,64,64])
       *** argument 1: not an iodata term
    in call from amqp10_binary_generator:generate1/1 (amqp10_binary_generator.erl, line 141)
    in call from amqp10_binary_generator:generate1/1 (amqp10_binary_generator.erl, line 88)
    in call from amqp10_binary_generator:generate/1 (amqp10_binary_generator.erl, line 79)
    in call from rabbit_amqp_writer:assemble_frame/3 (rabbit_amqp_writer.erl, line 206)
    in call from rabbit_amqp_writer:internal_send_command_async/3 (rabbit_amqp_writer.erl, line 189)
    in call from rabbit_amqp_writer:handle_cast/2 (rabbit_amqp_writer.erl, line 110)
    in call from gen_server:try_handle_cast/3 (gen_server.erl, line 1121)
```

This commit fixes this crash by maintaning a floor of zero for
incoming-window in the FLOW frame.

Fixes #12816
  • Loading branch information
ansd committed Dec 9, 2024
1 parent c15ba8e commit c3dcaca
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 6 deletions.
5 changes: 0 additions & 5 deletions deps/amqp10_client/src/amqp10_client_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -467,11 +467,6 @@ mapped({call, From},
#state{remote_incoming_window = Window})
when Window =< 0 ->
{keep_state_and_data, {reply, From, {error, remote_incoming_window_exceeded}}};
mapped({call, From},
{transfer, _Transfer, _Sections},
#state{remote_incoming_window = Window})
when Window =< 0 ->
{keep_state_and_data, {reply, From, {error, remote_incoming_window_exceeded}}};
mapped({call, From = {Pid, _}},
{transfer, #'v1_0.transfer'{handle = {uint, OutHandle},
delivery_tag = {binary, DeliveryTag},
Expand Down
5 changes: 4 additions & 1 deletion deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2003,7 +2003,10 @@ session_flow_fields(Frames, State)
session_flow_fields(Flow = #'v1_0.flow'{},
#state{next_outgoing_id = NextOutgoingId,
next_incoming_id = NextIncomingId,
incoming_window = IncomingWindow}) ->
incoming_window = IncomingWindow0}) ->
%% IncomingWindow0 can be negative when the sending client overshoots our window.
%% However, we must set a floor of 0 in the FLOW frame because field incoming-window is an uint.
IncomingWindow = max(0, IncomingWindow0),
Flow#'v1_0.flow'{
next_outgoing_id = ?UINT(NextOutgoingId),
outgoing_window = ?UINT_OUTGOING_WINDOW,
Expand Down
59 changes: 59 additions & 0 deletions deps/rabbit/test/amqp_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ groups() ->
detach_requeues_drop_head_classic_queue,
resource_alarm_before_session_begin,
resource_alarm_after_session_begin,
resource_alarm_send_many,
max_message_size_client_to_server,
max_message_size_server_to_client,
global_counters,
Expand Down Expand Up @@ -3207,6 +3208,42 @@ resource_alarm_after_session_begin(Config) ->
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch).

%% Test case for
%% https://github.com/rabbitmq/rabbitmq-server/issues/12816
resource_alarm_send_many(Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
Address = rabbitmq_amqp_address:queue(QName),
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),

%% Send many messages while a memory alarm kicks in.
%% Our expectations are:
%% 1. At some point, our client's remote-incoming-window should be exceeded because
%% RabbitMQ sets its incoming-window to 0 when the alarm kicks in.
%% 2. No crash.
{Pid, Ref} = spawn_monitor(?MODULE,
send_until_remote_incoming_window_exceeded,
[Session, Address]),
DefaultWatermark = rpc(Config, vm_memory_monitor, get_vm_memory_high_watermark, []),
ok = rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [0]),
receive {'DOWN', Ref, process, Pid, Reason} ->
?assertEqual(normal, Reason)
after 30_000 ->
ct:fail(send_timeout)
end,

%% Clear memory alarm.
ok = rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [DefaultWatermark]),
timer:sleep(100),

ok = end_session_sync(Session),
ok = amqp10_client:close_connection(Connection),
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch).

auth_attempt_metrics(Config) ->
open_and_close_connection(Config),
[Attempt1] = rpc(Config, rabbit_core_metrics, get_auth_attempts, []),
Expand Down Expand Up @@ -6286,6 +6323,28 @@ count_received_messages0(Receiver, Count) ->
Count
end.

send_until_remote_incoming_window_exceeded(Session, Address) ->
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address, settled),
ok = wait_for_credit(Sender),
ok = send_until_remote_incoming_window_exceeded0(Sender, 100_000),
ok = amqp10_client:detach_link(Sender).

send_until_remote_incoming_window_exceeded0(_Sender, 0) ->
ct:fail(remote_incoming_window_never_exceeded);
send_until_remote_incoming_window_exceeded0(Sender, Left) ->
Bin = integer_to_binary(Left),
Msg = amqp10_msg:new(Bin, Bin, true),
case amqp10_client:send_msg(Sender, Msg) of
ok ->
send_until_remote_incoming_window_exceeded0(Sender, Left - 1);
{error, insufficient_credit} ->
ok = wait_for_credit(Sender),
send_until_remote_incoming_window_exceeded0(Sender, Left);
{error, remote_incoming_window_exceeded = Reason} ->
ct:pal("~s: ~b messages left", [Reason, Left]),
ok
end.

assert_link_credit_runs_out(_Sender, 0) ->
ct:fail(sufficient_link_credit);
assert_link_credit_runs_out(Sender, Left) ->
Expand Down

0 comments on commit c3dcaca

Please sign in to comment.