Skip to content

Commit

Permalink
Add alternate exchange test assertion
Browse files Browse the repository at this point in the history
Test the use case described in
rabbitmq/rabbitmq-website#2095
  • Loading branch information
ansd committed Oct 11, 2024
1 parent e6818f0 commit 855a32a
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 9 deletions.
41 changes: 36 additions & 5 deletions deps/rabbit/test/amqp_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -596,19 +596,32 @@ modified_quorum_queue(Config) ->
ok = amqp10_client:close_connection(Connection).

%% Test that a message can be routed based on the message-annotations
%% provided in the modified outcome.
%% provided in the modified outcome as described in
%% https://rabbitmq.com/blog/2024/10/11/modified-outcome
modified_dead_letter_headers_exchange(Config) ->
{Connection, Session, LinkPair} = init(Config),
HeadersXName = <<"my headers exchange">>,
AlternateXName = <<"my alternate exchange">>,
SourceQName = <<"source quorum queue">>,
AppleQName = <<"dead letter classic queue receiving apples">>,
BananaQName = <<"dead letter quorum queue receiving bananas">>,
TrashQName = <<"trash queue receiving anything that doesn't match">>,

ok = rabbitmq_amqp_client:declare_exchange(
LinkPair,
HeadersXName,
#{type => <<"headers">>,
arguments => #{<<"alternate-exchange">> => {utf8, AlternateXName}}}),

ok = rabbitmq_amqp_client:declare_exchange(LinkPair, AlternateXName, #{type => <<"fanout">>}),

{ok, #{type := <<"quorum">>}} = rabbitmq_amqp_client:declare_queue(
LinkPair,
SourceQName,
#{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>},
<<"x-overflow">> => {utf8, <<"reject-publish">>},
<<"x-dead-letter-strategy">> => {utf8, <<"at-least-once">>},
<<"x-dead-letter-exchange">> => {utf8, <<"amq.headers">>}}}),
<<"x-dead-letter-exchange">> => {utf8, HeadersXName}}}),
{ok, #{type := <<"classic">>}} = rabbitmq_amqp_client:declare_queue(
LinkPair,
AppleQName,
Expand All @@ -617,14 +630,16 @@ modified_dead_letter_headers_exchange(Config) ->
LinkPair,
BananaQName,
#{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}}}),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, TrashQName, #{}),
ok = rabbitmq_amqp_client:bind_queue(
LinkPair, AppleQName, <<"amq.headers">>, <<>>,
LinkPair, AppleQName, HeadersXName, <<>>,
#{<<"x-fruit">> => {utf8, <<"apple">>},
<<"x-match">> => {utf8, <<"any-with-x">>}}),
ok = rabbitmq_amqp_client:bind_queue(
LinkPair, BananaQName, <<"amq.headers">>, <<>>,
LinkPair, BananaQName, HeadersXName, <<>>,
#{<<"x-fruit">> => {utf8, <<"banana">>},
<<"x-match">> => {utf8, <<"any-with-x">>}}),
ok = rabbitmq_amqp_client:bind_queue(LinkPair, TrashQName, AlternateXName, <<>>, #{}),

{ok, Sender} = amqp10_client:attach_sender_link(
Session, <<"test-sender">>, rabbitmq_amqp_address:queue(SourceQName)),
Expand All @@ -635,6 +650,8 @@ modified_dead_letter_headers_exchange(Config) ->
Session, <<"receiver apple">>, rabbitmq_amqp_address:queue(AppleQName), unsettled),
{ok, ReceiverBanana} = amqp10_client:attach_receiver_link(
Session, <<"receiver banana">>, rabbitmq_amqp_address:queue(BananaQName), unsettled),
{ok, ReceiverTrash} = amqp10_client:attach_receiver_link(
Session, <<"receiver trash">>, rabbitmq_amqp_address:queue(TrashQName), unsettled),

ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, <<"m1">>)),
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t2">>, <<"m2">>)),
Expand All @@ -644,7 +661,8 @@ modified_dead_letter_headers_exchange(Config) ->
ok = amqp10_client:send_msg(Sender, amqp10_msg:set_message_annotations(
#{"x-fruit" => <<"apple">>},
amqp10_msg:new(<<"t4">>, <<"m4">>))),
ok = wait_for_accepts(3),
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t5">>, <<"m5">>)),
ok = wait_for_accepts(5),

{ok, Msg1} = amqp10_client:get_msg(Receiver),
?assertMatch(#{delivery_count := 0,
Expand Down Expand Up @@ -685,13 +703,26 @@ modified_dead_letter_headers_exchange(Config) ->
amqp10_msg:headers(MsgBanana2)),
ok = amqp10_client:accept_msg(ReceiverBanana, MsgBanana2),

{ok, Msg5} = amqp10_client:get_msg(Receiver),
%% This message should be routed via the alternate exchange to the trash queue.
ok = amqp10_client:settle_msg(Receiver, Msg5, {modified, false, true, #{<<"x-fruit">> => <<"strawberry">>}}),
{ok, MsgTrash} = amqp10_client:get_msg(ReceiverTrash),
?assertEqual([<<"m5">>], amqp10_msg:body(MsgTrash)),
?assertMatch(#{delivery_count := 0,
first_acquirer := false},
amqp10_msg:headers(MsgTrash)),
ok = amqp10_client:accept_msg(ReceiverTrash, MsgTrash),

ok = detach_link_sync(Sender),
ok = detach_link_sync(Receiver),
ok = detach_link_sync(ReceiverApple),
ok = detach_link_sync(ReceiverBanana),
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, SourceQName),
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, AppleQName),
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, BananaQName),
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, TrashQName),
ok = rabbitmq_amqp_client:delete_exchange(LinkPair, HeadersXName),
ok = rabbitmq_amqp_client:delete_exchange(LinkPair, AlternateXName),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = end_session_sync(Session),
ok = amqp10_client:close_connection(Connection).
Expand Down
6 changes: 2 additions & 4 deletions deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,12 @@
replicas => [binary()],
leader => binary()}.

-type queue_properties() :: #{name := binary(),
durable => boolean(),
-type queue_properties() :: #{durable => boolean(),
exclusive => boolean(),
auto_delete => boolean(),
arguments => arguments()}.

-type exchange_properties() :: #{name := binary(),
type => binary(),
-type exchange_properties() :: #{type => binary(),
durable => boolean(),
auto_delete => boolean(),
internal => boolean(),
Expand Down

0 comments on commit 855a32a

Please sign in to comment.