diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl index 0a8cc4116867..d6aff02eb341 100644 --- a/deps/rabbit/src/rabbit_classic_queue.erl +++ b/deps/rabbit/src/rabbit_classic_queue.erl @@ -264,7 +264,7 @@ stat(Q) -> {gen_server2, call, [stat, infinity]}). -format(Q, _Ctx) when ?is_amqqueue(Q) -> +format(Q, Ctx) when ?is_amqqueue(Q) -> State = case amqqueue:get_state(Q) of live -> running; @@ -273,7 +273,18 @@ format(Q, _Ctx) when ?is_amqqueue(Q) -> end, [{type, rabbit_queue_type:short_alias_of(?MODULE)}, {state, State}, - {node, node(amqqueue:get_pid(Q))}]. + {node, node(amqqueue:get_pid(Q))} + | format_policy_fields(Q, Ctx)]. + +format_policy_fields(Q, Ctx) -> + case maps:get(management_stats_disabled, Ctx, true) of + true -> + [{policy, i(policy, Q)}, + {operator_policy, i(operator_policy, Q)}, + {effective_policy_definition, i(effective_policy_definition, Q)}]; + false -> + [] + end. -spec init(amqqueue:amqqueue()) -> {ok, state()}. init(Q) when ?amqqueue_is_classic(Q) -> diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 89542531f1bd..ffa84611f54e 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -355,21 +355,7 @@ gather_policy_config(Q, IsQueueDeclaration) -> OverflowBin = args_policy_lookup(<<"overflow">>, fun policy_has_precedence/2, Q), Overflow = overflow(OverflowBin, drop_head, QName), MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q), - DeliveryLimit = case args_policy_lookup(<<"delivery-limit">>, - fun resolve_delivery_limit/2, Q) of - undefined -> - case IsQueueDeclaration of - true -> - ?LOG_INFO( - "~ts: delivery_limit not set, defaulting to ~b", - [rabbit_misc:rs(QName), ?DEFAULT_DELIVERY_LIMIT]); - false -> - ok - end, - ?DEFAULT_DELIVERY_LIMIT; - DL -> - DL - end, + DeliveryLimit = get_delivery_limit(Q, IsQueueDeclaration), Expires = args_policy_lookup(<<"expires">>, fun min/2, Q), MsgTTL = args_policy_lookup(<<"message-ttl">>, fun min/2, Q), DeadLetterHandler = dead_letter_handler(Q, Overflow), @@ -394,12 +380,6 @@ ra_machine_config(Q) when ?is_amqqueue(Q) -> created => erlang:system_time(millisecond) }. -resolve_delivery_limit(PolVal, ArgVal) - when PolVal < 0 orelse ArgVal < 0 -> - max(PolVal, ArgVal); -resolve_delivery_limit(PolVal, ArgVal) -> - min(PolVal, ArgVal). - policy_has_precedence(Policy, _QueueArg) -> Policy. @@ -1908,6 +1888,31 @@ i_totals(Q) when ?is_amqqueue(Q) -> {messages, 0}] end. +resolve_delivery_limit(PolVal, ArgVal) + when PolVal < 0 orelse ArgVal < 0 -> + max(PolVal, ArgVal); +resolve_delivery_limit(PolVal, ArgVal) -> + min(PolVal, ArgVal). + +get_delivery_limit(Q) -> + get_delivery_limit(Q, false). + +get_delivery_limit(Q, ShouldLog) -> + PolicyValue = args_policy_lookup(<<"delivery-limit">>, fun resolve_delivery_limit/2, Q), + get_delivery_limit({handle_policy_value, PolicyValue}, Q, ShouldLog). + +get_delivery_limit({handle_policy_value, undefined}, Q, true) -> + QName = amqqueue:get_name(Q), + ?LOG_INFO("~ts: delivery_limit not set, defaulting to ~b", + [rabbit_misc:rs(QName), ?DEFAULT_DELIVERY_LIMIT]), + ?DEFAULT_DELIVERY_LIMIT; +get_delivery_limit({handle_policy_value, undefined}, _Q, false) -> + ?DEFAULT_DELIVERY_LIMIT; +get_delivery_limit({handle_policy_value, Limit}, _Q, _ShouldLog) when is_integer(Limit) -> + Limit; +get_delivery_limit({handle_policy_value, _Other}, _Q, _ShouldLog) -> + ?DEFAULT_DELIVERY_LIMIT. + i(name, Q) when ?is_amqqueue(Q) -> amqqueue:get_name(Q); i(durable, Q) when ?is_amqqueue(Q) -> amqqueue:is_durable(Q); i(auto_delete, Q) when ?is_amqqueue(Q) -> amqqueue:is_auto_delete(Q); @@ -2042,6 +2047,8 @@ i(message_bytes_dlx, Q) when ?is_amqqueue(Q) -> {timeout, _} -> 0 end; +i(delivery_limit, Q) when ?is_amqqueue(Q) -> + get_delivery_limit(Q); i(_K, _Q) -> ''. open_files(Name) -> @@ -2147,7 +2154,19 @@ format(Q, Ctx) when ?is_amqqueue(Q) -> {node, LeaderNode}, {members, Nodes}, {leader, LeaderNode}, - {online, Online}]. + {online, Online} + | format_policy_fields(Q, Ctx)]. + +format_policy_fields(Q, Ctx) -> + case maps:get(management_stats_disabled, Ctx, true) of + true -> + [{policy, i(policy, Q)}, + {operator_policy, i(operator_policy, Q)}, + {effective_policy_definition, i(effective_policy_definition, Q)}, + {delivery_limit, i(delivery_limit, Q)}]; + false -> + [] + end. -spec quorum_messages(rabbit_amqqueue:name()) -> non_neg_integer(). diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index fc61e3f5e8f3..c5be92a507b7 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -303,10 +303,22 @@ format(Q, Ctx) -> {leader, LeaderNode}, {online, Online}, {members, Nodes}, - {node, node(Pid)}]; + {node, node(Pid)} + | format_policy_fields(Q, Ctx)]; _ -> [{type, rabbit_queue_type:short_alias_of(?MODULE)}, - {state, down}] + {state, down} + | format_policy_fields(Q, Ctx)] + end. + +format_policy_fields(Q, Ctx) -> + case maps:get(management_stats_disabled, Ctx, true) of + true -> + [{policy, i(policy, Q)}, + {operator_policy, i(operator_policy, Q)}, + {effective_policy_definition, i(effective_policy_definition, Q)}]; + false -> + [] end. consume(Q, #{mode := {simple_prefetch, 0}}, _) diff --git a/deps/rabbit_common/src/rabbit_json.erl b/deps/rabbit_common/src/rabbit_json.erl index d884dc296e43..903ab4a281e7 100644 --- a/deps/rabbit_common/src/rabbit_json.erl +++ b/deps/rabbit_common/src/rabbit_json.erl @@ -46,7 +46,7 @@ encode(Term) -> encode(Term, Opts) -> %% Fixup for JSON encoding %% * Transforms any Funs into strings - %% See rabbit_mgmt_format:format_nulls/1 + %% See rabbit_mgmt_format:prepare_for_encoding/1 F = fun (V) when is_function(V) -> rabbit_data_coercion:to_binary(V); diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_db.erl b/deps/rabbitmq_management/src/rabbit_mgmt_db.erl index 119c71c28210..5b12691a9481 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_db.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_db.erl @@ -665,10 +665,13 @@ node_stats(Ranges, Objs, Interval) -> end || Obj <- Objs]. combine(New, Old) -> + NewKeys = [K || {K, _} <- New], case pget(state, Old) of - unknown -> New ++ Old; - live -> New ++ delete_keys([state, online], Old); - _ -> lists:keydelete(state, 1, New) ++ Old + unknown -> New ++ delete_keys(NewKeys, Old); + live -> New ++ delete_keys([state, online | NewKeys], Old); + _ -> + NewKeysNoState = lists:delete(state, NewKeys), + lists:keydelete(state, 1, New) ++ delete_keys(NewKeysNoState, Old) end. delete_keys(Keys, List) -> diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_oauth_bootstrap.erl b/deps/rabbitmq_management/src/rabbit_mgmt_oauth_bootstrap.erl index 524869b33587..f659e2ab403e 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_oauth_bootstrap.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_oauth_bootstrap.erl @@ -103,7 +103,7 @@ validate_auth_mechanism({Type, <<"basic">>}, _AuthSettings) -> validate_auth_mechanism({_, _}, _AuthSettings) -> {error, unknown_auth_mechanism}. set_oauth_settings(AuthSettings) -> - JsonAuthSettings = rabbit_json:encode(rabbit_mgmt_format:format_nulls(AuthSettings)), + JsonAuthSettings = rabbit_json:encode(rabbit_mgmt_format:prepare_for_encoding(AuthSettings)), ["set_oauth_settings(", JsonAuthSettings, ");"]. set_token_auth(AuthSettings, Req0) -> diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_util.erl b/deps/rabbitmq_management/src/rabbit_mgmt_util.erl index 3ccbba874a3b..dae4ea597cf9 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_util.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_util.erl @@ -49,7 +49,7 @@ -export([catch_no_such_user_or_vhost/2]). -export([method_not_allowed/3]). --export([disable_stats/1, enable_queue_totals/1]). +-export([disable_stats/0, disable_stats/1, enable_queue_totals/1]). -export([set_resp_not_found/2]). @@ -140,14 +140,16 @@ auth_config() -> oauth2_enabled = OauthEnabled, oauth_client_id = OauthClientId}. +disable_stats() -> + not rabbit_mgmt_agent_config:is_metrics_collector_enabled() orelse + not rabbit_mgmt_features:are_stats_enabled(). + disable_stats(ReqData) -> MgmtOnly = case qs_val(<<"disable_stats">>, ReqData) of <<"true">> -> true; _ -> false end, - MgmtOnly orelse - not rabbit_mgmt_agent_config:is_metrics_collector_enabled() orelse - not rabbit_mgmt_features:are_stats_enabled(). + MgmtOnly orelse disable_stats(). enable_queue_totals(ReqData) -> EnableTotals = case qs_val(<<"enable_queue_totals">>, ReqData) of @@ -267,7 +269,7 @@ reply0(Facts, ReqData, Context) -> {<<"application">>, <<"bert">>, _} -> {term_to_binary(Facts), ReqData1, Context}; _ -> - {rabbit_json:encode(rabbit_mgmt_format:format_nulls(Facts)), + {rabbit_json:encode(rabbit_mgmt_format:prepare_for_encoding(Facts)), ReqData1, Context} end catch exit:{json_encode, E} -> diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_wm_queue.erl b/deps/rabbitmq_management/src/rabbit_mgmt_wm_queue.erl index 5d25aff09d2a..0fa35ac487a5 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_wm_queue.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_wm_queue.erl @@ -10,7 +10,7 @@ -export([init/2, resource_exists/2, to_json/2, content_types_provided/2, content_types_accepted/2, is_authorized/2, allowed_methods/2, accept_content/2, - delete_resource/2, queue/1, queue/2]). + delete_resource/2, queue/1, queue/2, queue/3]). -export([variances/2]). -include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl"). @@ -39,17 +39,19 @@ resource_exists(ReqData, Context) -> to_json(ReqData, Context) -> try - case rabbit_mgmt_util:disable_stats(ReqData) of + StatsDisabled = rabbit_mgmt_util:disable_stats(ReqData), + case StatsDisabled of false -> [Q] = rabbit_mgmt_db:augment_queues( - [queue(ReqData)], rabbit_mgmt_util:range_ceil(ReqData), + [queue(ReqData, StatsDisabled)], + rabbit_mgmt_util:range_ceil(ReqData), full), Payload = rabbit_mgmt_format:clean_consumer_details( rabbit_mgmt_format:strip_pids(Q)), rabbit_mgmt_util:reply(ensure_defaults(Payload), ReqData, Context); true -> Q = case rabbit_mgmt_util:enable_queue_totals(ReqData) of - false -> queue(ReqData); + false -> queue(ReqData, StatsDisabled); true -> queue_with_totals(ReqData) end, rabbit_mgmt_util:reply( @@ -108,15 +110,20 @@ ensure_defaults(Payload0) -> end. queue(ReqData) -> + queue(ReqData, rabbit_mgmt_util:disable_stats(ReqData)). + +queue(ReqData, StatsDisabled) -> case rabbit_mgmt_util:vhost(ReqData) of not_found -> not_found; - VHost -> queue(VHost, rabbit_mgmt_util:id(queue, ReqData)) + VHost -> + Ctx = #{management_stats_disabled => StatsDisabled}, + queue(VHost, rabbit_mgmt_util:id(queue, ReqData), Ctx) end. -queue(VHost, QName) -> +queue(VHost, QName, Ctx) -> Name = rabbit_misc:r(VHost, queue, QName), case rabbit_amqqueue:lookup(Name) of - {ok, Q} -> rabbit_mgmt_format:queue(Q); + {ok, Q} -> rabbit_mgmt_format:queue(Q, Ctx); {error, not_found} -> not_found end. diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_wm_queues.erl b/deps/rabbitmq_management/src/rabbit_mgmt_wm_queues.erl index 3383c82926fc..e42359ff4421 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_wm_queues.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_wm_queues.erl @@ -76,22 +76,17 @@ basic(ReqData) -> %% a cluster wide query with a reasonably long (10s) timeout. %% TODO: replace with faster approximate function Running = rabbit_nodes:list_running(), - Ctx = #{running_nodes => Running}, + StatsDisabled = rabbit_mgmt_util:disable_stats(ReqData), + Ctx = #{running_nodes => Running, + management_stats_disabled => StatsDisabled}, FmtQ = fun (Q) -> rabbit_mgmt_format:queue(Q, Ctx) end, - case rabbit_mgmt_util:disable_stats(ReqData) of + case StatsDisabled of false -> list_queues(ReqData, Running, FmtQ, FmtQ); true -> case rabbit_mgmt_util:enable_queue_totals(ReqData) of false -> - list_queues(ReqData, Running, - fun(Q) -> - FmtQ(Q) ++ - %% TODO: just add policy name in - %% rabbit_mgmt_format:queue/1? - policy(Q) - end, - FmtQ); + list_queues(ReqData, Running, FmtQ, FmtQ); true -> %% TODO: this is not optimised like the other code paths %% most likely we can avoid the collector pattern by @@ -169,9 +164,3 @@ collect_info_all(VHostPath) -> down_queues(ReqData, Running) -> Fun = fun(VhostPath) -> rabbit_amqqueue:list_down(VhostPath, Running) end, rabbit_mgmt_util:all_or_one_vhost(ReqData, Fun). - -policy(Q) -> - case rabbit_policy:name(Q) of - none -> []; - Policy -> [{policy, Policy}] - end. diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_wm_user_queues.erl b/deps/rabbitmq_management/src/rabbit_mgmt_wm_user_queues.erl index 02df9f4857d6..516bddb09b83 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_wm_user_queues.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_wm_user_queues.erl @@ -70,7 +70,9 @@ basic(ReqData) -> %% a cluster wide query with a reasonably long (10s) timeout. %% TODO: replace with faster approximate function Running = rabbit_nodes:list_running(), - Ctx = #{running_nodes => Running}, + StatsDisabled = rabbit_mgmt_util:disable_stats(ReqData), + Ctx = #{running_nodes => Running, + management_stats_disabled => StatsDisabled}, FmtQ = fun (Q) -> rabbit_mgmt_format:queue(Q, Ctx) end, User = rabbit_mgmt_util:id(user, ReqData), list_queues(ReqData, Running, FmtQ, FmtQ, User). diff --git a/deps/rabbitmq_management/test/rabbit_mgmt_only_http_SUITE.erl b/deps/rabbitmq_management/test/rabbit_mgmt_only_http_SUITE.erl index a7fbc5192e7a..7d45c0210fb4 100644 --- a/deps/rabbitmq_management/test/rabbit_mgmt_only_http_SUITE.erl +++ b/deps/rabbitmq_management/test/rabbit_mgmt_only_http_SUITE.erl @@ -38,7 +38,8 @@ all() -> [ {group, all_tests_with_prefix}, {group, all_tests_without_prefix}, - {group, stats_disabled_on_request} + {group, stats_disabled_on_request}, + {group, stats_disabled_via_config} ]. groups() -> @@ -46,6 +47,12 @@ groups() -> {all_tests_with_prefix, [], some_tests() ++ all_tests()}, {all_tests_without_prefix, [], some_tests()}, {stats_disabled_on_request, [], [disable_with_disable_stats_parameter_test]}, + {stats_disabled_via_config, [], [ + classic_queue_with_stats_disabled_test, + quorum_queue_with_stats_disabled_test, + quorum_queue_default_delivery_limit_with_stats_disabled_test, + stream_queue_with_stats_disabled_test + ]}, {invalid_config, [], [invalid_config_test]} ]. @@ -130,6 +137,17 @@ init_per_group(all_tests_with_prefix = Group, Config0) -> Config1 = rabbit_ct_helpers:merge_app_env(Config0, PathConfig), Config2 = finish_init(Group, Config1), start_broker(Config2); +init_per_group(stats_disabled_via_config = Group, Config0) -> + Config1 = rabbit_ct_helpers:merge_app_env(Config0, + {rabbitmq_management_agent, [ + {disable_metrics_collector, true} + ]}), + Config2 = rabbit_ct_helpers:merge_app_env(Config1, + {rabbitmq_management, [ + {disable_management_stats, true} + ]}), + Config3 = finish_init(Group, Config2, true), + start_broker(Config3); init_per_group(Group, Config0) -> Config1 = finish_init(Group, Config0), start_broker(Config1). @@ -1442,6 +1460,146 @@ disable_with_disable_stats_parameter_test(Config) -> passed. +classic_queue_with_stats_disabled_test(Config) -> + QArgs = #{arguments => #{'x-max-length' => 100}}, + PolicyArgs = #{pattern => <<".*">>, + definition => #{'max-length' => 1024}, + priority => 0, + 'apply-to' => <<"queues">>}, + OpPolicyArgs = #{pattern => <<".*">>, + definition => #{'max-length' => 2048}, + priority => 0, + 'apply-to' => <<"queues">>}, + + http_put(Config, "/queues/%2F/test-classic-queue", QArgs, {group, '2xx'}), + http_put(Config, "/policies/%2F/test-policy", PolicyArgs, {group, '2xx'}), + http_put(Config, "/operator-policies/%2F/test-op-policy", OpPolicyArgs, {group, '2xx'}), + + await_condition(fun() -> + Queue = http_get(Config, "/queues/%2F/test-classic-queue", ?OK), + maps:get(policy, Queue, undefined) =:= <<"test-policy">> + end), + + Queue = http_get(Config, "/queues/%2F/test-classic-queue", ?OK), + + ?assertEqual(<<"test-policy">>, maps:get(policy, Queue)), + ?assertEqual(<<"test-op-policy">>, maps:get(operator_policy, Queue)), + ?assert(maps:is_key(effective_policy_definition, Queue)), + EffectiveDef = maps:get(effective_policy_definition, Queue), + ?assertEqual(1024, maps:get('max-length', EffectiveDef)), + + %% Verify the list endpoint also returns policy fields + Queues = http_get(Config, "/queues", ?OK), + [ListQ] = [Q || Q <- Queues, maps:get(name, Q) =:= <<"test-classic-queue">>], + ?assertEqual(<<"test-policy">>, maps:get(policy, ListQ)), + ?assertEqual(<<"test-op-policy">>, maps:get(operator_policy, ListQ)), + + http_delete(Config, "/queues/%2F/test-classic-queue", {group, '2xx'}), + http_delete(Config, "/policies/%2F/test-policy", {group, '2xx'}), + http_delete(Config, "/operator-policies/%2F/test-op-policy", {group, '2xx'}), + + passed. + +quorum_queue_with_stats_disabled_test(Config) -> + QArgs = #{durable => true, + arguments => #{'x-queue-type' => 'quorum', + 'x-delivery-limit' => 40}}, + PolicyArgs = #{pattern => <<".*">>, + definition => #{'queue-leader-locator' => <<"balanced">>}, + priority => 0, + 'apply-to' => <<"queues">>}, + OpPolicyArgs = #{pattern => <<".*">>, + definition => #{'max-length' => 1024}, + priority => 0, + 'apply-to' => <<"queues">>}, + + http_put(Config, "/queues/%2F/test-quorum-queue", QArgs, {group, '2xx'}), + http_put(Config, "/policies/%2F/test-policy", PolicyArgs, {group, '2xx'}), + http_put(Config, "/operator-policies/%2F/test-op-policy", OpPolicyArgs, {group, '2xx'}), + + await_condition(fun() -> + Queue = http_get(Config, "/queues/%2F/test-quorum-queue", ?OK), + maps:get(policy, Queue, undefined) =:= <<"test-policy">> + end), + + Queue = http_get(Config, "/queues/%2F/test-quorum-queue", ?OK), + + ?assertEqual(<<"test-policy">>, maps:get(policy, Queue)), + ?assertEqual(<<"test-op-policy">>, maps:get(operator_policy, Queue)), + ?assert(maps:is_key(effective_policy_definition, Queue)), + EffectiveDef = maps:get(effective_policy_definition, Queue), + ?assertEqual(1024, maps:get('max-length', EffectiveDef)), + ?assertEqual(<<"balanced">>, maps:get('queue-leader-locator', EffectiveDef)), + ?assertEqual(40, maps:get(delivery_limit, Queue)), + + http_delete(Config, "/queues/%2F/test-quorum-queue", {group, '2xx'}), + http_delete(Config, "/policies/%2F/test-policy", {group, '2xx'}), + http_delete(Config, "/operator-policies/%2F/test-op-policy", {group, '2xx'}), + + passed. + +quorum_queue_default_delivery_limit_with_stats_disabled_test(Config) -> + QArgs = #{durable => true, + arguments => #{'x-queue-type' => 'quorum'}}, + PolicyArgs = #{pattern => <<".*">>, + definition => #{'queue-leader-locator' => <<"balanced">>}, + priority => 0, + 'apply-to' => <<"queues">>}, + + http_put(Config, "/queues/%2F/test-qq-default-dl", QArgs, {group, '2xx'}), + http_put(Config, "/policies/%2F/test-policy", PolicyArgs, {group, '2xx'}), + + await_condition(fun() -> + Queue = http_get(Config, "/queues/%2F/test-qq-default-dl", ?OK), + maps:get(policy, Queue, undefined) =:= <<"test-policy">> + end), + + Queue = http_get(Config, "/queues/%2F/test-qq-default-dl", ?OK), + + ?assertEqual(<<"test-policy">>, maps:get(policy, Queue)), + ?assertEqual(20, maps:get(delivery_limit, Queue)), + + http_delete(Config, "/queues/%2F/test-qq-default-dl", {group, '2xx'}), + http_delete(Config, "/policies/%2F/test-policy", {group, '2xx'}), + + passed. + +stream_queue_with_stats_disabled_test(Config) -> + QArgs = #{durable => true, + arguments => #{'x-queue-type' => 'stream'}}, + PolicyArgs = #{pattern => <<".*">>, + definition => #{'queue-leader-locator' => <<"balanced">>}, + priority => 0, + 'apply-to' => <<"queues">>}, + OpPolicyArgs = #{pattern => <<".*">>, + definition => #{'max-length-bytes' => 1000000000}, + priority => 0, + 'apply-to' => <<"queues">>}, + + http_put(Config, "/queues/%2F/test-stream-queue", QArgs, {group, '2xx'}), + http_put(Config, "/policies/%2F/test-policy", PolicyArgs, {group, '2xx'}), + http_put(Config, "/operator-policies/%2F/test-op-policy", OpPolicyArgs, {group, '2xx'}), + + await_condition(fun() -> + Queue = http_get(Config, "/queues/%2F/test-stream-queue", ?OK), + maps:get(policy, Queue, undefined) =:= <<"test-policy">> + end), + + Queue = http_get(Config, "/queues/%2F/test-stream-queue", ?OK), + + ?assertEqual(<<"test-policy">>, maps:get(policy, Queue)), + ?assertEqual(<<"test-op-policy">>, maps:get(operator_policy, Queue)), + ?assert(maps:is_key(effective_policy_definition, Queue)), + EffectiveDef = maps:get(effective_policy_definition, Queue), + ?assertEqual(<<"balanced">>, maps:get('queue-leader-locator', EffectiveDef)), + ?assertEqual(1000000000, maps:get('max-length-bytes', EffectiveDef)), + + http_delete(Config, "/queues/%2F/test-stream-queue", {group, '2xx'}), + http_delete(Config, "/policies/%2F/test-policy", {group, '2xx'}), + http_delete(Config, "/operator-policies/%2F/test-op-policy", {group, '2xx'}), + + passed. + sorting_test(Config) -> QArgs = #{}, PermArgs = [{configure, <<".*">>}, {write, <<".*">>}, {read, <<".*">>}], @@ -1600,37 +1758,6 @@ local_port(Conn) -> {ok, Port} = inet:port(Sock), Port. -spawn_invalid(_Config, 0) -> - ok; -spawn_invalid(Config, N) -> - Self = self(), - spawn(fun() -> - timer:sleep(rand:uniform(250)), - {ok, Sock} = gen_tcp:connect("localhost", amqp_port(Config), [list]), - ok = gen_tcp:send(Sock, "Some Data"), - receive_msg(Self) - end), - spawn_invalid(Config, N-1). - -receive_msg(Self) -> - receive - {tcp, _, [$A, $M, $Q, $P | _]} -> - Self ! done - after - 60000 -> - Self ! no_reply - end. - -wait_for_answers(0) -> - ok; -wait_for_answers(N) -> - receive - done -> - wait_for_answers(N-1); - no_reply -> - throw(no_reply) - end. - publish(Ch) -> amqp_channel:call(Ch, #'basic.publish'{exchange = <<"">>, routing_key = <<"myqueue">>}, diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_format.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_format.erl index 0ad7327dc748..6eadc128d2dc 100644 --- a/deps/rabbitmq_management_agent/src/rabbit_mgmt_format.erl +++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_format.erl @@ -15,7 +15,7 @@ -export([to_amqp_table/1, listener/1, web_context/1, properties/1, basic_properties/1]). -export([record/2, to_basic_properties/1]). -export([addr/1, port/1]). --export([format_nulls/1]). +-export([prepare_for_encoding/1]). -export([print/2, print/1]). -export([format_queue_stats/1, format_queue_basic_stats/1, @@ -555,23 +555,39 @@ strip_pids([Any | T], Acc) -> strip_pids([], Acc) -> Acc. -%% Format for JSON replies. Transforms '' into null -format_nulls(Items) when is_list(Items) -> - [format_null_item(Pair) || Pair <- Items]; -format_nulls(Item) -> +%% Format for JSON replies. Transforms '' into null and deduplicates keys. +prepare_for_encoding(Items) when is_list(Items) -> + dedup_keys([format_null_item(Pair) || Pair <- Items]); +prepare_for_encoding(Item) -> format_null_item(Item). format_null_item({Key, ''}) -> {Key, null}; format_null_item({Key, Value}) when is_list(Value) -> - {Key, format_nulls(Value)}; + {Key, prepare_for_encoding(Value)}; format_null_item({Key, Value}) -> {Key, Value}; format_null_item([{_K, _V} | _T] = L) -> - format_nulls(L); + prepare_for_encoding(L); format_null_item(Value) -> Value. +%% Keep only the first occurrence of each key in a proplist. +dedup_keys(Items) -> + dedup_keys(Items, sets:new([{version, 2}]), []). + +dedup_keys([], _Seen, Acc) -> + lists:reverse(Acc); +dedup_keys([{Key, _} = Item | Rest], Seen, Acc) -> + case sets:is_element(Key, Seen) of + true -> + dedup_keys(Rest, Seen, Acc); + false -> + dedup_keys(Rest, sets:add_element(Key, Seen), [Item | Acc]) + end; +dedup_keys([Item | Rest], Seen, Acc) -> + dedup_keys(Rest, Seen, [Item | Acc]). + -spec clean_consumer_details(proplists:proplist()) -> proplists:proplist(). clean_consumer_details(Obj) -> case pget(consumer_details, Obj) of diff --git a/deps/rabbitmq_tracing/src/rabbit_tracing_consumer.erl b/deps/rabbitmq_tracing/src/rabbit_tracing_consumer.erl index fabc1b7ee908..465cb2d7a5c8 100644 --- a/deps/rabbitmq_tracing/src/rabbit_tracing_consumer.erl +++ b/deps/rabbitmq_tracing/src/rabbit_tracing_consumer.erl @@ -90,8 +90,9 @@ init(Args0) -> end. handle_call(info_all, _From, State = #state{vhost = V, queue = Q}) -> + StatsDisabled = rabbit_mgmt_util:disable_stats(), [QInfo] = rabbit_mgmt_db:augment_queues( - [rabbit_mgmt_wm_queue:queue(V, Q)], + [rabbit_mgmt_wm_queue:queue(V, Q, #{management_stats_disabled => StatsDisabled})], rabbit_mgmt_util:no_range(), basic), {reply, [{queue, rabbit_mgmt_format:strip_pids(QInfo)}], State};