Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track requeue history #12506

Merged
merged 4 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions deps/amqp10_client/src/amqp10_client_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1179,13 +1179,20 @@ wrap_map_value(true) ->
wrap_map_value(false) ->
{boolean, false};
wrap_map_value(V) when is_integer(V) ->
{uint, V};
case V < 0 of
true ->
{int, V};
false ->
uint(V)
end;
wrap_map_value(V) when is_binary(V) ->
utf8(V);
wrap_map_value(V) when is_list(V) ->
utf8(list_to_binary(V));
wrap_map_value(V) when is_atom(V) ->
utf8(atom_to_list(V)).
utf8(atom_to_list(V));
wrap_map_value(TaggedValue) when is_atom(element(1, TaggedValue)) ->
TaggedValue.

utf8(V) -> amqp10_client_types:utf8(V).

Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
-type str() :: atom() | string() | binary().
-type internal_ann_key() :: atom().
-type x_ann_key() :: binary(). %% should begin with x- or ideally x-opt-
-type x_ann_value() :: str() | integer() | float() | [x_ann_value()].
-type x_ann_value() :: str() | integer() | float() | TaggedValue :: tuple() | [x_ann_value()].
-type protocol() :: module().
-type annotations() :: #{internal_ann_key() => term(),
x_ann_key() => x_ann_value()}.
Expand Down
5 changes: 2 additions & 3 deletions deps/rabbit/src/mc_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,8 @@ infer_type(V) when is_integer(V) ->
{long, V};
infer_type(V) when is_boolean(V) ->
{boolean, V};
infer_type({T, _} = V) when is_atom(T) ->
%% looks like a pre-tagged type
V.
infer_type(TaggedValue) when is_atom(element(1, TaggedValue)) ->
TaggedValue.

utf8_string_is_ascii(UTF8String) ->
utf8_scan(UTF8String, fun(Char) -> Char >= 0 andalso Char < 128 end).
Expand Down
13 changes: 10 additions & 3 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1938,7 +1938,7 @@ settle_op_from_outcome(#'v1_0.modified'{delivery_failed = DelFailed,
Anns1 = lists:map(
%% "all symbolic keys except those beginning with "x-" are reserved." [3.2.10]
fun({{symbol, <<"x-", _/binary>> = K}, V}) ->
{K, unwrap(V)}
{K, unwrap_simple_type(V)}
end, KVList),
maps:from_list(Anns1)
end,
Expand Down Expand Up @@ -3624,7 +3624,14 @@ format_status(
topic_permission_cache => TopicPermissionCache},
maps:update(state, State, Status).

unwrap({_Tag, V}) ->

unwrap_simple_type(V = {list, _}) ->
V;
unwrap_simple_type(V = {map, _}) ->
V;
unwrap_simple_type(V = {array, _, _}) ->
V;
unwrap_simple_type({_SimpleType, V}) ->
V;
unwrap(V) ->
unwrap_simple_type(V) ->
V.
209 changes: 178 additions & 31 deletions deps/rabbit/test/amqp_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ groups() ->
modified_classic_queue,
modified_quorum_queue,
modified_dead_letter_headers_exchange,
modified_dead_letter_history,
dead_letter_headers_exchange,
dead_letter_reject,
dead_letter_reject_message_order_classic_queue,
Expand Down Expand Up @@ -264,7 +265,8 @@ init_per_testcase(T, Config)
end;
init_per_testcase(T, Config)
when T =:= modified_quorum_queue orelse
T =:= modified_dead_letter_headers_exchange ->
T =:= modified_dead_letter_headers_exchange orelse
T =:= modified_dead_letter_history ->
case rpc(Config, rabbit_feature_flags, is_enabled, ['rabbitmq_4.0.0']) of
true ->
rabbit_ct_helpers:testcase_started(Config, T);
Expand Down Expand Up @@ -501,79 +503,127 @@ modified_quorum_queue(Config) ->
ok = amqp10_client:send_msg(Sender, Msg2),
ok = amqp10_client:detach_link(Sender),

{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, unsettled),
Receiver1Name = <<"receiver 1">>,
Receiver2Name = <<"receiver 2">>,
{ok, Receiver1} = amqp10_client:attach_receiver_link(Session, Receiver1Name, Address, unsettled),
{ok, Receiver2} = amqp10_client:attach_receiver_link(Session, Receiver2Name, Address, unsettled),

{ok, M1} = amqp10_client:get_msg(Receiver),
{ok, M1} = amqp10_client:get_msg(Receiver1),
?assertEqual([<<"m1">>], amqp10_msg:body(M1)),
?assertMatch(#{delivery_count := 0,
first_acquirer := true},
amqp10_msg:headers(M1)),
ok = amqp10_client:settle_msg(Receiver, M1, {modified, false, true, #{}}),
ok = amqp10_client:settle_msg(Receiver1, M1, {modified, false, true, #{}}),

{ok, M2a} = amqp10_client:get_msg(Receiver),
{ok, M2a} = amqp10_client:get_msg(Receiver1),
?assertEqual([<<"m2">>], amqp10_msg:body(M2a)),
?assertMatch(#{delivery_count := 0,
first_acquirer := true},
amqp10_msg:headers(M2a)),
ok = amqp10_client:settle_msg(Receiver, M2a, {modified, false, false, #{}}),
ok = amqp10_client:settle_msg(Receiver1, M2a, {modified, false, false, #{}}),

{ok, M2b} = amqp10_client:get_msg(Receiver),
{ok, M2b} = amqp10_client:get_msg(Receiver1),
?assertEqual([<<"m2">>], amqp10_msg:body(M2b)),
?assertMatch(#{delivery_count := 0,
first_acquirer := false},
amqp10_msg:headers(M2b)),
ok = amqp10_client:settle_msg(Receiver, M2b, {modified, true, false, #{}}),
ok = amqp10_client:settle_msg(Receiver1, M2b, {modified, true, false, #{}}),

{ok, M2c} = amqp10_client:get_msg(Receiver),
{ok, M2c} = amqp10_client:get_msg(Receiver1),
?assertEqual([<<"m2">>], amqp10_msg:body(M2c)),
?assertMatch(#{delivery_count := 1,
first_acquirer := false},
amqp10_msg:headers(M2c)),
ok = amqp10_client:settle_msg(Receiver, M2c,
{modified, true, false,
#{<<"x-opt-key">> => <<"val 1">>}}),

{ok, M2d} = amqp10_client:get_msg(Receiver),
ok = amqp10_client:settle_msg(
Receiver1, M2c,
{modified, true, false,
%% Test that a history of requeue events can be tracked as described in
%% https://rabbitmq.com/blog/2024/10/11/modified-outcome
#{<<"x-opt-requeued-by">> => {array, utf8, [{utf8, Receiver1Name}]},
<<"x-opt-requeue-reason">> => {list, [{utf8, <<"reason 1">>}]},
<<"x-opt-my-map">> => {map, [
{{utf8, <<"k1">>}, {byte, -1}},
{{utf8, <<"k2">>}, {ulong, 2}}
]}}}),

{ok, M2d} = amqp10_client:get_msg(Receiver2),
?assertEqual([<<"m2">>], amqp10_msg:body(M2d)),
?assertMatch(#{delivery_count := 2,
first_acquirer := false},
amqp10_msg:headers(M2d)),
?assertMatch(#{<<"x-opt-key">> := <<"val 1">>}, amqp10_msg:message_annotations(M2d)),
ok = amqp10_client:settle_msg(Receiver, M2d,
{modified, false, false,
#{<<"x-opt-key">> => <<"val 2">>,
<<"x-other">> => 99}}),

{ok, M2e} = amqp10_client:get_msg(Receiver),
#{<<"x-opt-requeued-by">> := {array, utf8, L0},
<<"x-opt-requeue-reason">> := L1,
<<"x-opt-my-map">> := L2} = amqp10_msg:message_annotations(M2d),
ok = amqp10_client:settle_msg(
Receiver1, M2d,
{modified, false, false,
#{<<"x-opt-requeued-by">> => {array, utf8, [{utf8, Receiver2Name} | L0]},
<<"x-opt-requeue-reason">> => {list, [{symbol, <<"reason 2">>} | L1]},
<<"x-opt-my-map">> => {map, L2 ++ [{{symbol, <<"k3">>}, {symbol, <<"val 3">>}}]},
<<"x-other">> => 99}}),

{ok, M2e} = amqp10_client:get_msg(Receiver1),
?assertEqual([<<"m2">>], amqp10_msg:body(M2e)),
?assertMatch(#{delivery_count := 2,
first_acquirer := false},
amqp10_msg:headers(M2e)),
?assertMatch(#{<<"x-opt-key">> := <<"val 2">>,
?assertMatch(#{<<"x-opt-requeued-by">> := {array, utf8, [{utf8, Receiver2Name}, {utf8, Receiver1Name}]},
<<"x-opt-requeue-reason">> := [{symbol, <<"reason 2">>}, {utf8, <<"reason 1">>}],
<<"x-opt-my-map">> := [
{{utf8, <<"k1">>}, {byte, -1}},
{{utf8, <<"k2">>}, {ulong, 2}},
{{symbol, <<"k3">>}, {symbol, <<"val 3">>}}
],
<<"x-other">> := 99}, amqp10_msg:message_annotations(M2e)),
ok = amqp10_client:settle_msg(Receiver, M2e, modified),
ok = amqp10_client:settle_msg(Receiver1, M2e, modified),

ok = amqp10_client:detach_link(Receiver),
?assertMatch({ok, #{message_count := 1}},
rabbitmq_amqp_client:delete_queue(LinkPair, QName)),
%% Test that we can consume via AMQP 0.9.1
Ch = rabbit_ct_client_helpers:open_channel(Config),
{#'basic.get_ok'{},
#amqp_msg{payload = <<"m2">>,
props = #'P_basic'{headers = Headers}}
} = amqp_channel:call(Ch, #'basic.get'{queue = QName, no_ack = true}),
%% We expect to receive only modified AMQP 1.0 message annotations that are of simple types
%% (i.e. excluding list, map, array).
?assertEqual({value, {<<"x-other">>, long, 99}},
lists:keysearch(<<"x-other">>, 1, Headers)),
?assertEqual({value, {<<"x-delivery-count">>, long, 5}},
lists:keysearch(<<"x-delivery-count">>, 1, Headers)),
ok = rabbit_ct_client_helpers:close_channel(Ch),

ok = amqp10_client:detach_link(Receiver1),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = end_session_sync(Session),
ok = amqp10_client:close_connection(Connection).

%% Test that a message can be routed based on the message-annotations
%% provided in the modified outcome.
%% provided in the modified outcome as described in
%% https://rabbitmq.com/blog/2024/10/11/modified-outcome
modified_dead_letter_headers_exchange(Config) ->
{Connection, Session, LinkPair} = init(Config),
HeadersXName = <<"my headers exchange">>,
AlternateXName = <<"my alternate exchange">>,
SourceQName = <<"source quorum queue">>,
AppleQName = <<"dead letter classic queue receiving apples">>,
BananaQName = <<"dead letter quorum queue receiving bananas">>,
TrashQName = <<"trash queue receiving anything that doesn't match">>,

ok = rabbitmq_amqp_client:declare_exchange(
LinkPair,
HeadersXName,
#{type => <<"headers">>,
arguments => #{<<"alternate-exchange">> => {utf8, AlternateXName}}}),

ok = rabbitmq_amqp_client:declare_exchange(LinkPair, AlternateXName, #{type => <<"fanout">>}),

{ok, #{type := <<"quorum">>}} = rabbitmq_amqp_client:declare_queue(
LinkPair,
SourceQName,
#{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>},
<<"x-overflow">> => {utf8, <<"reject-publish">>},
<<"x-dead-letter-strategy">> => {utf8, <<"at-least-once">>},
<<"x-dead-letter-exchange">> => {utf8, <<"amq.headers">>}}}),
<<"x-dead-letter-exchange">> => {utf8, HeadersXName}}}),
{ok, #{type := <<"classic">>}} = rabbitmq_amqp_client:declare_queue(
LinkPair,
AppleQName,
Expand All @@ -582,14 +632,16 @@ modified_dead_letter_headers_exchange(Config) ->
LinkPair,
BananaQName,
#{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}}}),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, TrashQName, #{}),
ok = rabbitmq_amqp_client:bind_queue(
LinkPair, AppleQName, <<"amq.headers">>, <<>>,
LinkPair, AppleQName, HeadersXName, <<>>,
#{<<"x-fruit">> => {utf8, <<"apple">>},
<<"x-match">> => {utf8, <<"any-with-x">>}}),
ok = rabbitmq_amqp_client:bind_queue(
LinkPair, BananaQName, <<"amq.headers">>, <<>>,
LinkPair, BananaQName, HeadersXName, <<>>,
#{<<"x-fruit">> => {utf8, <<"banana">>},
<<"x-match">> => {utf8, <<"any-with-x">>}}),
ok = rabbitmq_amqp_client:bind_queue(LinkPair, TrashQName, AlternateXName, <<>>, #{}),

{ok, Sender} = amqp10_client:attach_sender_link(
Session, <<"test-sender">>, rabbitmq_amqp_address:queue(SourceQName)),
Expand All @@ -600,6 +652,8 @@ modified_dead_letter_headers_exchange(Config) ->
Session, <<"receiver apple">>, rabbitmq_amqp_address:queue(AppleQName), unsettled),
{ok, ReceiverBanana} = amqp10_client:attach_receiver_link(
Session, <<"receiver banana">>, rabbitmq_amqp_address:queue(BananaQName), unsettled),
{ok, ReceiverTrash} = amqp10_client:attach_receiver_link(
Session, <<"receiver trash">>, rabbitmq_amqp_address:queue(TrashQName), unsettled),

ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, <<"m1">>)),
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t2">>, <<"m2">>)),
Expand All @@ -609,7 +663,8 @@ modified_dead_letter_headers_exchange(Config) ->
ok = amqp10_client:send_msg(Sender, amqp10_msg:set_message_annotations(
#{"x-fruit" => <<"apple">>},
amqp10_msg:new(<<"t4">>, <<"m4">>))),
ok = wait_for_accepts(3),
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t5">>, <<"m5">>)),
ok = wait_for_accepts(5),

{ok, Msg1} = amqp10_client:get_msg(Receiver),
?assertMatch(#{delivery_count := 0,
Expand Down Expand Up @@ -650,13 +705,105 @@ modified_dead_letter_headers_exchange(Config) ->
amqp10_msg:headers(MsgBanana2)),
ok = amqp10_client:accept_msg(ReceiverBanana, MsgBanana2),

{ok, Msg5} = amqp10_client:get_msg(Receiver),
%% This message should be routed via the alternate exchange to the trash queue.
ok = amqp10_client:settle_msg(Receiver, Msg5, {modified, false, true, #{<<"x-fruit">> => <<"strawberry">>}}),
{ok, MsgTrash} = amqp10_client:get_msg(ReceiverTrash),
?assertEqual([<<"m5">>], amqp10_msg:body(MsgTrash)),
?assertMatch(#{delivery_count := 0,
first_acquirer := false},
amqp10_msg:headers(MsgTrash)),
ok = amqp10_client:accept_msg(ReceiverTrash, MsgTrash),

ok = detach_link_sync(Sender),
ok = detach_link_sync(Receiver),
ok = detach_link_sync(ReceiverApple),
ok = detach_link_sync(ReceiverBanana),
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, SourceQName),
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, AppleQName),
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, BananaQName),
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, TrashQName),
ok = rabbitmq_amqp_client:delete_exchange(LinkPair, HeadersXName),
ok = rabbitmq_amqp_client:delete_exchange(LinkPair, AlternateXName),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = end_session_sync(Session),
ok = amqp10_client:close_connection(Connection).

%% Test that custom dead lettering event tracking works as described in
%% https://rabbitmq.com/blog/2024/10/11/modified-outcome
modified_dead_letter_history(Config) ->
{Connection, Session, LinkPair} = init(Config),
Q1 = <<"qq 1">>,
Q2 = <<"qq 2">>,

{ok, _} = rabbitmq_amqp_client:declare_queue(
LinkPair, Q1,
#{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>},
<<"x-dead-letter-strategy">> => {utf8, <<"at-most-once">>},
<<"x-dead-letter-exchange">> => {utf8, <<"amq.fanout">>}}}),
{ok, _} = rabbitmq_amqp_client:declare_queue(
LinkPair, Q2,
#{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>},
<<"x-dead-letter-strategy">> => {utf8, <<"at-most-once">>},
<<"x-dead-letter-exchange">> => {utf8, <<>>}}}),
ok = rabbitmq_amqp_client:bind_queue(LinkPair, Q2, <<"amq.fanout">>, <<>>, #{}),

{ok, Sender} = amqp10_client:attach_sender_link(
Session, <<"test-sender">>, rabbitmq_amqp_address:queue(Q1)),
wait_for_credit(Sender),
{ok, Receiver1} = amqp10_client:attach_receiver_link(
Session, <<"receiver 1">>, rabbitmq_amqp_address:queue(Q1), unsettled),
{ok, Receiver2} = amqp10_client:attach_receiver_link(
Session, <<"receiver 2">>, rabbitmq_amqp_address:queue(Q2), unsettled),

ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t">>, <<"m">>)),
ok = wait_for_accepts(1),
ok = detach_link_sync(Sender),

{ok, Msg1} = amqp10_client:get_msg(Receiver1),
?assertMatch(#{delivery_count := 0,
first_acquirer := true},
amqp10_msg:headers(Msg1)),
ok = amqp10_client:settle_msg(
Receiver1, Msg1,
{modified, true, true,
#{<<"x-opt-history-list">> => {list, [{utf8, <<"l1">>}]},
<<"x-opt-history-map">> => {map, [{{symbol, <<"k1">>}, {byte, -1}}]},
<<"x-opt-history-array">> => {array, utf8, [{utf8, <<"a1">>}]}}
}),

{ok, Msg2} = amqp10_client:get_msg(Receiver2),
?assertMatch(#{delivery_count := 1,
first_acquirer := false},
amqp10_msg:headers(Msg2)),
#{<<"x-opt-history-list">> := L1,
<<"x-opt-history-map">> := L2,
<<"x-opt-history-array">> := {array, utf8, L0}
} = amqp10_msg:message_annotations(Msg2),
ok = amqp10_client:settle_msg(
Receiver2, Msg2,
{modified, true, true,
#{<<"x-opt-history-list">> => {list, [{int, -99} | L1]},
<<"x-opt-history-map">> => {map, [{{symbol, <<"k2">>}, {symbol, <<"v2">>}} | L2]},
<<"x-opt-history-array">> => {array, utf8, [{utf8, <<"a2">>} | L0]},
<<"x-other">> => -99}}),

{ok, Msg3} = amqp10_client:get_msg(Receiver1),
?assertEqual([<<"m">>], amqp10_msg:body(Msg3)),
?assertMatch(#{delivery_count := 2,
first_acquirer := false},
amqp10_msg:headers(Msg3)),
?assertMatch(#{<<"x-opt-history-array">> := {array, utf8, [{utf8, <<"a2">>}, {utf8, <<"a1">>}]},
<<"x-opt-history-list">> := [{int, -99}, {utf8, <<"l1">>}],
<<"x-opt-history-map">> := [{{symbol, <<"k2">>}, {symbol, <<"v2">>}},
{{symbol, <<"k1">>}, {byte, -1}}],
<<"x-other">> := -99}, amqp10_msg:message_annotations(Msg3)),
ok = amqp10_client:accept_msg(Receiver1, Msg3),

ok = detach_link_sync(Receiver1),
ok = detach_link_sync(Receiver2),
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, Q1),
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, Q2),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = end_session_sync(Session),
ok = amqp10_client:close_connection(Connection).
Expand Down
Loading
Loading