diff --git a/deps/rabbit/src/rabbit_global_counters.erl b/deps/rabbit/src/rabbit_global_counters.erl index 49fc9a06fe53..32cc8964c037 100644 --- a/deps/rabbit/src/rabbit_global_counters.erl +++ b/deps/rabbit/src/rabbit_global_counters.erl @@ -132,14 +132,14 @@ boot_step() -> [begin %% Protocol counters - Protocol = {protocol, Proto}, - init([Protocol]), + Protocol = #{protocol => Proto}, + init(Protocol), rabbit_msg_size_metrics:init(Proto), %% Protocol & Queue Type counters - init([Protocol, {queue_type, rabbit_classic_queue}]), - init([Protocol, {queue_type, rabbit_quorum_queue}]), - init([Protocol, {queue_type, rabbit_stream_queue}]) + init(Protocol#{queue_type => rabbit_classic_queue}), + init(Protocol#{queue_type => rabbit_quorum_queue}), + init(Protocol#{queue_type => rabbit_stream_queue}) end || Proto <- [amqp091, amqp10]], %% Dead Letter counters @@ -147,11 +147,11 @@ boot_step() -> %% Streams never dead letter. %% %% Source classic queue dead letters. - init([{queue_type, rabbit_classic_queue}, {dead_letter_strategy, disabled}], + init(#{queue_type => rabbit_classic_queue, dead_letter_strategy => disabled}, [?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER, ?MESSAGES_DEAD_LETTERED_EXPIRED_COUNTER, ?MESSAGES_DEAD_LETTERED_REJECTED_COUNTER]), - init([{queue_type, rabbit_classic_queue}, {dead_letter_strategy, at_most_once}], + init(#{queue_type => rabbit_classic_queue, dead_letter_strategy => at_most_once}, [?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER, ?MESSAGES_DEAD_LETTERED_EXPIRED_COUNTER, ?MESSAGES_DEAD_LETTERED_REJECTED_COUNTER]), @@ -159,19 +159,19 @@ boot_step() -> %% Source quorum queue dead letters. %% Only quorum queues can dead letter due to delivery-limit exceeded. %% Only quorum queues support dead letter strategy at-least-once. - init([{queue_type, rabbit_quorum_queue}, {dead_letter_strategy, disabled}], + init(#{queue_type => rabbit_quorum_queue, dead_letter_strategy => disabled}, [?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER, ?MESSAGES_DEAD_LETTERED_EXPIRED_COUNTER, ?MESSAGES_DEAD_LETTERED_REJECTED_COUNTER, ?MESSAGES_DEAD_LETTERED_DELIVERY_LIMIT_COUNTER ]), - init([{queue_type, rabbit_quorum_queue}, {dead_letter_strategy, at_most_once}], + init(#{queue_type => rabbit_quorum_queue, dead_letter_strategy => at_most_once}, [?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER, ?MESSAGES_DEAD_LETTERED_EXPIRED_COUNTER, ?MESSAGES_DEAD_LETTERED_REJECTED_COUNTER, ?MESSAGES_DEAD_LETTERED_DELIVERY_LIMIT_COUNTER ]), - init([{queue_type, rabbit_quorum_queue}, {dead_letter_strategy, at_least_once}], + init(#{queue_type => rabbit_quorum_queue, dead_letter_strategy => at_least_once}, [?MESSAGES_DEAD_LETTERED_CONFIRMED_COUNTER, ?MESSAGES_DEAD_LETTERED_EXPIRED_COUNTER, ?MESSAGES_DEAD_LETTERED_REJECTED_COUNTER, @@ -181,21 +181,21 @@ boot_step() -> init(Labels) -> init(Labels, []). -init(Labels = [{protocol, Protocol}, {queue_type, QueueType}], Extra) -> +init(Labels = #{protocol := Protocol, queue_type := QueueType}, Extra) -> _ = seshat:new_group(?MODULE), - Counters = seshat:new(?MODULE, Labels, ?PROTOCOL_QUEUE_TYPE_COUNTERS ++ Extra), + Counters = seshat:new(?MODULE, Labels, ?PROTOCOL_QUEUE_TYPE_COUNTERS ++ Extra, Labels), persistent_term:put({?MODULE, Protocol, QueueType}, Counters); -init(Labels = [{protocol, Protocol}], Extra) -> +init(Labels = #{protocol := Protocol}, Extra) -> _ = seshat:new_group(?MODULE), - Counters = seshat:new(?MODULE, Labels, ?PROTOCOL_COUNTERS ++ Extra), + Counters = seshat:new(?MODULE, Labels, ?PROTOCOL_COUNTERS ++ Extra, Labels), persistent_term:put({?MODULE, Protocol}, Counters); -init(Labels = [{queue_type, QueueType}, {dead_letter_strategy, DLS}], DeadLetterCounters) -> +init(Labels = #{queue_type := QueueType, dead_letter_strategy := DLS}, DeadLetterCounters) -> _ = seshat:new_group(?MODULE), - Counters = seshat:new(?MODULE, Labels, DeadLetterCounters), + Counters = seshat:new(?MODULE, Labels, DeadLetterCounters, Labels), persistent_term:put({?MODULE, QueueType, DLS}, Counters). overview() -> - seshat:overview(?MODULE). + seshat:counters(?MODULE). prometheus_format() -> seshat:format(?MODULE). diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index be9d5b42b06f..ba6460a9c064 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -276,6 +276,7 @@ setup(_Context) -> {default_ra_system, ?RA_SYSTEM}]}], [{persistent, true}]), RaServerConfig = #{cluster_name => ?RA_CLUSTER_NAME, + metrics_labels => #{ra_system => ?RA_SYSTEM, module => ?MODULE}, friendly_name => ?RA_FRIENDLY_NAME}, case khepri:start(?RA_SYSTEM, RaServerConfig) of {ok, ?STORE_ID} -> diff --git a/deps/rabbit/src/rabbit_observer_cli_quorum_queues.erl b/deps/rabbit/src/rabbit_observer_cli_quorum_queues.erl index 1c9b72a0cea1..ee0864924239 100644 --- a/deps/rabbit/src/rabbit_observer_cli_quorum_queues.erl +++ b/deps/rabbit/src/rabbit_observer_cli_quorum_queues.erl @@ -123,11 +123,11 @@ sheet_header() -> sheet_body(PrevState) -> {_, RaStates} = rabbit_quorum_queue:all_replica_states(), Body = [begin - #resource{name = Name, virtual_host = Vhost} = R = amqqueue:get_name(Q), + #resource{name = Name, virtual_host = Vhost} = amqqueue:get_name(Q), case rabbit_amqqueue:pid_of(Q) of none -> empty_row(Name); - {QName, _QNode} = _QQ -> + {QName, _QNode} = ServerId -> case whereis(QName) of undefined -> empty_row(Name); @@ -139,7 +139,12 @@ sheet_body(PrevState) -> _ -> QQCounters = maps:get({QName, node()}, ra_counters:overview()), {ok, InternalName} = rabbit_queue_type_util:qname_to_internal_name(#resource{virtual_host = Vhost, name= Name}), - [{_, CT, SnapIdx, LA, CI, LW, CL}] = ets:lookup(ra_metrics, R), + #{snapshot_index := SnapIdx, + last_written_index := LW, + term := CT, + commit_latency := CL, + commit_index := CI, + last_applied := LA} = ra:key_metrics(ServerId), [ Pid, QName, diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 5ae9a8a73973..d068d51bb57d 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -1598,12 +1598,10 @@ transfer_leadership(Q, Destination) -> end. queue_length(Q) -> - Name = amqqueue:get_name(Q), - case ets:lookup(ra_metrics, Name) of - [] -> 0; - [{_, _, SnapIdx, _, _, LastIdx, _}] -> - LastIdx - SnapIdx - end. + ServerId = amqqueue:get_pid(Q), + #{snapshot_index := SnapIdx, + last_written_index := LastIdx} = key_metrics_rpc(ServerId), + LastIdx - SnapIdx. get_replicas(Q) -> get_nodes(Q). @@ -1985,6 +1983,7 @@ make_ra_conf(Q, ServerId, TickTimeout, SnapshotInterval, CheckpointInterval, Membership, MacVersion) -> QName = amqqueue:get_name(Q), + #resource{name = QNameBin} = QName, RaMachine = ra_machine(Q), [{ClusterName, _} | _] = Members = members(Q), UId = ra:new_uid(ra_lib:to_binary(ClusterName)), @@ -2000,6 +1999,8 @@ make_ra_conf(Q, ServerId, TickTimeout, uid => UId, friendly_name => FName, metrics_key => QName, + metrics_labels => #{vhost => amqqueue:get_vhost(Q), + queue => QNameBin}, initial_members => Members, log_init_args => LogCfg, tick_timeout => TickTimeout, diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index f2594ac538a2..28837c048765 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -1349,6 +1349,7 @@ make_ra_conf(Node, Nodes, MinMacVersion) -> uid => UId, friendly_name => atom_to_list(?MODULE), metrics_key => ?MODULE, + metrics_labels => #{ra_system => ?RA_SYSTEM, module => ?MODULE}, initial_members => Members, log_init_args => #{uid => UId}, tick_timeout => TickTimeout, diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 99b1ab64906e..201fc99125d5 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -6967,11 +6967,11 @@ formatted_state(Pid) -> proplists:get_value("State", L2). get_global_counters(Config) -> - get_global_counters0(Config, [{protocol, amqp10}]). + get_global_counters0(Config, #{protocol => amqp10}). get_global_counters(Config, QType) -> - get_global_counters0(Config, [{protocol, amqp10}, - {queue_type, QType}]). + get_global_counters0(Config, #{protocol => amqp10, + queue_type => QType}). get_global_counters0(Config, Key) -> Overview = rpc(Config, rabbit_global_counters, overview, []), diff --git a/deps/rabbit/test/dead_lettering_SUITE.erl b/deps/rabbit/test/dead_lettering_SUITE.erl index 489f4e154e41..e6d25b8c5f42 100644 --- a/deps/rabbit/test/dead_lettering_SUITE.erl +++ b/deps/rabbit/test/dead_lettering_SUITE.erl @@ -1936,7 +1936,7 @@ counted(Metric, Config) -> metric(QueueType, Strategy, Metric, OldCounters). metric(QueueType, Strategy, Metric, Counters) -> - Metrics = maps:get([{queue_type, QueueType}, {dead_letter_strategy, Strategy}], Counters), + Metrics = maps:get(#{queue_type => QueueType, dead_letter_strategy => Strategy}, Counters), maps:get(Metric, Metrics). group_name(Config) -> diff --git a/deps/rabbit/test/queue_type_SUITE.erl b/deps/rabbit/test/queue_type_SUITE.erl index 6de4a29d2fc4..bbd4c6fc15ca 100644 --- a/deps/rabbit/test/queue_type_SUITE.erl +++ b/deps/rabbit/test/queue_type_SUITE.erl @@ -162,7 +162,7 @@ smoke(Config) -> ok = publish_and_confirm(Ch, <<"non-existent_queue">>, <<"msg4">>), ConsumerTag3 = <<"ctag3">>, ok = subscribe(Ch, QName, ConsumerTag3), - ProtocolCounters = maps:get([{protocol, amqp091}], get_global_counters(Config)), + ProtocolCounters = maps:get(#{protocol => amqp091}, get_global_counters(Config)), ?assertEqual(#{ messages_confirmed_total => 4, messages_received_confirm_total => 4, @@ -177,7 +177,7 @@ smoke(Config) -> "rabbit_" ++ binary_to_list(?config(queue_type, Config)) ++ "_queue"), - ProtocolQueueTypeCounters = maps:get([{protocol, amqp091}, {queue_type, QueueType}], + ProtocolQueueTypeCounters = maps:get(#{protocol => amqp091, queue_type => QueueType}, get_global_counters(Config)), ?assertEqual(#{ messages_acknowledged_total => 3, @@ -196,7 +196,7 @@ smoke(Config) -> ?assertMatch( #{consumers := 0, publishers := 0}, - maps:get([{protocol, amqp091}], get_global_counters(Config))), + maps:get(#{protocol => amqp091}, get_global_counters(Config))), ok. diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 2ae9f23d4060..a2c19569425d 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -3145,6 +3145,9 @@ reconnect_consumer_and_wait_channel_down(Config) -> {#'basic.deliver'{redelivered = false}, _} -> wait_for_messages_ready(Servers, RaName, 0), wait_for_messages_pending_ack(Servers, RaName, 1) + after 30000 -> + flush(1), + exit(basic_deliver_timeout) end, Up = [Leader, F2], rabbit_ct_broker_helpers:block_traffic_between(F1, Leader), diff --git a/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl b/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl index a6949065253f..20e0842c865a 100644 --- a/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl @@ -991,5 +991,5 @@ counted(Metric, Config) -> metric(Metric, OldCounters). metric(Metric, Counters) -> - Metrics = maps:get([{queue_type, rabbit_quorum_queue}, {dead_letter_strategy, at_least_once}], Counters), + Metrics = maps:get(#{queue_type => rabbit_quorum_queue, dead_letter_strategy => at_least_once}, Counters), maps:get(Metric, Metrics). diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_external_stats.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_external_stats.erl index 6e32fef39e7e..072617ec13e9 100644 --- a/deps/rabbitmq_management_agent/src/rabbit_mgmt_external_stats.erl +++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_external_stats.erl @@ -267,20 +267,8 @@ i(context_switches, State) -> {Sw, 0} = erlang:statistics(context_switches), {State, Sw}; i(ra_open_file_metrics, State) -> - {State, [{ra_log_wal, ra_metrics(ra_log_wal)}, - {ra_log_segment_writer, ra_metrics(ra_log_segment_writer)}]}. - -ra_metrics(K) -> - try - case ets:lookup(ra_open_file_metrics, whereis(K)) of - [] -> 0; - [{_, C}] -> C - end - catch - error:badarg -> - %% On startup the mgmt might start before ra does - 0 - end. + {State, [{ra_log_wal, 0}, + {ra_log_segment_writer, 0}]}. resource_alarm_set(Source) -> lists:member({{resource_limit, Source, node()},[]}, @@ -421,7 +409,7 @@ update_state(State0) -> get_fhc_stats() -> dict:to_list(dict:merge(fun(_, V1, V2) -> V1 + V2 end, dict:from_list(zero_fhc_stats()), - dict:from_list(get_ra_io_metrics()))). + dict:from_list(get_zero_ra_io_metrics()))). zero_fhc_stats() -> [{{Op, Counter}, 0} || Op <- [io_read, io_write], @@ -435,5 +423,17 @@ zero_fhc_stats() -> queue_index_write, queue_index_read], Counter <- [count]]. -get_ra_io_metrics() -> - lists:sort(ets:tab2list(ra_io_metrics)). +get_zero_ra_io_metrics() -> + %% not tracked anymore + [{{io_file_handle_open_attempt,count},0}, + {{io_file_handle_open_attempt,time},0}, + {{io_read,bytes},0}, + {{io_read,count},0}, + {{io_read,time},0}, + {{io_seek,count},0}, + {{io_seek,time},0}, + {{io_sync,count},0}, + {{io_sync,time},0}, + {{io_write,bytes},0}, + {{io_write,count},0}, + {{io_write,time},0}]. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl index 8ecbd85b66ab..3f9882bdaae7 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl @@ -80,15 +80,15 @@ init_global_counters() -> lists:foreach(fun init_global_counters/1, [?MQTT_PROTO_V3, ?MQTT_PROTO_V4, ?MQTT_PROTO_V5]), - rabbit_global_counters:init([{queue_type, ?QUEUE_TYPE_QOS_0}, {dead_letter_strategy, disabled}], + rabbit_global_counters:init(#{queue_type => ?QUEUE_TYPE_QOS_0, dead_letter_strategy => disabled}, [?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER]). init_global_counters(ProtoVer) -> - Proto = {protocol, ProtoVer}, - rabbit_global_counters:init([Proto]), - rabbit_global_counters:init([Proto, {queue_type, rabbit_classic_queue}]), - rabbit_global_counters:init([Proto, {queue_type, rabbit_quorum_queue}]), - rabbit_global_counters:init([Proto, {queue_type, ?QUEUE_TYPE_QOS_0}]), + Proto = #{protocol => ProtoVer}, + rabbit_global_counters:init(Proto), + rabbit_global_counters:init(Proto#{queue_type => rabbit_classic_queue}), + rabbit_global_counters:init(Proto#{queue_type => rabbit_quorum_queue}), + rabbit_global_counters:init(Proto#{queue_type => ?QUEUE_TYPE_QOS_0}), rabbit_msg_size_metrics:init(ProtoVer). persist_static_configuration() -> diff --git a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl index f50b19b42c6f..acc6ec95ace1 100644 --- a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl @@ -715,7 +715,7 @@ global_counters(Config) -> messages_delivered_get_manual_ack_total => 0, messages_get_empty_total => 0, messages_redelivered_total => 0}, - get_global_counters(Config, ProtoVer, 0, [{queue_type, rabbit_classic_queue}])), + get_global_counters(Config, ProtoVer, 0, #{queue_type => rabbit_classic_queue})), ?assertEqual(#{messages_delivered_total => 1, messages_acknowledged_total => 0, messages_delivered_consume_auto_ack_total => 1, @@ -724,7 +724,7 @@ global_counters(Config) -> messages_delivered_get_manual_ack_total => 0, messages_get_empty_total => 0, messages_redelivered_total => 0}, - get_global_counters(Config, ProtoVer, 0, [{queue_type, rabbit_mqtt_qos0_queue}])), + get_global_counters(Config, ProtoVer, 0, #{queue_type => rabbit_mqtt_qos0_queue})), {ok, _, _} = emqtt:unsubscribe(C, Topic1), ?assertEqual(1, maps:get(consumers, get_global_counters(Config, ProtoVer))), diff --git a/deps/rabbitmq_mqtt/test/reader_SUITE.erl b/deps/rabbitmq_mqtt/test/reader_SUITE.erl index 7638242350f4..8f2cff108349 100644 --- a/deps/rabbitmq_mqtt/test/reader_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/reader_SUITE.erl @@ -261,11 +261,11 @@ rabbit_mqtt_qos0_queue_overflow(Config) -> QType = rabbit_mqtt_qos0_queue, #{ - [{protocol, ProtoVer}, {queue_type, QType}] := + #{protocol => ProtoVer, queue_type => QType} := #{messages_delivered_total := 0, messages_delivered_consume_auto_ack_total := 0}, - [{queue_type, QType}, {dead_letter_strategy, disabled}] := + #{queue_type => QType, dead_letter_strategy => disabled} := #{messages_dead_lettered_maxlen_total := NumDeadLettered} } = rabbit_ct_broker_helpers:rpc(Config, rabbit_global_counters, overview, []), @@ -320,11 +320,11 @@ rabbit_mqtt_qos0_queue_overflow(Config) -> ExpectedNumDeadLettered = NumDeadLettered + NumDropped, ?assertMatch( #{ - [{protocol, ProtoVer}, {queue_type, QType}] := + #{protocol => ProtoVer, queue_type => QType} := #{messages_delivered_total := NumReceived, messages_delivered_consume_auto_ack_total := NumReceived}, - [{queue_type, QType}, {dead_letter_strategy, disabled}] := + #{queue_type => QType, dead_letter_strategy => disabled} := #{messages_dead_lettered_maxlen_total := ExpectedNumDeadLettered} }, rabbit_ct_broker_helpers:rpc(Config, rabbit_global_counters, overview, [])), diff --git a/deps/rabbitmq_mqtt/test/util.erl b/deps/rabbitmq_mqtt/test/util.erl index 954f0c664585..782a81bd043d 100644 --- a/deps/rabbitmq_mqtt/test/util.erl +++ b/deps/rabbitmq_mqtt/test/util.erl @@ -77,16 +77,16 @@ get_global_counters(Config, ProtoVer) -> get_global_counters(Config, ProtoVer, 0). get_global_counters(Config, ProtoVer, Node) -> - get_global_counters(Config, ProtoVer, Node, []). - -get_global_counters(Config, v3, Node, QType) -> - get_global_counters(Config, ?MQTT_PROTO_V3, Node, QType); -get_global_counters(Config, v4, Node, QType) -> - get_global_counters(Config, ?MQTT_PROTO_V4, Node, QType); -get_global_counters(Config, v5, Node, QType) -> - get_global_counters(Config, ?MQTT_PROTO_V5, Node, QType); -get_global_counters(Config, Proto, Node, QType) -> - maps:get([{protocol, Proto}] ++ QType, + get_global_counters(Config, ProtoVer, Node, #{}). + +get_global_counters(Config, v3, Node, Labels) -> + get_global_counters(Config, ?MQTT_PROTO_V3, Node, Labels); +get_global_counters(Config, v4, Node, Labels) -> + get_global_counters(Config, ?MQTT_PROTO_V4, Node, Labels); +get_global_counters(Config, v5, Node, Labels) -> + get_global_counters(Config, ?MQTT_PROTO_V5, Node, Labels); +get_global_counters(Config, Proto, Node, Labels) -> + maps:get(Labels#{protocol => Proto}, rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_global_counters, overview, [])). get_events(Node) -> diff --git a/deps/rabbitmq_mqtt/test/v5_SUITE.erl b/deps/rabbitmq_mqtt/test/v5_SUITE.erl index 87483af840f9..cbc39f41b879 100644 --- a/deps/rabbitmq_mqtt/test/v5_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/v5_SUITE.erl @@ -2196,7 +2196,7 @@ dead_letter_metric(Metric, Config) -> dead_letter_metric(Metric, Config, Strategy) -> Counters = rpc(Config, rabbit_global_counters, overview, []), - Map = maps:get([{queue_type, rabbit_classic_queue}, {dead_letter_strategy, Strategy}], Counters), + Map = maps:get(#{queue_type => rabbit_classic_queue, dead_letter_strategy => Strategy}, Counters), maps:get(Metric, Map). assert_nothing_received() -> diff --git a/deps/rabbitmq_prometheus/docker/grafana/dashboards/RabbitMQ-Quorum-Queues-Raft.json b/deps/rabbitmq_prometheus/docker/grafana/dashboards/RabbitMQ-Quorum-Queues-Raft.json index 137aa22cb9cc..ddaaabaf53b2 100644 --- a/deps/rabbitmq_prometheus/docker/grafana/dashboards/RabbitMQ-Quorum-Queues-Raft.json +++ b/deps/rabbitmq_prometheus/docker/grafana/dashboards/RabbitMQ-Quorum-Queues-Raft.json @@ -418,7 +418,7 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, - "expr": "rabbitmq_raft_entry_commit_latency_seconds * on(instance, job) group_left(rabbitmq_cluster, rabbitmq_node) rabbitmq_identity_info{rabbitmq_cluster=\"$rabbitmq_cluster\", namespace=\"$namespace\",rabbitmq_endpoint=\"$endpoint\"}", + "expr": "rabbitmq_raft_commit_latency_seconds * on(instance, job) group_left(rabbitmq_cluster, rabbitmq_node) rabbitmq_identity_info{rabbitmq_cluster=\"$rabbitmq_cluster\", namespace=\"$namespace\",rabbitmq_endpoint=\"$endpoint\"}", "format": "time_series", "instant": false, "intervalFactor": 1, @@ -935,7 +935,7 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, - "expr": "sum(rate(rabbitmq_raft_term_total[60s]) * on(instance, job) group_left(rabbitmq_cluster, rabbitmq_node) rabbitmq_identity_info{rabbitmq_cluster=\"$rabbitmq_cluster\", namespace=\"$namespace\",rabbitmq_endpoint=\"$endpoint\"}) by(rabbitmq_node)", + "expr": "sum(rate(rabbitmq_raft_term[60s]) * on(instance, job) group_left(rabbitmq_cluster, rabbitmq_node) rabbitmq_identity_info{rabbitmq_cluster=\"$rabbitmq_cluster\", namespace=\"$namespace\",rabbitmq_endpoint=\"$endpoint\"}) by(rabbitmq_node)", "format": "time_series", "instant": false, "intervalFactor": 1, diff --git a/deps/rabbitmq_prometheus/metrics.md b/deps/rabbitmq_prometheus/metrics.md index 5b173ac52191..7f61b0d3af94 100644 --- a/deps/rabbitmq_prometheus/metrics.md +++ b/deps/rabbitmq_prometheus/metrics.md @@ -247,20 +247,26 @@ These metrics are specific to the stream protocol. ### Raft -| Metric | Description | -| --- | --- | -| rabbitmq_raft_entry_commit_latency_seconds | Time taken for an entry to be committed | -| rabbitmq_raft_log_commit_index | Raft log commit index | -| rabbitmq_raft_log_last_applied_index | Raft log last applied index | -| rabbitmq_raft_log_last_written_index | Raft log last written index | -| rabbitmq_raft_log_snapshot_index | Raft log snapshot index | -| rabbitmq_raft_term_total | Current Raft term number | +| Metric | Description | +| --- | --- | +| rabbitmq_raft_commit_latency_seconds | Approximate time taken from an entry being written to the log until it is committed | +| rabbitmq_raft_commit_index | Current commit index | +| rabbitmq_raft_last_applied | Last applied index. Can go backwards if a ra server is restarted | +| rabbitmq_raft_last_written_index | Last fully written and fsynced index of the log | +| rabbitmq_raft_snapshot_index | Current snapshot index | +| rabbitmq_raft_term | Current term | +| rabbitmq_raft_num_segments | Number of non-empty segments files | +| rabbitmq_raft_wal_files | Number of write-ahead log files created | +| rabbitmq_raft_segments | Number of segments written | +| rabbitmq_raft_mem_tables | Number of in-memory tables handled | +| rabbitmq_raft_entries | Number of entries written | +| rabbitmq_raft_bytes_written | Number of bytes written | ### Federation | Metric | Description | | --- | --- | -| rabbitmq_federation_links | Federations Links count grouped by Link status | +| rabbitmq_federation_links | Federations Links count grouped by Link status | ## Telemetry diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl index 1e1b00b23aa9..7f6ed70d56dc 100644 --- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl +++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl @@ -106,15 +106,6 @@ {2, ?MICROSECOND, io_seek_time_seconds_total, counter, "Total I/O seek time", io_seek_time} ]}, - {ra_metrics, [ - {2, undefined, raft_term_total, counter, "Current Raft term number"}, - {3, undefined, raft_log_snapshot_index, gauge, "Raft log snapshot index"}, - {4, undefined, raft_log_last_applied_index, gauge, "Raft log last applied index"}, - {5, undefined, raft_log_commit_index, gauge, "Raft log commit index"}, - {6, undefined, raft_log_last_written_index, gauge, "Raft log last written index"}, - {7, ?MILLISECOND, raft_entry_commit_latency_seconds, gauge, "Time taken for a log entry to be committed"} - ]}, - {auth_attempt_metrics, [ {2, undefined, auth_attempts_total, counter, "Total number of authentication attempts"}, {3, undefined, auth_attempts_succeeded_total, counter, "Total number of successful authentication attempts"}, @@ -331,8 +322,11 @@ collect_mf(_Registry, Callback) -> collect(PerObjectMetrics, ?METRIC_NAME_PREFIX, false, ?METRICS_RAW, Callback), totals(Callback), case PerObjectMetrics of - true -> emit_identity_info(<<"per-object">>, Callback); - false -> emit_identity_info(<<"aggregated">>, Callback) + true -> + emit_identity_info(<<"per-object">>, Callback), + emit_queue_info(?METRIC_NAME_PREFIX, false, Callback); + false -> + emit_identity_info(<<"aggregated">>, Callback) end, ok. @@ -701,7 +695,6 @@ get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics; Table == exchange_metrics; Table == queue_exchange_metrics; Table == channel_queue_exchange_metrics; - Table == ra_metrics; Table == channel_process_metrics -> Result = ets:foldl(fun %% For queue_coarse_metrics @@ -723,33 +716,10 @@ get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics; {T, V1 + A1, V2 + A2, V3 + A3, V4 + A4}; ({_, V1, V2, V3, V4}, {T, A1, A2, A3, A4}) -> {T, V1 + A1, V2 + A2, V3 + A3, V4 + A4}; - ({_, V1, V2, V3, V4, V5, V6}, {T, A1, A2, A3, A4, A5, A6}) -> - %% ra_metrics: raft_entry_commit_latency_seconds needs to be an average - {T, V1 + A1, V2 + A2, V3 + A3, V4 + A4, V5 + A5, accumulate_count_and_sum(V6, A6)}; ({_, V1, V2, V3, V4, V5, V6, V7, _}, {T, A1, A2, A3, A4, A5, A6, A7}) -> {T, V1 + A1, V2 + A2, V3 + A3, V4 + A4, V5 + A5, V6 + A6, V7 + A7} end, empty(Table), Table), - case Table of - %% raft_entry_commit_latency_seconds needs to be an average - ra_metrics -> - {Count, Sum} = element(7, Result), - [setelement(7, Result, division(Sum, Count))]; - _ -> - [Result] - end; -get_data(ra_metrics = Table, true, _) -> - ets:foldl( - fun ({#resource{kind = queue}, _, _, _, _, _, _} = Row, Acc) -> - %% Metrics for QQ records use the queue resource as the table - %% key. The queue name and vhost will be rendered as tags. - [Row | Acc]; - ({ClusterName, _, _, _, _, _, _} = Row, Acc) when is_atom(ClusterName) -> - %% Other Ra clusters like Khepri and the stream coordinator use - %% the cluster name as the metrics key. Transform this into a - %% value that can be rendered as a "raft_cluster" tag. - Row1 = setelement(1, Row, #{<<"raft_cluster">> => atom_to_binary(ClusterName, utf8)}), - [Row1 | Acc] - end, [], Table); + [Result]; get_data(exchange_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter)-> ets:foldl(fun ({#resource{kind = exchange, virtual_host = VHost}, _, _, _, _, _} = Row, Acc) when @@ -912,22 +882,12 @@ sum_queue_metrics(Props, {T, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, sum(proplists:get_value(segments, Props), A17) }. -division(0, 0) -> - 0; -division(A, B) -> - A / B. - -accumulate_count_and_sum(Value, {Count, Sum}) -> - {Count + 1, Sum + Value}. - empty(T) when T == channel_queue_exchange_metrics; T == queue_exchange_metrics; T == channel_process_metrics; T == queue_consumer_count -> {T, 0}; empty(T) when T == connection_coarse_metrics; T == auth_attempt_metrics; T == auth_attempt_detailed_metrics -> {T, 0, 0, 0}; empty(T) when T == channel_exchange_metrics; T == exchange_metrics; T == queue_coarse_metrics; T == connection_metrics -> {T, 0, 0, 0, 0}; -empty(T) when T == ra_metrics -> - {T, 0, 0, 0, 0, 0, {0, 0}}; empty(T) when T == channel_queue_metrics; T == queue_delivery_metrics; T == channel_metrics -> {T, 0, 0, 0, 0, 0, 0, 0}; empty(queue_metrics = T) -> diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_raft_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_raft_metrics_collector.erl new file mode 100644 index 000000000000..5391eded6b3b --- /dev/null +++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_raft_metrics_collector.erl @@ -0,0 +1,142 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% +-module(prometheus_rabbitmq_raft_metrics_collector). + +-behaviour(prometheus_collector). + +-export([register/0, + deregister_cleanup/1, + collect_mf/2]). + +-import(prometheus_model_helpers, [create_mf/4, + counter_metric/2]). + +-define(METRIC_NAME_PREFIX, <<"rabbitmq_raft_">>). +-define(DETAILED_METRIC_NAME_PREFIX, <<"rabbitmq_detailed_raft_">>). + +%%==================================================================== +%% Collector API +%%==================================================================== + +register() -> + ok = prometheus_registry:register_collector(?MODULE). + +deregister_cleanup(_) -> + ok. + +collect_mf('per-object', Callback) -> + collect_per_object_metrics(?METRIC_NAME_PREFIX, Callback); +collect_mf('detailed', Callback) -> + case get(prometheus_mf_filter) of + undefined -> + ok; + MFNames -> + case lists:member(ra_metrics, MFNames) of + true -> + collect_detailed_metrics(?DETAILED_METRIC_NAME_PREFIX, Callback); + false -> + ok + end + end; +collect_mf(_Registry, Callback) -> + case application:get_env(rabbitmq_prometheus, return_per_object_metrics, false) of + false -> + collect_aggregate_metrics(?METRIC_NAME_PREFIX, Callback); + true -> + collect_per_object_metrics(?METRIC_NAME_PREFIX, Callback) + end. + +%% INTERNAL + +collect_aggregate_metrics(Prefix, Callback) -> + collect_max_values(Prefix, Callback), + collect_key_component_metrics(Prefix, Callback). + +collect_per_object_metrics(Prefix, Callback) -> + collect_key_component_metrics(Prefix, Callback), + collect_key_per_object_metrics(Prefix, Callback). + +collect_detailed_metrics(Prefix, Callback) -> + VHostFilterFun = case get(prometheus_vhost_filter) of + undefined -> + fun(_) -> true end; + VHosts -> + fun(#{vhost := V}) -> + lists:member(V, VHosts); + (_) -> + false + end + end, + + collect_key_component_metrics(Prefix, Callback), + collect_all_matching_metrics(Prefix, Callback, VHostFilterFun). + +collect_key_per_object_metrics(Prefix, Callback) -> + QQMetrics = [term, + snapshot_index, + last_applied, + commit_index, + last_written_index, + commit_latency, + num_segments], + maps:foreach( + fun(Name, #{type := Type, help := Help, values := Values}) -> + Callback( + create_mf(<>, + Help, + Type, + Values)) + end, + seshat:format(ra, #{labels => as_binary, metrics => QQMetrics})). + +collect_all_matching_metrics(Prefix, Callback, VHostFilterFun) -> + maps:foreach( + fun(Name, #{type := Type, help := Help, values := Values0}) -> + Values = maps:filter(fun(#{vhost := V}, _) -> + VHostFilterFun(V); + (_, _) -> true + end, Values0), + Callback( + create_mf(<>, + Help, + Type, + Values)) + end, + seshat:format(ra, #{labels => as_binary, metrics => all, filter_fun => VHostFilterFun})). + +collect_max_values(Prefix, Callback) -> + %% max values for QQ metrics + %% eg. + %% rabbitmq_raft_num_segments{queue="q1",vhost="/"} 5.0 + %% rabbitmq_raft_num_segments{queue="q2",vhost="/"} 10.0 + %% becomes + %% rabbitmq_raft_max_num_segments 10.0 + QQMetrics = [num_segments], + maps:foreach( + fun(Name, #{type := Type, help := Help, values := Values}) -> + Max = lists:max(maps:values(Values)), + Callback( + create_mf(<>, + Help, + Type, + #{#{} => Max})) + + end, + seshat:format(ra, #{labels => as_binary, metrics => QQMetrics})). + +collect_key_component_metrics(Prefix, Callback) -> + WALMetrics = [wal_files, bytes_written, mem_tables], + SegmentWriterMetrics = [entries, segments], + maps:foreach( + fun(Name, #{type := Type, help := Help, values := Values}) -> + Callback( + create_mf(<>, + Help, + Type, + Values)) + end, + seshat:format(ra, #{labels => as_binary, metrics => WALMetrics ++ SegmentWriterMetrics})). diff --git a/deps/rabbitmq_prometheus/src/rabbit_prometheus_dispatcher.erl b/deps/rabbitmq_prometheus/src/rabbit_prometheus_dispatcher.erl index 50a796839baf..b0cbf2c38697 100644 --- a/deps/rabbitmq_prometheus/src/rabbit_prometheus_dispatcher.erl +++ b/deps/rabbitmq_prometheus/src/rabbit_prometheus_dispatcher.erl @@ -13,26 +13,33 @@ build_dispatcher() -> {ok, _} = application:ensure_all_started(prometheus), - prometheus_registry:register_collectors([ + CoreCollectors = [ prometheus_rabbitmq_core_metrics_collector, prometheus_rabbitmq_global_metrics_collector, prometheus_rabbitmq_message_size_metrics_collector, + prometheus_rabbitmq_raft_metrics_collector, prometheus_rabbitmq_alarm_metrics_collector, prometheus_rabbitmq_dynamic_collector, - prometheus_process_collector]), - prometheus_registry:register_collectors('per-object', [ + prometheus_process_collector], + PerObjectCollectors = CoreCollectors ++ [ prometheus_vm_system_info_collector, prometheus_vm_dist_collector, prometheus_vm_memory_collector, prometheus_mnesia_collector, prometheus_vm_statistics_collector, - prometheus_vm_msacc_collector, - prometheus_rabbitmq_core_metrics_collector, - prometheus_rabbitmq_global_metrics_collector, - prometheus_rabbitmq_message_size_metrics_collector - ]), + prometheus_vm_msacc_collector + ], + prometheus_registry:register_collectors( + case application:get_env(rabbitmq_prometheus, return_per_object_metrics, false) of + false -> CoreCollectors; + true -> PerObjectCollectors + end + ), + prometheus_registry:register_collectors('per-object', + CoreCollectors ++ PerObjectCollectors), prometheus_registry:register_collectors('detailed', [ - prometheus_rabbitmq_core_metrics_collector + prometheus_rabbitmq_core_metrics_collector, + prometheus_rabbitmq_raft_metrics_collector ]), prometheus_registry:register_collectors('memory-breakdown', [ prometheus_rabbitmq_core_metrics_collector diff --git a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl index e37db1296a84..44ad5de7307f 100644 --- a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl +++ b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl @@ -72,7 +72,8 @@ groups() -> vhost_status_metric, exchange_bindings_metric, exchange_names_metric, - stream_pub_sub_metrics + stream_pub_sub_metrics, + detailed_raft_metrics_test ]}, {special_chars, [], [core_metrics_special_chars]}, {authentication, [], [basic_auth]} @@ -158,6 +159,12 @@ init_per_group(detailed_metrics, Config0) -> Q <- [ <<"queue-with-messages">>, <<"queue-with-consumer">> ] ], + amqp_channel:call(DefaultCh, + #'queue.declare'{queue = <<"a_quorum_queue">>, + durable = true, + arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}] + }), + DefaultConsumer = sleeping_consumer(), #'basic.consume_ok'{consumer_tag = DefaultCTag} = amqp_channel:subscribe(DefaultCh, #'basic.consume'{queue = <<"default-queue-with-consumer">>}, DefaultConsumer), @@ -392,7 +399,6 @@ aggregated_metrics_test(Config) -> ?assertEqual(match, re:run(Body, "^rabbitmq_process_open_fds ", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_process_max_fds ", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_io_read_ops_total ", [{capture, none}, multiline])), - ?assertEqual(match, re:run(Body, "^rabbitmq_raft_term_total ", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_queue_messages_ready ", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_queue_consumers ", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "TYPE rabbitmq_auth_attempts_total", [{capture, none}, multiline])), @@ -402,8 +408,13 @@ aggregated_metrics_test(Config) -> ?assertEqual(match, re:run(Body, "^rabbitmq_io_read_time_seconds_total ", [{capture, none}, multiline])), %% Check the first TOTALS metric value ?assertEqual(match, re:run(Body, "^rabbitmq_connections ", [{capture, none}, multiline])), - %% Check raft_entry_commit_latency_seconds because we are aggregating it - ?assertEqual(match, re:run(Body, "^rabbitmq_raft_entry_commit_latency_seconds ", [{capture, none}, multiline])). + ?assertEqual(nomatch, re:run(Body, "^rabbitmq_raft_commit_latency_seconds", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_raft_bytes_written.*ra_log_segment_writer", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_raft_bytes_written.*ra_log_wal", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_raft_entries{", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_raft_mem_tables{", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_raft_segments{", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_raft_wal_files{", [{capture, none}, multiline])). endpoint_per_object_metrics(Config) -> per_object_metrics_test(Config, "/metrics/per-object"). @@ -431,7 +442,7 @@ per_object_metrics_test(Config, Path) -> ?assertEqual(match, re:run(Body, "^rabbitmq_process_open_fds ", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_process_max_fds ", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_io_read_ops_total ", [{capture, none}, multiline])), - ?assertEqual(match, re:run(Body, "^rabbitmq_raft_term_total{", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_raft_term{", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_queue_messages_ready{", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_queue_consumers{", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "TYPE rabbitmq_auth_attempts_total", [{capture, none}, multiline])), @@ -439,9 +450,10 @@ per_object_metrics_test(Config, Path) -> %% Check the first metric value in each ETS table that requires converting ?assertEqual(match, re:run(Body, "^rabbitmq_erlang_uptime_seconds ", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_io_read_time_seconds_total ", [{capture, none}, multiline])), - ?assertEqual(match, re:run(Body, "^rabbitmq_raft_entry_commit_latency_seconds{", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_raft_commit_latency_seconds{", [{capture, none}, multiline])), %% Check the first TOTALS metric value - ?assertEqual(match, re:run(Body, "^rabbitmq_connections ", [{capture, none}, multiline])). + ?assertEqual(match, re:run(Body, "^rabbitmq_connections ", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_raft_num_segments{", [{capture, none}, multiline])). memory_breakdown_metrics_test(Config) -> {_Headers, Body} = http_get_with_pal(Config, "/metrics/memory-breakdown", [], 200), @@ -555,7 +567,8 @@ queue_consumer_count_all_vhosts_per_object_test(Config) -> #{queue => "vhost-2-queue-with-consumer",vhost => "vhost-2"} => [1], #{queue => "vhost-2-queue-with-messages",vhost => "vhost-2"} => [0], #{queue => "default-queue-with-consumer",vhost => "/"} => [1], - #{queue => "default-queue-with-messages",vhost => "/"} => [0]}, + #{queue => "default-queue-with-messages",vhost => "/"} => [0], + #{queue => "a_quorum_queue",vhost => "/"} => [0]}, rabbitmq_detailed_queue_info => #{#{queue => "default-queue-with-consumer", @@ -581,7 +594,10 @@ queue_consumer_count_all_vhosts_per_object_test(Config) -> #{queue => "vhost-2-queue-with-messages", vhost => "vhost-2", queue_type => "rabbit_classic_queue", - membership => "leader"} => [1]} + membership => "leader"} => [1], + #{membership => "leader", + queue => "a_quorum_queue",vhost => "/", + queue_type => "rabbit_quorum_queue"} => [1]} }, %% No vhost given, all should be returned @@ -599,7 +615,8 @@ queue_coarse_metrics_per_object_test(Config) -> Expected2 = #{#{queue => "vhost-2-queue-with-consumer", vhost => "vhost-2"} => [11], #{queue => "vhost-2-queue-with-messages", vhost => "vhost-2"} => [11]}, ExpectedD = #{#{queue => "default-queue-with-consumer", vhost => "/"} => [3], - #{queue => "default-queue-with-messages", vhost => "/"} => [3]}, + #{queue => "default-queue-with-messages", vhost => "/"} => [3], + #{queue => "a_quorum_queue",vhost => "/"} => [0]}, {_, Body1} = http_get_with_pal(Config, "/metrics/detailed?vhost=vhost-1&family=queue_coarse_metrics", [], 200), ?assertEqual(Expected1, @@ -707,7 +724,8 @@ queue_metrics_per_object_test(Config) -> Expected2 = #{#{queue => "vhost-2-queue-with-consumer", vhost => "vhost-2"} => [11], #{queue => "vhost-2-queue-with-messages", vhost => "vhost-2"} => [1]}, ExpectedD = #{#{queue => "default-queue-with-consumer", vhost => "/"} => [3], - #{queue => "default-queue-with-messages", vhost => "/"} => [1]}, + #{queue => "default-queue-with-messages", vhost => "/"} => [1], + #{queue => "a_quorum_queue",vhost => "/"} => [0]}, {_, Body1} = http_get_with_pal(Config, "/metrics/detailed?vhost=vhost-1&family=queue_metrics", [], 200), ?assertEqual(Expected1, map_get(rabbitmq_detailed_queue_messages_ram, parse_response(Body1))), @@ -838,6 +856,27 @@ core_metrics_special_chars(Config) -> maps:to_list(LabelValue3)), ok. +detailed_raft_metrics_test(Config) -> + ComponentMetrics = #{#{module => "ra_log_wal", ra_system => "coordination"} => ["1.0"], + #{module => "ra_log_wal", ra_system => "quorum_queues"} => ["1.0"]}, + QQMetrics = #{#{queue => "a_quorum_queue", vhost => "/"} => ["1.0"]}, + + {_, Body1} = http_get_with_pal(Config, "/metrics/detailed?family=ra_metrics&vhost=foo", [], 200), + %% no queues in vhost foo, so no QQ metrics + ?assertEqual(ComponentMetrics, + map_get(rabbitmq_detailed_raft_wal_files, parse_response(Body1))), + ?assertEqual(undefined, + maps:get(rabbitmq_detailed_raft_term, parse_response(Body1), undefined)), + + {_, Body2} = http_get_with_pal(Config, "/metrics/detailed?family=ra_metrics&vhost=/", [], 200), + %% there's a queue in vhost / + ?assertEqual(ComponentMetrics, + map_get(rabbitmq_detailed_raft_wal_files, parse_response(Body2))), + ?assertEqual(QQMetrics, + map_get(rabbitmq_detailed_raft_term, parse_response(Body2))), + + ok. + basic_auth(Config) -> http_get(Config, [{"accept-encoding", "deflate"}], 401), AuthHeader = rabbit_mgmt_test_util:auth_header("guest", "guest"), diff --git a/deps/rabbitmq_stream/src/rabbit_stream.erl b/deps/rabbitmq_stream/src/rabbit_stream.erl index 5d7547cdf8cc..d68e7ff144d7 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream.erl @@ -41,10 +41,10 @@ start(_Type, _Args) -> rabbit_stream_metrics:init(), - rabbit_global_counters:init([{protocol, stream}], + rabbit_global_counters:init(#{protocol => stream}, ?PROTOCOL_COUNTERS), - rabbit_global_counters:init([{protocol, stream}, - {queue_type, ?STREAM_QUEUE_TYPE}]), + rabbit_global_counters:init(#{protocol => stream, + queue_type => ?STREAM_QUEUE_TYPE}), rabbit_stream_sup:start_link(). tls_host() -> diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index e7a40363ad14..df3d62b1c38e 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -1619,7 +1619,7 @@ get_osiris_counters(Config) -> []). get_global_counters(Config) -> - maps:get([{protocol, stream}], + maps:get(#{protocol => stream}, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_global_counters, diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index 62388150c94e..f6302abcfa13 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -49,16 +49,16 @@ dep_jose = hex 1.11.10 dep_khepri = hex 0.17.1 dep_khepri_mnesia_migration = hex 0.8.0 dep_meck = hex 1.0.0 -dep_osiris = git https://github.com/rabbitmq/osiris v1.8.8 -dep_prometheus = hex 4.11.0 -dep_ra = hex 2.16.12 +dep_osiris = git https://github.com/rabbitmq/osiris v1.9.0 +dep_prometheus = hex 5.1.1 +dep_ra = hex 2.17.0 dep_ranch = hex 2.2.0 dep_recon = hex 2.5.6 dep_redbug = hex 2.1.0 dep_systemd = hex 0.6.1 dep_thoas = hex 1.2.1 dep_observer_cli = hex 1.8.2 -dep_seshat = git https://github.com/rabbitmq/seshat v0.6.1 +dep_seshat = hex 1.0.0 dep_stdout_formatter = hex 0.2.4 dep_sysmon_handler = hex 1.3.0