From 7f4334b0d9a0f5c21f60e5433ef26c91886b575d Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 22 Dec 2025 12:20:22 -0800 Subject: [PATCH] Return policy fields and `delivery_limit` when stats are disabled When `management_agent.disable_metrics_collector` and `management.disable_stats` are both set to `true`, the HTTP API returns minimal queue information that excludes policy-related fields and `delivery_limit`. This causes `policy`, `operator_policy`, `effective_policy_definition`, and `delivery_limit` to be `null` in API responses, even though these are configuration metadata rather than statistics. This change adds these four fields to the `format/2` function in both `rabbit_classic_queue` and `rabbit_quorum_queue` modules. The fields now appear in the type-specific formatting that runs regardless of metrics collection status. The three policy fields use existing `i/2` function implementations that call `rabbit_policy` module functions. For `delivery_limit`, this change adds an `i(delivery_limit, Q)` function to `rabbit_quorum_queue` that extracts the delivery limit from the queue's `x-delivery-limit` argument, returning `unlimited` when not set. Fixes #15182 --- deps/rabbit/src/rabbit_classic_queue.erl | 5 +- deps/rabbit/src/rabbit_quorum_queue.erl | 53 ++++---- .../test/rabbit_mgmt_only_http_SUITE.erl | 120 +++++++++++++----- 3 files changed, 123 insertions(+), 55 deletions(-) diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl index 0a8cc4116867..8340dcf55d7f 100644 --- a/deps/rabbit/src/rabbit_classic_queue.erl +++ b/deps/rabbit/src/rabbit_classic_queue.erl @@ -273,7 +273,10 @@ format(Q, _Ctx) when ?is_amqqueue(Q) -> end, [{type, rabbit_queue_type:short_alias_of(?MODULE)}, {state, State}, - {node, node(amqqueue:get_pid(Q))}]. + {node, node(amqqueue:get_pid(Q))}, + {policy, i(policy, Q)}, + {operator_policy, i(operator_policy, Q)}, + {effective_policy_definition, i(effective_policy_definition, Q)}]. -spec init(amqqueue:amqqueue()) -> {ok, state()}. init(Q) when ?amqqueue_is_classic(Q) -> diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 89542531f1bd..f9b605c6111f 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -355,21 +355,7 @@ gather_policy_config(Q, IsQueueDeclaration) -> OverflowBin = args_policy_lookup(<<"overflow">>, fun policy_has_precedence/2, Q), Overflow = overflow(OverflowBin, drop_head, QName), MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q), - DeliveryLimit = case args_policy_lookup(<<"delivery-limit">>, - fun resolve_delivery_limit/2, Q) of - undefined -> - case IsQueueDeclaration of - true -> - ?LOG_INFO( - "~ts: delivery_limit not set, defaulting to ~b", - [rabbit_misc:rs(QName), ?DEFAULT_DELIVERY_LIMIT]); - false -> - ok - end, - ?DEFAULT_DELIVERY_LIMIT; - DL -> - DL - end, + DeliveryLimit = get_delivery_limit(Q, IsQueueDeclaration), Expires = args_policy_lookup(<<"expires">>, fun min/2, Q), MsgTTL = args_policy_lookup(<<"message-ttl">>, fun min/2, Q), DeadLetterHandler = dead_letter_handler(Q, Overflow), @@ -394,12 +380,6 @@ ra_machine_config(Q) when ?is_amqqueue(Q) -> created => erlang:system_time(millisecond) }. -resolve_delivery_limit(PolVal, ArgVal) - when PolVal < 0 orelse ArgVal < 0 -> - max(PolVal, ArgVal); -resolve_delivery_limit(PolVal, ArgVal) -> - min(PolVal, ArgVal). - policy_has_precedence(Policy, _QueueArg) -> Policy. @@ -1908,6 +1888,29 @@ i_totals(Q) when ?is_amqqueue(Q) -> {messages, 0}] end. +resolve_delivery_limit(PolVal, ArgVal) + when PolVal < 0 orelse ArgVal < 0 -> + max(PolVal, ArgVal); +resolve_delivery_limit(PolVal, ArgVal) -> + min(PolVal, ArgVal). + +get_delivery_limit(Q) -> + get_delivery_limit(Q, false). + +get_delivery_limit(Q, ShouldLog) -> + PolicyValue = args_policy_lookup(<<"delivery-limit">>, fun resolve_delivery_limit/2, Q), + get_delivery_limit({handle_policy_value, PolicyValue}, Q, ShouldLog). + +get_delivery_limit({handle_policy_value, undefined}, Q, true) -> + QName = amqqueue:get_name(Q), + ?LOG_INFO("~ts: delivery_limit not set, defaulting to ~b", + [rabbit_misc:rs(QName), ?DEFAULT_DELIVERY_LIMIT]), + ?DEFAULT_DELIVERY_LIMIT; +get_delivery_limit({handle_policy_value, undefined}, _Q, false) -> + ?DEFAULT_DELIVERY_LIMIT; +get_delivery_limit({handle_policy_value, Limit}, _Q, _ShouldLog) when is_integer(Limit) -> + Limit. + i(name, Q) when ?is_amqqueue(Q) -> amqqueue:get_name(Q); i(durable, Q) when ?is_amqqueue(Q) -> amqqueue:is_durable(Q); i(auto_delete, Q) when ?is_amqqueue(Q) -> amqqueue:is_auto_delete(Q); @@ -2042,6 +2045,8 @@ i(message_bytes_dlx, Q) when ?is_amqqueue(Q) -> {timeout, _} -> 0 end; +i(delivery_limit, Q) when ?is_amqqueue(Q) -> + get_delivery_limit(Q); i(_K, _Q) -> ''. open_files(Name) -> @@ -2147,7 +2152,11 @@ format(Q, Ctx) when ?is_amqqueue(Q) -> {node, LeaderNode}, {members, Nodes}, {leader, LeaderNode}, - {online, Online}]. + {online, Online}, + {policy, i(policy, Q)}, + {operator_policy, i(operator_policy, Q)}, + {effective_policy_definition, i(effective_policy_definition, Q)}, + {delivery_limit, i(delivery_limit, Q)}]. -spec quorum_messages(rabbit_amqqueue:name()) -> non_neg_integer(). diff --git a/deps/rabbitmq_management/test/rabbit_mgmt_only_http_SUITE.erl b/deps/rabbitmq_management/test/rabbit_mgmt_only_http_SUITE.erl index a7fbc5192e7a..1e0b8deb7daa 100644 --- a/deps/rabbitmq_management/test/rabbit_mgmt_only_http_SUITE.erl +++ b/deps/rabbitmq_management/test/rabbit_mgmt_only_http_SUITE.erl @@ -38,7 +38,8 @@ all() -> [ {group, all_tests_with_prefix}, {group, all_tests_without_prefix}, - {group, stats_disabled_on_request} + {group, stats_disabled_on_request}, + {group, stats_disabled_via_config} ]. groups() -> @@ -46,6 +47,10 @@ groups() -> {all_tests_with_prefix, [], some_tests() ++ all_tests()}, {all_tests_without_prefix, [], some_tests()}, {stats_disabled_on_request, [], [disable_with_disable_stats_parameter_test]}, + {stats_disabled_via_config, [], [ + classic_queue_with_stats_disabled_test, + quorum_queue_with_stats_disabled_test + ]}, {invalid_config, [], [invalid_config_test]} ]. @@ -130,6 +135,17 @@ init_per_group(all_tests_with_prefix = Group, Config0) -> Config1 = rabbit_ct_helpers:merge_app_env(Config0, PathConfig), Config2 = finish_init(Group, Config1), start_broker(Config2); +init_per_group(stats_disabled_via_config = Group, Config0) -> + Config1 = rabbit_ct_helpers:merge_app_env(Config0, + {rabbitmq_management_agent, [ + {disable_metrics_collector, true} + ]}), + Config2 = rabbit_ct_helpers:merge_app_env(Config1, + {rabbitmq_management, [ + {disable_management_stats, true} + ]}), + Config3 = finish_init(Group, Config2, false), + start_broker(Config3); init_per_group(Group, Config0) -> Config1 = finish_init(Group, Config0), start_broker(Config1). @@ -1442,6 +1458,77 @@ disable_with_disable_stats_parameter_test(Config) -> passed. +classic_queue_with_stats_disabled_test(Config) -> + QArgs = #{arguments => #{'x-max-length' => 100}}, + PolicyArgs = #{pattern => <<".*">>, + definition => #{'max-length' => 1024}, + priority => 0, + 'apply-to' => <<"queues">>}, + OpPolicyArgs = #{pattern => <<".*">>, + definition => #{'max-length' => 2048}, + priority => 0, + 'apply-to' => <<"queues">>}, + + http_put(Config, "/queues/%2F/test-classic-queue", QArgs, {group, '2xx'}), + http_put(Config, "/policies/%2F/test-policy", PolicyArgs, {group, '2xx'}), + http_put(Config, "/operator-policies/%2F/test-op-policy", OpPolicyArgs, {group, '2xx'}), + + await_condition(fun() -> + Queue = http_get(Config, "/queues/%2F/test-classic-queue", ?OK), + maps:get(policy, Queue, undefined) =:= <<"test-policy">> + end), + + Queue = http_get(Config, "/queues/%2F/test-classic-queue", ?OK), + + ?assertEqual(<<"test-policy">>, maps:get(policy, Queue)), + ?assertEqual(<<"test-op-policy">>, maps:get(operator_policy, Queue)), + ?assert(maps:is_key(effective_policy_definition, Queue)), + EffectiveDef = maps:get(effective_policy_definition, Queue), + ?assertEqual(1024, maps:get('max-length', EffectiveDef)), + + http_delete(Config, "/queues/%2F/test-classic-queue", {group, '2xx'}), + http_delete(Config, "/policies/%2F/test-policy", {group, '2xx'}), + http_delete(Config, "/operator-policies/%2F/test-op-policy", {group, '2xx'}), + + passed. + +quorum_queue_with_stats_disabled_test(Config) -> + QArgs = #{durable => true, + arguments => #{'x-queue-type' => 'quorum', + 'x-delivery-limit' => 40}}, + PolicyArgs = #{pattern => <<".*">>, + definition => #{'queue-leader-locator' => <<"balanced">>}, + priority => 0, + 'apply-to' => <<"queues">>}, + OpPolicyArgs = #{pattern => <<".*">>, + definition => #{'max-length' => 1024}, + priority => 0, + 'apply-to' => <<"queues">>}, + + http_put(Config, "/queues/%2F/test-quorum-queue", QArgs, {group, '2xx'}), + http_put(Config, "/policies/%2F/test-policy", PolicyArgs, {group, '2xx'}), + http_put(Config, "/operator-policies/%2F/test-op-policy", OpPolicyArgs, {group, '2xx'}), + + await_condition(fun() -> + Queue = http_get(Config, "/queues/%2F/test-quorum-queue", ?OK), + maps:get(policy, Queue, undefined) =:= <<"test-policy">> + end), + + Queue = http_get(Config, "/queues/%2F/test-quorum-queue", ?OK), + + ?assertEqual(<<"test-policy">>, maps:get(policy, Queue)), + ?assertEqual(<<"test-op-policy">>, maps:get(operator_policy, Queue)), + ?assert(maps:is_key(effective_policy_definition, Queue)), + EffectiveDef = maps:get(effective_policy_definition, Queue), + ?assertEqual(1024, maps:get('max-length', EffectiveDef)), + ?assertEqual(40, maps:get(delivery_limit, Queue)), + + http_delete(Config, "/queues/%2F/test-quorum-queue", {group, '2xx'}), + http_delete(Config, "/policies/%2F/test-policy", {group, '2xx'}), + http_delete(Config, "/operator-policies/%2F/test-op-policy", {group, '2xx'}), + + passed. + sorting_test(Config) -> QArgs = #{}, PermArgs = [{configure, <<".*">>}, {write, <<".*">>}, {read, <<".*">>}], @@ -1600,37 +1687,6 @@ local_port(Conn) -> {ok, Port} = inet:port(Sock), Port. -spawn_invalid(_Config, 0) -> - ok; -spawn_invalid(Config, N) -> - Self = self(), - spawn(fun() -> - timer:sleep(rand:uniform(250)), - {ok, Sock} = gen_tcp:connect("localhost", amqp_port(Config), [list]), - ok = gen_tcp:send(Sock, "Some Data"), - receive_msg(Self) - end), - spawn_invalid(Config, N-1). - -receive_msg(Self) -> - receive - {tcp, _, [$A, $M, $Q, $P | _]} -> - Self ! done - after - 60000 -> - Self ! no_reply - end. - -wait_for_answers(0) -> - ok; -wait_for_answers(N) -> - receive - done -> - wait_for_answers(N-1); - no_reply -> - throw(no_reply) - end. - publish(Ch) -> amqp_channel:call(Ch, #'basic.publish'{exchange = <<"">>, routing_key = <<"myqueue">>},