Skip to content

Commit

Permalink
Track requeue history
Browse files Browse the repository at this point in the history
as described in rabbitmq/rabbitmq-website#2095

This commit adds a test case and fixes a bug in the broker to allow
using `array` type in the value of the modified annotations.
  • Loading branch information
ansd committed Oct 11, 2024
1 parent d9ff6a0 commit b9f45d3
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 29 deletions.
8 changes: 5 additions & 3 deletions deps/amqp10_client/src/amqp10_client_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1178,14 +1178,16 @@ wrap_map_value(true) ->
{boolean, true};
wrap_map_value(false) ->
{boolean, false};
wrap_map_value(V) when is_integer(V) ->
{uint, V};
wrap_map_value(V) when is_integer(V) andalso V >= 0 ->
uint(V);
wrap_map_value(V) when is_binary(V) ->
utf8(V);
wrap_map_value(V) when is_list(V) ->
utf8(list_to_binary(V));
wrap_map_value(V) when is_atom(V) ->
utf8(atom_to_list(V)).
utf8(atom_to_list(V));
wrap_map_value(TaggedValue) when is_atom(element(1, TaggedValue)) ->
TaggedValue.

utf8(V) -> amqp10_client_types:utf8(V).

Expand Down
5 changes: 2 additions & 3 deletions deps/rabbit/src/mc_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,8 @@ infer_type(V) when is_integer(V) ->
{long, V};
infer_type(V) when is_boolean(V) ->
{boolean, V};
infer_type({T, _} = V) when is_atom(T) ->
%% looks like a pre-tagged type
V.
infer_type(TaggedValue) when is_atom(element(1, TaggedValue)) ->
TaggedValue.

utf8_string_is_ascii(UTF8String) ->
utf8_scan(UTF8String, fun(Char) -> Char >= 0 andalso Char < 128 end).
Expand Down
54 changes: 31 additions & 23 deletions deps/rabbit/test/amqp_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -501,59 +501,67 @@ modified_quorum_queue(Config) ->
ok = amqp10_client:send_msg(Sender, Msg2),
ok = amqp10_client:detach_link(Sender),

{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, unsettled),
Receiver1Name = <<"receiver 1">>,
Receiver2Name = <<"receiver 2">>,
{ok, Receiver1} = amqp10_client:attach_receiver_link(Session, Receiver1Name, Address, unsettled),
{ok, Receiver2} = amqp10_client:attach_receiver_link(Session, Receiver2Name, Address, unsettled),

{ok, M1} = amqp10_client:get_msg(Receiver),
{ok, M1} = amqp10_client:get_msg(Receiver1),
?assertEqual([<<"m1">>], amqp10_msg:body(M1)),
?assertMatch(#{delivery_count := 0,
first_acquirer := true},
amqp10_msg:headers(M1)),
ok = amqp10_client:settle_msg(Receiver, M1, {modified, false, true, #{}}),
ok = amqp10_client:settle_msg(Receiver1, M1, {modified, false, true, #{}}),

{ok, M2a} = amqp10_client:get_msg(Receiver),
{ok, M2a} = amqp10_client:get_msg(Receiver1),
?assertEqual([<<"m2">>], amqp10_msg:body(M2a)),
?assertMatch(#{delivery_count := 0,
first_acquirer := true},
amqp10_msg:headers(M2a)),
ok = amqp10_client:settle_msg(Receiver, M2a, {modified, false, false, #{}}),
ok = amqp10_client:settle_msg(Receiver1, M2a, {modified, false, false, #{}}),

{ok, M2b} = amqp10_client:get_msg(Receiver),
{ok, M2b} = amqp10_client:get_msg(Receiver1),
?assertEqual([<<"m2">>], amqp10_msg:body(M2b)),
?assertMatch(#{delivery_count := 0,
first_acquirer := false},
amqp10_msg:headers(M2b)),
ok = amqp10_client:settle_msg(Receiver, M2b, {modified, true, false, #{}}),
ok = amqp10_client:settle_msg(Receiver1, M2b, {modified, true, false, #{}}),

{ok, M2c} = amqp10_client:get_msg(Receiver),
{ok, M2c} = amqp10_client:get_msg(Receiver1),
?assertEqual([<<"m2">>], amqp10_msg:body(M2c)),
?assertMatch(#{delivery_count := 1,
first_acquirer := false},
amqp10_msg:headers(M2c)),
ok = amqp10_client:settle_msg(Receiver, M2c,
{modified, true, false,
#{<<"x-opt-key">> => <<"val 1">>}}),

{ok, M2d} = amqp10_client:get_msg(Receiver),
ok = amqp10_client:settle_msg(
Receiver1, M2c,
{modified, true, false,
%% Test that a history of requeue events can be tracked as described in
%% https://rabbitmq.com/blog/2024/10/11/modified-outcome
#{<<"x-opt-requeued-by">> => {array, utf8, [{utf8, Receiver1Name}]}}
}),

{ok, M2d} = amqp10_client:get_msg(Receiver2),
?assertEqual([<<"m2">>], amqp10_msg:body(M2d)),
?assertMatch(#{delivery_count := 2,
first_acquirer := false},
amqp10_msg:headers(M2d)),
?assertMatch(#{<<"x-opt-key">> := <<"val 1">>}, amqp10_msg:message_annotations(M2d)),
ok = amqp10_client:settle_msg(Receiver, M2d,
{modified, false, false,
#{<<"x-opt-key">> => <<"val 2">>,
<<"x-other">> => 99}}),

{ok, M2e} = amqp10_client:get_msg(Receiver),
#{<<"x-opt-requeued-by">> := {array, utf8, L}} = amqp10_msg:message_annotations(M2d),
ok = amqp10_client:settle_msg(
Receiver1, M2d,
{modified, false, false,
#{<<"x-opt-requeued-by">> => {array, utf8, [{utf8, Receiver2Name} | L]},
<<"x-other">> => 99}}),

{ok, M2e} = amqp10_client:get_msg(Receiver1),
?assertEqual([<<"m2">>], amqp10_msg:body(M2e)),
?assertMatch(#{delivery_count := 2,
first_acquirer := false},
amqp10_msg:headers(M2e)),
?assertMatch(#{<<"x-opt-key">> := <<"val 2">>,
?assertMatch(#{<<"x-opt-requeued-by">> := {array, utf8, [{utf8, Receiver2Name}, {utf8, Receiver1Name}]},
<<"x-other">> := 99}, amqp10_msg:message_annotations(M2e)),
ok = amqp10_client:settle_msg(Receiver, M2e, modified),
ok = amqp10_client:settle_msg(Receiver1, M2e, modified),

ok = amqp10_client:detach_link(Receiver),
ok = amqp10_client:detach_link(Receiver1),
?assertMatch({ok, #{message_count := 1}},
rabbitmq_amqp_client:delete_queue(LinkPair, QName)),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
Expand Down

0 comments on commit b9f45d3

Please sign in to comment.