diff --git a/.github/workflows/authorization-server-make.yaml b/.github/workflows/authorization-server-make.yaml index 0bcc9f5d1130..726d061b14fb 100644 --- a/.github/workflows/authorization-server-make.yaml +++ b/.github/workflows/authorization-server-make.yaml @@ -11,7 +11,7 @@ on: paths: - .github/workflows/authorization-server-make.yaml - selenium/authorization-server - + env: REGISTRY_IMAGE: pivotalrabbitmq/spring-authorization-server IMAGE_TAG: 0.0.11 @@ -19,7 +19,7 @@ jobs: docker: runs-on: ubuntu-latest steps: - + - name: CHECKOUT REPOSITORY uses: actions/checkout@v6 @@ -28,10 +28,10 @@ jobs: with: username: ${{ secrets.DOCKERHUB_USERNAME }} password: ${{ secrets.DOCKERHUB_PASSWORD }} - + - name: Build and push uses: docker/build-push-action@v6 with: - context: selenium/authorization-server + context: selenium/authorization-server push: true tags: ${{ env.REGISTRY_IMAGE }}:${{ env.IMAGE_TAG }} diff --git a/.github/workflows/test-management-ui-for-pr.yaml b/.github/workflows/test-management-ui-for-pr.yaml index 5ab82fc6a9f9..652adb834b12 100644 --- a/.github/workflows/test-management-ui-for-pr.yaml +++ b/.github/workflows/test-management-ui-for-pr.yaml @@ -78,4 +78,4 @@ jobs: with: name: test-artifacts-${{ matrix.browser }}-${{ matrix.erlang_version }} path: ${{ env.SELENIUM_ARTIFACTS }}/* - \ No newline at end of file + diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index ed5078525d33..6959a3004d23 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -2859,6 +2859,11 @@ end}. {mapping, "stream.read_ahead", "rabbit.stream_read_ahead", [{datatype, [{enum, [true, false]}, integer, string]}]}. +{mapping, "stream.read_ahead_limit", "rabbit.stream_read_ahead_limit", [ + {datatype, [integer, string]}, + {validators, ["is_supported_information_unit"]} +]}. + {mapping, "cluster_tags.$tag", "rabbit.cluster_tags", [ {datatype, [binary]} ]}. diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 1219ff886d31..abab85d55aa4 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -2613,6 +2613,7 @@ rejected(QNameBin, down) -> [{{symbol, <<"queue">>}, {utf8, QNameBin}}, {{symbol, <<"reason">>}, {symbol, <<"unavailable">>}}]}}}. + maybe_grant_link_credit(Credit, MaxLinkCredit, DeliveryCount, NumUnconfirmed, Handle) -> case grant_link_credit(Credit, MaxLinkCredit, NumUnconfirmed) of true -> diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index a43371cc816b..32ed8f77e196 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -1638,27 +1638,27 @@ drop_head(#?STATE{ra_indexes = Indexes0} = State0, Effects) -> #?STATE{cfg = #cfg{dead_letter_handler = DLH}, dlx = DlxState} = State = State3, {_, DlxEffects} = rabbit_fifo_dlx:discard([Msg], maxlen, DLH, DlxState), - {State, combine_effects(DlxEffects, Effects)}; + {State, add_drop_head_effects(DlxEffects, Effects)}; empty -> {State0, Effects} end. -%% combine global counter update effects to avoid bulding a huge list of -%% effects if many messages are dropped at the same time as could happen -%% when the `max_length' is changed via a configuration update. -combine_effects([{mod_call, - rabbit_global_counters, - messages_dead_lettered, - [Reason, rabbit_quorum_queue, Type, NewLen]}], - [{mod_call, - rabbit_global_counters, - messages_dead_lettered, - [Reason, rabbit_quorum_queue, Type, PrevLen]} | Rem]) -> +add_drop_head_effects([{mod_call, + rabbit_global_counters, + messages_dead_lettered, + [Reason, rabbit_quorum_queue, Type, NewLen]}], + [{mod_call, + rabbit_global_counters, + messages_dead_lettered, + [Reason, rabbit_quorum_queue, Type, PrevLen]} | Rem]) -> + %% combine global counter update effects to avoid bulding a huge list of + %% effects if many messages are dropped at the same time as could happen + %% when the `max_length' is changed via a configuration update. [{mod_call, rabbit_global_counters, messages_dead_lettered, [Reason, rabbit_quorum_queue, Type, PrevLen + NewLen]} | Rem]; -combine_effects(New, Old) -> +add_drop_head_effects(New, Old) -> New ++ Old. maybe_set_msg_ttl(Msg, RaCmdTs, Header, diff --git a/deps/rabbit/src/rabbit_plugins.erl b/deps/rabbit/src/rabbit_plugins.erl index ff50bee83a08..5b35e2447e17 100644 --- a/deps/rabbit/src/rabbit_plugins.erl +++ b/deps/rabbit/src/rabbit_plugins.erl @@ -175,6 +175,7 @@ list() -> PluginsPath = plugins_dir(), list(PluginsPath). + %% @doc Get the list of plugins which are ready to be enabled. -spec list(string()) -> [#plugin{}]. diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 817ee1975bd4..31b8c01f7b2a 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -1515,29 +1515,20 @@ shrink_all(Node) -> amqqueue:get_type(Q) == ?MODULE, lists:member(Node, get_nodes(Q))]. - +-spec grow(node() | integer(), binary(), binary(), all | even) -> + [{rabbit_amqqueue:name(), + {ok, pos_integer()} | {error, pos_integer(), term()}}]. grow(Node, VhostSpec, QueueSpec, Strategy) -> grow(Node, VhostSpec, QueueSpec, Strategy, promotable). --spec grow(node(), binary(), binary(), all | even, membership()) -> +-spec grow(node() | integer(), binary(), binary(), all | even, membership()) -> [{rabbit_amqqueue:name(), {ok, pos_integer()} | {error, pos_integer(), term()}}]. -grow(Node, VhostSpec, QueueSpec, Strategy, Membership) -> +grow(Node, VhostSpec, QueueSpec, Strategy, Membership) when is_atom(Node) -> Running = rabbit_nodes:list_running(), [begin Size = length(get_nodes(Q)), - QName = amqqueue:get_name(Q), - ?LOG_INFO("~ts: adding a new member (replica) on node ~w", - [rabbit_misc:rs(QName), Node]), - case add_member(Q, Node, Membership) of - ok -> - {QName, {ok, Size + 1}}; - {error, Err} -> - ?LOG_WARNING( - "~ts: failed to add member (replica) on node ~w, error: ~w", - [rabbit_misc:rs(QName), Node, Err]), - {QName, {error, Size, Err}} - end + maybe_grow(Q, Node, Membership, Size) end || Q <- rabbit_amqqueue:list(), amqqueue:get_type(Q) == ?MODULE, @@ -1547,7 +1538,91 @@ grow(Node, VhostSpec, QueueSpec, Strategy, Membership) -> lists:member(Node, Running), matches_strategy(Strategy, get_nodes(Q)), is_match(amqqueue:get_vhost(Q), VhostSpec) andalso - is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ]. + is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ]; + +grow(QuorumClusterSize, VhostSpec, QueueSpec, Strategy, Membership) + when is_integer(QuorumClusterSize), QuorumClusterSize > 0 -> + Running = rabbit_nodes:list_running(), + TotalRunning = length(Running), + + TargetQuorumClusterSize = + if QuorumClusterSize > TotalRunning -> + %% we can't grow beyond total running nodes + TotalRunning; + true -> + QuorumClusterSize + end, + + lists:flatten( + [begin + QNodes = get_nodes(Q), + case length(QNodes) of + Size when Size < TargetQuorumClusterSize -> + TargetAvailableNodes = Running -- QNodes, + N = length(TargetAvailableNodes), + Node = lists:nth(rand:uniform(N), TargetAvailableNodes), + maybe_grow(Q, Node, Membership, Size); + _ -> + [] + end + end + || _ <- lists:seq(1, TargetQuorumClusterSize), + Q <- rabbit_amqqueue:list(), + amqqueue:get_type(Q) == ?MODULE, + matches_strategy(Strategy, get_nodes(Q)), + is_match(amqqueue:get_vhost(Q), VhostSpec) andalso + is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)]); + +grow(QuorumClusterSize, _VhostSpec, _QueueSpec, _Strategy, _Membership) + when is_integer(QuorumClusterSize) -> + rabbit_log:warning( + "cannot grow queues to a quorum cluster size less than zero (~tp)", + [QuorumClusterSize]), + {error, bad_quorum_cluster_size}. + +maybe_grow(Q, Node, Membership, Size) -> + QNodes = get_nodes(Q), + maybe_grow(Q, Node, Membership, Size, QNodes). + +maybe_grow(Q, Node, Membership, Size, QNodes) -> + QName = amqqueue:get_name(Q), + {ok, RaName} = qname_to_internal_name(QName), + case check_all_memberships(RaName, QNodes, voter) of + true -> + ?LOG_INFO("~ts: adding a new member (replica) on node ~w", + [rabbit_misc:rs(QName), Node]), + case add_member(Q, Node, Membership) of + ok -> + {QName, {ok, Size + 1}}; + {error, Err} -> + ?LOG_WARNING( + "~ts: failed to add member (replica) on node ~w, error: ~w", + [rabbit_misc:rs(QName), Node, Err]), + {QName, {error, Size, Err}} + end; + false -> + Err = {error, non_voters_found}, + ?LOG_WARNING( + "~ts: failed to add member (replica) on node ~w, error: ~w", + [rabbit_misc:rs(QName), Node, Err]), + {QName, {error, Size, Err}} + end. + +%% Compare local membership states of all nodes in parallel. +%% +%% Note a few things: +%% 1. This function intentionally queries local member state and not the leader +%% 2. ra:key_metrics/1 is sequential and not parallel +%% 3. ra:key_metrics/1 is not multicall-friendly because it relies on erlang:node/0 +check_all_memberships(RaName, QNodes, CompareMembership) -> + case rpc:multicall(QNodes, ets, lookup, [ra_state, RaName]) of + {Result, []} -> + lists:all( + fun(M) -> M == CompareMembership end, + [Membership || [{_RaName, _RaState, Membership}] <- Result]); + _ -> + false + end. -spec transfer_leadership(amqqueue:amqqueue(), node()) -> {migrated, node()} | {not_migrated, atom()}. diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 863b9d77dd61..9572a66234a3 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -535,6 +535,7 @@ credit(QName, CTag, DeliveryCountRcv, LinkCreditRcv, Drain, supports_stateful_delivery() -> true. + deliver(QSs, Msg, Options) -> lists:foldl( fun({Q, stateless}, {Qs, Actions}) -> diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 980f1d48cac3..717e40d27ab7 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -116,7 +116,8 @@ groups() -> node_removal_is_not_quorum_critical, select_nodes_with_least_replicas, select_nodes_with_least_replicas_node_down, - subscribe_from_each + subscribe_from_each, + grow_queue ]}, @@ -1768,6 +1769,116 @@ dont_leak_file_handles(Config) -> rabbit_ct_client_helpers:close_channel(C), ok. +grow_queue(Config) -> + [Server0, Server1, Server2, _Server3, _Server4] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + QQ = ?config(queue_name, Config), + AQ = ?config(alt_queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-quorum-initial-group-size">>, long, 5}])), + ?assertEqual({'queue.declare_ok', AQ, 0, 0}, + declare(Ch, AQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-quorum-initial-group-size">>, long, 5}])), + + QQs = [QQ, AQ], + MsgCount = 3, + + [begin + RaName = ra_name(Q), + rabbit_ct_client_helpers:publish(Ch, Q, MsgCount), + wait_for_messages_ready([Server0], RaName, MsgCount), + {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]), + #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + ?assertEqual(5, length(Nodes0)) + end || Q <- QQs], + + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + force_all_queues_shrink_member_to_current_member, []), + + TargetClusterSize_1 = 1, + assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount), + + %% grow queues to node 'Server1' + TargetClusterSize_2 = 2, + Result1 = rpc:call(Server0, rabbit_quorum_queue, grow, [Server1, <<"/">>, <<".*">>, all]), + %% [{{resource,<<"/">>,queue,<<"grow_queue">>},{ok,2}}, + %% {{resource,<<"/">>,queue,<<"grow_queue_alt">>},{ok,2}},...] + ?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result1)), + assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount), + + %% grow queues to quorum cluster size '2' has no effect + Result2 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_2, <<"/">>, <<".*">>, all]), + ?assertEqual([], Result2), + assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount), + + %% grow queues to quorum cluster size '3' + TargetClusterSize_3 = 3, + Result3 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_3, <<"/">>, <<".*">>, all, voter]), + ?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result3)), + assert_grown_queues(QQs, Server0, TargetClusterSize_3, MsgCount), + + %% grow queues to quorum cluster size '5' + TargetClusterSize_5 = 5, + Result4 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_5, <<"/">>, <<".*">>, all, voter]), + ?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result4)), + assert_grown_queues(QQs, Server0, TargetClusterSize_5, MsgCount), + + %% shrink all queues again down to 1 member + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + force_all_queues_shrink_member_to_current_member, []), + assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount), + + %% grow queues to quorum cluster size > '5' (limit = 5). + TargetClusterSize_10 = 10, + Result5 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_10, <<"/">>, <<".*">>, all]), + ?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result5)), + assert_grown_queues(QQs, Server0, TargetClusterSize_5, MsgCount), + + %% shrink all queues again down to 1 member + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + force_all_queues_shrink_member_to_current_member, []), + assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount), + + %% attempt to grow queues to quorum cluster size < '0'. + BadTargetClusterSize = -5, + ?assertEqual({error, bad_quorum_cluster_size}, + rpc:call(Server0, rabbit_quorum_queue, grow, [BadTargetClusterSize, <<"/">>, <<".*">>, all])), + + %% shrink all queues again down to 1 member + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + force_all_queues_shrink_member_to_current_member, []), + assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount), + + %% grow queues to node 'Server1': non_voter + rpc:call(Server0, rabbit_quorum_queue, grow, [Server1, <<"/">>, <<".*">>, all, non_voter]), + assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount), + + %% grow queues to node 'Server2': fail, non_voters found + Result6 = rpc:call(Server0, rabbit_quorum_queue, grow, [Server2, <<"/">>, <<".*">>, all, voter]), + %% [{{resource,<<"/">>,queue,<<"grow_queue">>},{error, 2, {error, non_voters_found}}, + %% {{resource,<<"/">>,queue,<<"grow_queue_alt">>},{error, 2, {error, non_voters_found}},...] + ?assert(lists:all( + fun({_, Err}) -> Err =:= {error, TargetClusterSize_2, {error, non_voters_found}} end, Result6)), + assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount), + + %% grow queues to target quorum cluster size '5': fail, non_voters found + Result7 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_5, <<"/">>, <<".*">>, all]), + ?assert(lists:all( + fun({_, Err}) -> Err =:= {error, TargetClusterSize_2, {error, non_voters_found}} end, Result7)), + assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount). + +assert_grown_queues(Qs, Node, TargetClusterSize, MsgCount) -> + [begin + RaName = ra_name(Q), + wait_for_messages_ready([Node], RaName, MsgCount), + {ok, Q0} = rpc:call(Node, rabbit_amqqueue, lookup, [Q, <<"/">>]), + #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + ?assertEqual(TargetClusterSize, length(Nodes0)) + end || Q <- Qs]. + gh_12635(Config) -> % https://github.com/rabbitmq/rabbitmq-server/issues/12635 [Server0, _Server1, Server2] = diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_to_count_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_to_count_command.ex new file mode 100644 index 000000000000..854347799e98 --- /dev/null +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_to_count_command.ex @@ -0,0 +1,183 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +defmodule RabbitMQ.CLI.Queues.Commands.GrowToCountCommand do + alias RabbitMQ.CLI.Core.{DocGuide, Validators} + import RabbitMQ.CLI.Core.DataCoercion + + @behaviour RabbitMQ.CLI.CommandBehaviour + + defp default_opts, + do: %{vhost_pattern: ".*", queue_pattern: ".*", membership: "promotable", errors_only: false} + + def switches(), + do: [ + vhost_pattern: :string, + queue_pattern: :string, + membership: :string, + errors_only: :boolean + ] + + def merge_defaults(args, opts) do + args = + case args do + [n | rem] when is_binary(n) -> + case Integer.parse(n) do + {i, ""} -> [i | rem] + _ -> args + end + + _ -> + args + end + + {args, Map.merge(default_opts(), opts)} + end + + def validate(args, _) when length(args) < 2 do + {:validation_failure, :not_enough_args} + end + + def validate(args, _) when length(args) > 2 do + {:validation_failure, :too_many_args} + end + + def validate([_, s], _) + when not (s == "all" or + s == "even") do + {:validation_failure, "strategy '#{s}' is not recognised."} + end + + def validate([n, _], _) + when (is_integer(n) and n <= 0) do + {:validation_failure, "node count '#{n}' must be greater than 0."} + end + + def validate(_, %{membership: m}) + when not (m == "promotable" or + m == "non_voter" or + m == "voter") do + {:validation_failure, "voter status '#{m}' is not recognised."} + end + + def validate(_, _) do + :ok + end + + def validate_execution_environment(args, opts) do + Validators.rabbit_is_running(args, opts) + end + + def run([node_count, strategy], %{ + node: node_name, + vhost_pattern: vhost_pat, + queue_pattern: queue_pat, + membership: membership, + errors_only: errors_only + }) when is_integer(node_count) do + + args = [node_count, vhost_pat, queue_pat, to_atom(strategy)] + + args = + case to_atom(membership) do + :promotable -> args + other -> args ++ [other] + end + + case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :grow, args) do + {:error, _} = error -> + error + + {:badrpc, _} = error -> + error + + results when errors_only -> + for {{:resource, vhost, _kind, name}, {:error, _, _} = res} <- results, + do: [ + {:vhost, vhost}, + {:name, name}, + {:size, format_size(res)}, + {:result, format_result(res)} + ] + + results -> + for {{:resource, vhost, _kind, name}, res} <- results, + do: [ + {:vhost, vhost}, + {:name, name}, + {:size, format_size(res)}, + {:result, format_result(res)} + ] + end + end + + use RabbitMQ.CLI.DefaultOutput + + def formatter(), do: RabbitMQ.CLI.Formatters.Table + + def usage, + do: + "grow_to_count [--vhost-pattern ] [--queue-pattern ] [--membership ]" + + def usage_additional do + [ + ["", "number of nodes to place replicas on"], + [ + "", + "add a member for all matching queues or just those whose membership count is an even number" + ], + ["--queue-pattern ", "regular expression to match queue names"], + ["--vhost-pattern ", "regular expression to match virtual host names"], + ["--membership ", "add a promotable non-voter (default) or full voter"], + ["--errors-only", "only list queues which reported an error"] + ] + end + + def usage_doc_guides() do + [ + DocGuide.quorum_queues() + ] + end + + def help_section, do: :cluster_management + + def description, + do: + "Grows quorum queue clusters by adding member replicas on the specified number of nodes for all matching queues" + + def banner([node_count, strategy], _) do + "Growing #{strategy} quorum queues on #{node_count} nodes..." + end + + # + # Implementation + # + + defp format_size({:ok, size}) do + size + end + + defp format_size({:error, _size, :timeout}) do + # the actual size is uncertain here + "?" + end + + defp format_size({:error, size, _}) do + size + end + + defp format_result({:ok, _size}) do + "ok" + end + + defp format_result({:error, _size, :timeout}) do + "error: the operation timed out and may not have been completed" + end + + defp format_result({:error, _size, err}) do + to_string(:io_lib.format("error: ~W", [err, 10])) + end +end diff --git a/deps/rabbitmq_cli/test/queues/grow_command_test.exs b/deps/rabbitmq_cli/test/queues/grow_command_test.exs index 2b1aab070317..b4b8ada8acb7 100644 --- a/deps/rabbitmq_cli/test/queues/grow_command_test.exs +++ b/deps/rabbitmq_cli/test/queues/grow_command_test.exs @@ -44,51 +44,51 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommandTest do end test "validate: when one argument is provided, returns a failure" do - assert @command.validate(["quorum-queue-a"], %{}) == {:validation_failure, :not_enough_args} + assert @command.validate(["target@node"], %{}) == {:validation_failure, :not_enough_args} end test "validate: when a node and even are provided, returns a success" do - assert @command.validate(["quorum-queue-a", "even"], %{}) == :ok + assert @command.validate(["target@node", "even"], %{}) == :ok end test "validate: when a node and all are provided, returns a success" do - assert @command.validate(["quorum-queue-a", "all"], %{}) == :ok + assert @command.validate(["target@node", "all"], %{}) == :ok end test "validate: when a node and something else is provided, returns a failure" do - assert @command.validate(["quorum-queue-a", "banana"], %{}) == + assert @command.validate(["target@node", "banana"], %{}) == {:validation_failure, "strategy 'banana' is not recognised."} end test "validate: when three arguments are provided, returns a failure" do - assert @command.validate(["quorum-queue-a", "extra-arg", "another-extra-arg"], %{}) == + assert @command.validate(["target@node", "extra-arg", "another-extra-arg"], %{}) == {:validation_failure, :too_many_args} end test "validate: when membership promotable is provided, returns a success" do - assert @command.validate(["quorum-queue-a", "all"], %{membership: "promotable"}) == :ok + assert @command.validate(["target@node", "all"], %{membership: "promotable", queue_pattern: "qq.*"}) == :ok end test "validate: when membership voter is provided, returns a success" do - assert @command.validate(["quorum-queue-a", "all"], %{membership: "voter"}) == :ok + assert @command.validate(["target@node", "all"], %{membership: "voter", queue_pattern: "qq.*"}) == :ok end test "validate: when membership non_voter is provided, returns a success" do - assert @command.validate(["quorum-queue-a", "all"], %{membership: "non_voter"}) == :ok + assert @command.validate(["target@node", "all"], %{membership: "non_voter", queue_pattern: "qq.*"}) == :ok end test "validate: when wrong membership is provided, returns failure" do - assert @command.validate(["quorum-queue-a", "all"], %{membership: "banana"}) == + assert @command.validate(["target@node", "all"], %{membership: "banana", queue_pattern: "qq.*"}) == {:validation_failure, "voter status 'banana' is not recognised."} end @tag test_timeout: 3000 - test "run: targeting an unreachable node throws a badrpc", context do + test "run: targeting an unreachable node throws a badrpc when growing to a target node", context do assert match?( {:badrpc, _}, @command.run( - ["quorum-queue-a", "all"], - Map.merge(context[:opts], %{node: :jake@thedog}) + ["target@node", "all"], + Map.merge(context[:opts], %{node: :jake@thedog, queue_pattern: "qq.*"}) ) ) end diff --git a/deps/rabbitmq_cli/test/queues/grow_to_count_command_test.exs b/deps/rabbitmq_cli/test/queues/grow_to_count_command_test.exs new file mode 100644 index 000000000000..861530f9a3c8 --- /dev/null +++ b/deps/rabbitmq_cli/test/queues/grow_to_count_command_test.exs @@ -0,0 +1,109 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +defmodule RabbitMQ.CLI.Queues.Commands.GrowToCountCommandTest do + use ExUnit.Case, async: false + import TestHelper + + @command RabbitMQ.CLI.Queues.Commands.GrowToCountCommand + + setup_all do + RabbitMQ.CLI.Core.Distribution.start() + + :ok + end + + setup context do + {:ok, + opts: %{ + node: get_rabbit_hostname(), + timeout: context[:test_timeout] || 30000, + vhost_pattern: ".*", + queue_pattern: ".*", + membership: "promotable", + errors_only: false + }} + end + + test "merge_defaults: defaults to reporting complete results" do + assert @command.merge_defaults([], %{}) == + {[], + %{ + vhost_pattern: ".*", + queue_pattern: ".*", + errors_only: false, + membership: "promotable" + }} + end + + test "validate: when no arguments are provided, returns a failure" do + assert @command.validate([], %{}) == {:validation_failure, :not_enough_args} + end + + test "validate: when one argument is provided, returns a failure" do + assert @command.validate([5], %{}) == {:validation_failure, :not_enough_args} + end + + test "validate: when node count and even are provided, returns a success" do + assert @command.validate([7, "even"], %{}) == :ok + end + + test "validate: when node count and all are provided, returns a success" do + assert @command.validate([5, "all"], %{}) == :ok + end + + test "validate: when node count and something else is provided, returns a failure" do + assert @command.validate([7, "banana"], %{}) == + {:validation_failure, "strategy 'banana' is not recognised."} + end + + test "validate: when three arguments are provided, returns a failure" do + assert @command.validate([7, "extra-arg", "another-extra-arg"], %{}) == + {:validation_failure, :too_many_args} + end + + test "validate: when membership promotable is provided, returns a success" do + assert @command.validate([5, "all"], %{membership: "promotable"}) == :ok + end + + test "validate: when membership voter is provided, returns a success" do + assert @command.validate([7, "all"], %{membership: "voter"}) == :ok + end + + test "validate: when membership non_voter is provided, returns a success" do + assert @command.validate([5, "all"], %{membership: "non_voter"}) == :ok + end + + test "validate: when wrong membership is provided, returns failure" do + assert @command.validate([7, "all"], %{membership: "banana"}) == + {:validation_failure, "voter status 'banana' is not recognised."} + end + + test "validate: when node count greater than zero, returns a success" do + assert @command.validate([7, "all"], %{membership: "voter"}) == :ok + end + + test "validate: when node count is zero, returns failure" do + assert @command.validate([0, "all"], %{membership: "voter"}) == + {:validation_failure, "node count '0' must be greater than 0."} + end + + test "validate: when node count is less than zero, returns failure" do + assert @command.validate([-1, "all"], %{membership: "voter"}) == + {:validation_failure, "node count '-1' must be greater than 0."} + end + + @tag test_timeout: 3000 + test "run: targeting an unreachable node throws a badrpc when growing to a node count", context do + assert match?( + {:badrpc, _}, + @command.run( + [5, "all"], + Map.merge(context[:opts], %{node: :jake@thedog}) + ) + ) + end +end diff --git a/deps/rabbitmq_mqtt/test/auth_SUITE.erl b/deps/rabbitmq_mqtt/test/auth_SUITE.erl index b304886a86d3..3a254f153cf8 100644 --- a/deps/rabbitmq_mqtt/test/auth_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/auth_SUITE.erl @@ -1268,6 +1268,7 @@ count_connections_per_vhost(Config) -> rabbit_connection_tracking, count_local_tracked_items_in_vhost, [<<"/">>]). + vhost_queue_limit(Config) -> ok = rabbit_ct_broker_helpers:set_vhost_limit(Config, 0, <<"/">>, max_queues, 1), {ok, C} = connect_anonymous(Config), diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_raft_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_raft_metrics_collector.erl index d1d868304ddf..3d2ca5eb7788 100644 --- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_raft_metrics_collector.erl +++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_raft_metrics_collector.erl @@ -2,7 +2,7 @@ %% License, v. 2.0. If a copy of the MPL was not distributed with this %% file, You can obtain one at https://mozilla.org/MPL/2.0/. %% -%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% -module(prometheus_rabbitmq_raft_metrics_collector). diff --git a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl index 52357fad52aa..ae7a93521cca 100644 --- a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl +++ b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl @@ -884,6 +884,7 @@ detailed_raft_metrics_test(Config) -> ok. + basic_auth(Config) -> http_get(Config, [{"accept-encoding", "deflate"}], 401), AuthHeader = rabbit_mgmt_test_util:auth_header("guest", "guest"), diff --git a/deps/rabbitmq_shovel/priv/schema/rabbitmq_shovel.schema b/deps/rabbitmq_shovel/priv/schema/rabbitmq_shovel.schema index 73d2744c006c..a0d2da032a26 100644 --- a/deps/rabbitmq_shovel/priv/schema/rabbitmq_shovel.schema +++ b/deps/rabbitmq_shovel/priv/schema/rabbitmq_shovel.schema @@ -1,5 +1,5 @@ %% ---------------------------------------------------------------------------- -%% RabbitMQ Shovel plugin +%% RabbitMQ Shovel settings %% ---------------------------------------------------------------------------- {mapping, "shovel.operating_mode", "rabbitmq_shovel.operating_mode", [ diff --git a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl index 3f25d7c5c407..af5ac02e9988 100644 --- a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl @@ -598,6 +598,8 @@ status(State) -> pending_count(#{dest := #{pending_delivery := Pending}}) -> lqueue:len(Pending); +pending_count(#{source := #{current := #{unacked_message_q := UAMQ}}}) -> + ?QUEUE:len(UAMQ); pending_count(_) -> 0. diff --git a/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl index 77904c9af91a..b5b4da01f74a 100644 --- a/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl @@ -69,6 +69,9 @@ end_per_suite(Config) -> rabbit_ct_broker_helpers:teardown_steps()). init_per_group(_, Config) -> + [Node] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + ok = rabbit_ct_broker_helpers:enable_feature_flag( + Config, [Node], 'rabbitmq_4.0.0'), Config. end_per_group(_, Config) -> diff --git a/deps/rabbitmq_shovel/test/shovel_status_command_SUITE.erl b/deps/rabbitmq_shovel/test/shovel_status_command_SUITE.erl index 9d2ec522c099..988c6ba785c7 100644 --- a/deps/rabbitmq_shovel/test/shovel_status_command_SUITE.erl +++ b/deps/rabbitmq_shovel/test/shovel_status_command_SUITE.erl @@ -39,7 +39,10 @@ groups() -> init_per_suite(Config) -> rabbit_ct_helpers:log_environment(), Config1 = rabbit_ct_helpers:set_config(Config, [ - {rmq_nodename_suffix, ?MODULE} + {rmq_nodename_suffix, ?MODULE}, + {ignored_crashes, [ + "stopping because dependent process" + ]} ]), Config2 = rabbit_ct_helpers:run_setup_steps(Config1, rabbit_ct_broker_helpers:setup_steps() ++ diff --git a/release-notes/4.2.0.md b/release-notes/4.2.0.md index aa68fc371a1e..0ef3e6160bdd 100644 --- a/release-notes/4.2.0.md +++ b/release-notes/4.2.0.md @@ -72,6 +72,7 @@ NOT cancelled To learn more, check out the new documentation guide on [Stream Filtering](https://www.rabbitmq.com/docs/next/stream-filtering). + Pull Request: [#14184](https://github.com/rabbitmq/rabbitmq-server/pull/14184) ### Direct Reply-To for AMQP 1.0 diff --git a/release-notes/4.3.0.md b/release-notes/4.3.0.md index cf2ede9d4d17..75a2aff1c776 100644 --- a/release-notes/4.3.0.md +++ b/release-notes/4.3.0.md @@ -72,6 +72,26 @@ compared to other versions. ### Core Server +#### Enhancements + + * When a message is rejected by a queue, RabbitMQ now provides the queue name and rejection reason to AMQP 1.0 publishers + in the `Rejected` outcome. This is particularly useful when multiple queues are bound to an exchange, as it allows + publishers to identify which specific queue out of several target queues rejected the message and why + (e.g., maximum queue length reached or queue unavailable). Previously, publishers had no way to determine which queue + rejected their message or the reason for rejection. + + The queue name and reason are included in the `info` field of the `Rejected` outcome's `error` field: + * `queue: ` + * `reason: maxlen | unavailable` + + GitHub issue: [#15075](https://github.com/rabbitmq/rabbitmq-server/pull/15075) + +#### Bug Fixes + + * Quorum queue at-most-once dead lettering for the overflow behaviour `drop-head` now happens in the correct order. + + GitHub issue: [#14926](https://github.com/rabbitmq/rabbitmq-server/pull/14926) + #### Enhancements * When a message is rejected by a queue, RabbitMQ now provides the queue name and rejection reason to AMQP 1.0 publishers