Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Comply with §2.2.2 of Anonymous Terminus extension (backport #12391) #12397

Merged
merged 2 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion deps/amqp10_client/src/amqp10_client_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,8 @@ rcv_settle_mode(_) -> undefined.
% TODO: work out if we can assume accepted
translate_delivery_state(undefined) -> undefined;
translate_delivery_state(#'v1_0.accepted'{}) -> accepted;
translate_delivery_state(#'v1_0.rejected'{}) -> rejected;
translate_delivery_state(#'v1_0.rejected'{error = undefined}) -> rejected;
translate_delivery_state(#'v1_0.rejected'{error = Error}) -> {rejected, Error};
translate_delivery_state(#'v1_0.modified'{}) -> modified;
translate_delivery_state(#'v1_0.released'{}) -> released;
translate_delivery_state(#'v1_0.received'{}) -> received;
Expand Down
45 changes: 38 additions & 7 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2342,6 +2342,7 @@ incoming_link_transfer(
validate_transfer_snd_settle_mode(SndSettleMode, Settled),
validate_transfer_rcv_settle_mode(RcvSettleMode, Settled),
validate_message_size(PayloadBin, MaxMessageSize),
messages_received(Settled),

Mc0 = mc:init(mc_amqp, PayloadBin, #{}),
case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of
Expand All @@ -2350,7 +2351,6 @@ incoming_link_transfer(
check_user_id(Mc2, User),
TopicPermCache = check_write_permitted_on_topic(
X, User, RoutingKey, TopicPermCache0),
messages_received(Settled),
QNames = rabbit_exchange:route(X, Mc2, #{return_binding_keys => true}),
rabbit_trace:tap_in(Mc2, QNames, ConnName, ChannelNum, Username, Trace),
Opts = #{correlation => {HandleInt, DeliveryId}},
Expand Down Expand Up @@ -2386,9 +2386,34 @@ incoming_link_transfer(
[DeliveryTag, DeliveryId, Reason])
end;
{error, #'v1_0.error'{} = Err} ->
Disposition = released(DeliveryId),
Detach = detach(HandleInt, Link0, Err),
{error, [Disposition, Detach]}
Disposition = case Settled of
true -> [];
false -> [released(DeliveryId)]
end,
Detach = [detach(HandleInt, Link0, Err)],
{error, Disposition ++ Detach};
{error, anonymous_terminus, #'v1_0.error'{} = Err} ->
%% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-routingerrors
case Settled of
true ->
Info = {map, [{{symbol, <<"delivery-tag">>}, DeliveryTag}]},
Err1 = Err#'v1_0.error'{info = Info},
Detach = detach(HandleInt, Link0, Err1),
{error, [Detach]};
false ->
Disposition = rejected(DeliveryId, Err),
DeliveryCount = add(DeliveryCount0, 1),
Credit1 = Credit0 - 1,
{Credit, Reply0} = maybe_grant_link_credit(
Credit1, MaxLinkCredit,
DeliveryCount, map_size(U0), Handle),
Reply = [Disposition | Reply0],
Link = Link0#incoming_link{
delivery_count = DeliveryCount,
credit = Credit,
multi_transfer_msg = undefined},
{ok, Reply, Link, State0}
end
end.

lookup_target(#exchange{} = X, LinkRKey, Mc, _, _, PermCache) ->
Expand All @@ -2412,16 +2437,16 @@ lookup_target(to, to, Mc, Vhost, User, PermCache0) ->
check_internal_exchange(X),
lookup_routing_key(X, RKey, Mc, PermCache);
{error, not_found} ->
{error, error_not_found(XName)}
{error, anonymous_terminus, error_not_found(XName)}
end;
{error, bad_address} ->
{error,
{error, anonymous_terminus,
#'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
description = {utf8, <<"bad 'to' address string: ", String/binary>>}}}
end;
undefined ->
{error,
{error, anonymous_terminus,
#'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
description = {utf8, <<"anonymous terminus requires 'to' address to be set">>}}}
Expand Down Expand Up @@ -2465,6 +2490,12 @@ released(DeliveryId) ->
settled = true,
state = #'v1_0.released'{}}.

rejected(DeliveryId, Error) ->
#'v1_0.disposition'{role = ?AMQP_ROLE_RECEIVER,
first = ?UINT(DeliveryId),
settled = true,
state = #'v1_0.rejected'{error = Error}}.

maybe_grant_link_credit(Credit, MaxLinkCredit, DeliveryCount, NumUnconfirmed, Handle) ->
case grant_link_credit(Credit, MaxLinkCredit, NumUnconfirmed) of
true ->
Expand Down
99 changes: 68 additions & 31 deletions deps/rabbit/test/amqp_address_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ common_tests() ->
target_per_message_queue,
target_per_message_unset_to_address,
target_per_message_bad_to_address,
target_per_message_exchange_absent,
target_per_message_exchange_absent_settled,
target_per_message_exchange_absent_unsettled,
target_bad_address,
source_bad_address
].
Expand Down Expand Up @@ -393,16 +394,15 @@ target_per_message_unset_to_address(Config) ->
%% Send message with 'to' unset.
DTag = <<1>>,
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag, <<0>>)),
ok = wait_for_settled(released, DTag),
receive {amqp10_event,
{link, Sender,
{detached,
#'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
description = {utf8, <<"anonymous terminus requires 'to' address to be set">>}}}}} -> ok
after 5000 -> ct:fail("server did not close our outgoing link")
ExpectedError = #'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
description = {utf8, <<"anonymous terminus requires 'to' address to be set">>}},
ok = wait_for_settled({rejected, ExpectedError}, DTag),

ok = amqp10_client:detach_link(Sender),
receive {amqp10_event, {link, Sender, {detached, normal}}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
end,

ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection).

Expand Down Expand Up @@ -449,34 +449,32 @@ bad_v2_addresses() ->

%% Test v2 target address 'null' with an invalid 'to' addresses.
target_per_message_bad_to_address(Config) ->
lists:foreach(fun(Addr) ->
ok = target_per_message_bad_to_address0(Addr, Config)
end, bad_v2_addresses()).

target_per_message_bad_to_address0(Address, Config) ->
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, null),
ok = wait_for_credit(Sender),

DTag = <<255>>,
Msg = amqp10_msg:set_properties(#{to => Address}, amqp10_msg:new(DTag, <<0>>)),
ok = amqp10_client:send_msg(Sender, Msg),
ok = wait_for_settled(released, DTag),
receive {amqp10_event,
{link, Sender,
{detached,
#'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
description = {utf8, <<"bad 'to' address", _Rest/binary>>}}}}} -> ok
after 5000 -> ct:fail("server did not close our outgoing link")
end,
lists:foreach(
fun(Addr) ->
DTag = <<"some delivery tag">>,
Msg = amqp10_msg:set_properties(#{to => Addr}, amqp10_msg:new(DTag, <<0>>, false)),
ok = amqp10_client:send_msg(Sender, Msg),
receive
{amqp10_disposition, {{rejected, Error}, DTag}} ->
?assertMatch(#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
description = {utf8, <<"bad 'to' address", _Rest/binary>>}},
Error)
after 5000 ->
flush(missing_disposition),
ct:fail(missing_disposition)
end
end, bad_v2_addresses()),

ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection).

target_per_message_exchange_absent(Config) ->
target_per_message_exchange_absent_settled(Config) ->
Init = {_, LinkPair = #link_pair{session = Session}} = init(Config),
XName = <<"🎈"/utf8>>,
Address = rabbitmq_amqp_address:exchange(XName),
Expand All @@ -492,20 +490,59 @@ target_per_message_exchange_absent(Config) ->
ok = rabbitmq_amqp_client:delete_exchange(LinkPair, XName),

DTag2 = <<2>>,
Msg2 = amqp10_msg:set_properties(#{to => Address}, amqp10_msg:new(DTag2, <<"m2">>)),
Msg2 = amqp10_msg:set_properties(#{to => Address}, amqp10_msg:new(DTag2, <<"m2">>, true)),
ok = amqp10_client:send_msg(Sender, Msg2),
ok = wait_for_settled(released, DTag2),

%% "the routing node MUST detach the link over which the message was sent with an error.
%% [...] Additionally the info field of error MUST contain an entry with symbolic key delivery-tag
%% and binary value of the delivery-tag of the message which caused the failure."
%% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-routingerrors
receive {amqp10_event, {link, Sender, {detached, Error}}} ->
?assertEqual(
#'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_NOT_FOUND,
description = {utf8, <<"no exchange '", XName/binary, "' in vhost '/'">>}},
description = {utf8, <<"no exchange '", XName/binary, "' in vhost '/'">>},
info = {map, [{{symbol, <<"delivery-tag">>}, {binary, DTag2}}]}
},
Error)
after 5000 -> ct:fail("server did not close our outgoing link")
end,

ok = cleanup(Init).

target_per_message_exchange_absent_unsettled(Config) ->
Init = {_, LinkPair = #link_pair{session = Session}} = init(Config),
XName = <<"🎈"/utf8>>,
Address = rabbitmq_amqp_address:exchange(XName),
ok = rabbitmq_amqp_client:declare_exchange(LinkPair, XName, #{}),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, null),
ok = wait_for_credit(Sender),

DTag1 = <<"my tag">>,
Msg1 = amqp10_msg:set_properties(#{to => Address}, amqp10_msg:new(DTag1, <<"hey">>)),
ok = amqp10_client:send_msg(Sender, Msg1),
ok = wait_for_settled(released, DTag1),

ok = rabbitmq_amqp_client:delete_exchange(LinkPair, XName),

%% "If the source of the link supports the rejected outcome, and the message has not
%% already been settled by the sender, then the routing node MUST reject the message.
%% In this case the error field of rejected MUST contain the error which would have been communicated
%% in the detach which would have be sent if a link to the same address had been attempted."
%% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-routingerrors
%% We test here multiple rejections implicilty checking that link flow control works correctly.
ExpectedError = #'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_NOT_FOUND,
description = {utf8, <<"no exchange '", XName/binary, "' in vhost '/'">>}},
[begin
DTag = Body = integer_to_binary(N),
Msg = amqp10_msg:set_properties(#{to => Address}, amqp10_msg:new(DTag, Body, false)),
ok = amqp10_client:send_msg(Sender, Msg),
ok = wait_for_settled({rejected, ExpectedError}, DTag)
end || N <- lists:seq(1, 300)],

ok = cleanup(Init).

target_bad_address(Config) ->
%% bad v1 and bad v2 target address
TargetAddr = <<"/qqq/🎈"/utf8>>,
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/test/amqp_auth_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ target_per_message_internal_exchange(Config) ->
ExpectedErr = error_unauthorized(
<<"forbidden to publish to internal exchange '", XName/binary, "' in vhost 'test vhost'">>),
receive {amqp10_event, {session, Session1, {ended, ExpectedErr}}} -> ok
after 5000 -> flush(aaa),
after 5000 -> flush(missing_event),
ct:fail({missing_event, ?LINE})
end,
ok = close_connection_sync(Conn1),
Expand Down
22 changes: 17 additions & 5 deletions deps/rabbit/test/amqp_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ groups() ->
server_closes_link_classic_queue,
server_closes_link_quorum_queue,
server_closes_link_stream,
server_closes_link_exchange,
server_closes_link_exchange_settled,
server_closes_link_exchange_unsettled,
link_target_classic_queue_deleted,
link_target_quorum_queue_deleted,
target_queues_deleted_accepted,
Expand Down Expand Up @@ -1513,7 +1514,13 @@ server_closes_link(QType, Config) ->
ok = end_session_sync(Session),
ok = amqp10_client:close_connection(Connection).

server_closes_link_exchange(Config) ->
server_closes_link_exchange_settled(Config) ->
server_closes_link_exchange(true, Config).

server_closes_link_exchange_unsettled(Config) ->
server_closes_link_exchange(false, Config).

server_closes_link_exchange(Settled, Config) ->
XName = atom_to_binary(?FUNCTION_NAME),
QName = <<"my queue">>,
RoutingKey = <<"my routing key">>,
Expand Down Expand Up @@ -1543,8 +1550,13 @@ server_closes_link_exchange(Config) ->
%% When we publish the next message, we expect:
%% 1. that the message is released because the exchange doesn't exist anymore, and
DTag2 = <<255>>,
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag2, <<"m2">>, false)),
ok = wait_for_settlement(DTag2, released),
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag2, <<"m2">>, Settled)),
case Settled of
true ->
ok;
false ->
ok = wait_for_settlement(DTag2, released)
end,
%% 2. that the server closes the link, i.e. sends us a DETACH frame.
receive {amqp10_event,
{link, Sender,
Expand Down Expand Up @@ -5980,7 +5992,7 @@ assert_messages(QNameBin, NumTotalMsgs, NumUnackedMsgs, Config, Node) ->
Infos = rpc(Config, Node, rabbit_amqqueue, info, [Q, [messages, messages_unacknowledged]]),
lists:sort(Infos)
end
), 500, 5).
), 500, 10).

serial_number_increment(S) ->
case S + 1 of
Expand Down
Loading