Skip to content

Commit

Permalink
Return error if stream publisher reference is longer than 255 characters
Browse files Browse the repository at this point in the history
Fixes #12499
  • Loading branch information
acogoluegnes committed Oct 11, 2024
1 parent d9ff6a0 commit 4e8fb46
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 3 deletions.
26 changes: 24 additions & 2 deletions deps/rabbitmq_stream/src/rabbit_stream_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@
-define(UNKNOWN_FIELD, unknown_field).
-define(SILENT_CLOSE_DELAY, 3_000).

-import(rabbit_stream_utils, [check_write_permitted/2]).

%% client API
-export([start_link/4,
info/2,
Expand Down Expand Up @@ -1655,6 +1657,26 @@ handle_frame_post_auth(Transport,
{C1#stream_connection{connection_step = failure}, S1}
end,
{Connection1, State1};
handle_frame_post_auth(Transport,
#stream_connection{user = User,
resource_alarm = false} = C,
State,
{request, CorrelationId,
{declare_publisher, _PublisherId, WriterRef, S}})
when is_binary(WriterRef), byte_size(WriterRef) > 255 ->
{Code, Counter} = case check_write_permitted(stream_r(S, C), User) of
ok ->
{?RESPONSE_CODE_PRECONDITION_FAILED, ?PRECONDITION_FAILED};
error ->
{?RESPONSE_CODE_ACCESS_REFUSED, ?ACCESS_REFUSED}
end,
response(Transport,
C,
declare_publisher,
CorrelationId,
Code),
rabbit_global_counters:increase_protocol_counter(stream, Counter, 1),
{C, State};
handle_frame_post_auth(Transport,
#stream_connection{user = User,
publishers = Publishers0,
Expand All @@ -1664,7 +1686,7 @@ handle_frame_post_auth(Transport,
State,
{request, CorrelationId,
{declare_publisher, PublisherId, WriterRef, Stream}}) ->
case rabbit_stream_utils:check_write_permitted(stream_r(Stream,
case check_write_permitted(stream_r(Stream,
Connection0),
User)
of
Expand Down Expand Up @@ -3102,7 +3124,7 @@ evaluate_state_after_secret_update(Transport,
{_, Conn1} = ensure_token_expiry_timer(User, Conn0),
PublisherStreams =
lists:foldl(fun(#publisher{stream = Str}, Acc) ->
case rabbit_stream_utils:check_write_permitted(stream_r(Str, Conn0), User) of
case check_write_permitted(stream_r(Str, Conn0), User) of
ok ->
Acc;
_ ->
Expand Down
35 changes: 34 additions & 1 deletion deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ groups() ->
test_super_stream_duplicate_partitions,
authentication_error_should_close_with_delay,
unauthorized_vhost_access_should_close_with_delay,
sasl_anonymous
sasl_anonymous,
test_publisher_with_too_long_reference_errors
]},
%% Run `test_global_counters` on its own so the global metrics are
%% initialised to 0 for each testcase
Expand Down Expand Up @@ -945,6 +946,38 @@ unauthorized_vhost_access_should_close_with_delay(Config) ->
closed = wait_for_socket_close(T, S, 10),
ok.

test_publisher_with_too_long_reference_errors(Config) ->
FunctionName = atom_to_binary(?FUNCTION_NAME, utf8),
T = gen_tcp,
Port = get_port(T, Config),
Opts = get_opts(T),
{ok, S} = T:connect("localhost", Port, Opts),
C = rabbit_stream_core:init(0),
ConnectionName = FunctionName,
test_peer_properties(T, S, #{<<"connection_name">> => ConnectionName}, C),
test_authenticate(T, S, C),

Stream = FunctionName,
test_create_stream(T, S, Stream, C),

MaxSize = 255,
ReferenceOK = iolist_to_binary(lists:duplicate(MaxSize, <<"a">>)),
ReferenceKO = iolist_to_binary(lists:duplicate(MaxSize + 1, <<"a">>)),

Tests = [{1, ReferenceOK, ?RESPONSE_CODE_OK},
{2, ReferenceKO, ?RESPONSE_CODE_PRECONDITION_FAILED}],

[begin
F = request({declare_publisher, PubId, Ref, Stream}),
ok = T:send(S, F),
{Cmd, C} = receive_commands(T, S, C),
?assertMatch({response, 1, {declare_publisher, ExpectedResponseCode}}, Cmd)
end || {PubId, Ref, ExpectedResponseCode} <- Tests],

test_delete_stream(T, S, Stream, C),
test_close(T, S, C),
ok.

consumer_offset_info(Config, ConnectionName) ->
[[{offset, Offset},
{offset_lag, Lag}]] = rpc(Config, 0, ?MODULE,
Expand Down

0 comments on commit 4e8fb46

Please sign in to comment.