Skip to content

Commit

Permalink
KAFKA-12469: Deprecated and corrected topic metrics for consumer (KIP…
Browse files Browse the repository at this point in the history
…-1109) (#18232)

The PR implements the behaviour defined in KIP-1109. It corrects the consumer topic and topic-partition metrics, while deprecating the incorrect ones.

Reviewers: Chia-Ping Tsai <[email protected]>, Jun Rao <[email protected]>, Andrew Schofield <[email protected]>
  • Loading branch information
apoorvmittal10 authored Dec 31, 2024
1 parent 5f8cf0e commit f88cf57
Show file tree
Hide file tree
Showing 3 changed files with 353 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
import org.apache.kafka.common.metrics.stats.WindowedCount;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;

/**
* The {@link FetchMetricsManager} class provides wrapper methods to record lag, lead, latency, and fetch metrics.
* It keeps an internal ID of the assigned set of partitions which is updated to ensure the set of metrics it
Expand Down Expand Up @@ -101,32 +103,38 @@ void recordRecordsFetched(int records) {

void recordBytesFetched(String topic, int bytes) {
String name = topicBytesFetchedMetricName(topic);
Sensor bytesFetched = new SensorBuilder(metrics, name, () -> topicTags(topic))
.withAvg(metricsRegistry.topicFetchSizeAvg)
.withMax(metricsRegistry.topicFetchSizeMax)
.withMeter(metricsRegistry.topicBytesConsumedRate, metricsRegistry.topicBytesConsumedTotal)
.build();
maybeRecordDeprecatedBytesFetched(name, topic, bytes);

Sensor bytesFetched = new SensorBuilder(metrics, name, () -> Map.of("topic", topic))
.withAvg(metricsRegistry.topicFetchSizeAvg)
.withMax(metricsRegistry.topicFetchSizeMax)
.withMeter(metricsRegistry.topicBytesConsumedRate, metricsRegistry.topicBytesConsumedTotal)
.build();
bytesFetched.record(bytes);
}

void recordRecordsFetched(String topic, int records) {
String name = topicRecordsFetchedMetricName(topic);
Sensor recordsFetched = new SensorBuilder(metrics, name, () -> topicTags(topic))
.withAvg(metricsRegistry.topicRecordsPerRequestAvg)
.withMeter(metricsRegistry.topicRecordsConsumedRate, metricsRegistry.topicRecordsConsumedTotal)
.build();
maybeRecordDeprecatedRecordsFetched(name, topic, records);

Sensor recordsFetched = new SensorBuilder(metrics, name, () -> Map.of("topic", topic))
.withAvg(metricsRegistry.topicRecordsPerRequestAvg)
.withMeter(metricsRegistry.topicRecordsConsumedRate, metricsRegistry.topicRecordsConsumedTotal)
.build();
recordsFetched.record(records);
}

void recordPartitionLag(TopicPartition tp, long lag) {
this.recordsLag.record(lag);

String name = partitionRecordsLagMetricName(tp);
Sensor recordsLag = new SensorBuilder(metrics, name, () -> topicPartitionTags(tp))
.withValue(metricsRegistry.partitionRecordsLag)
.withMax(metricsRegistry.partitionRecordsLagMax)
.withAvg(metricsRegistry.partitionRecordsLagAvg)
.build();
maybeRecordDeprecatedPartitionLag(name, tp, lag);

Sensor recordsLag = new SensorBuilder(metrics, name, () -> mkMap(mkEntry("topic", tp.topic()), mkEntry("partition", String.valueOf(tp.partition()))))
.withValue(metricsRegistry.partitionRecordsLag)
.withMax(metricsRegistry.partitionRecordsLagMax)
.withAvg(metricsRegistry.partitionRecordsLagAvg)
.build();

recordsLag.record(lag);
}
Expand All @@ -135,11 +143,13 @@ void recordPartitionLead(TopicPartition tp, long lead) {
this.recordsLead.record(lead);

String name = partitionRecordsLeadMetricName(tp);
Sensor recordsLead = new SensorBuilder(metrics, name, () -> topicPartitionTags(tp))
.withValue(metricsRegistry.partitionRecordsLead)
.withMin(metricsRegistry.partitionRecordsLeadMin)
.withAvg(metricsRegistry.partitionRecordsLeadAvg)
.build();
maybeRecordDeprecatedPartitionLead(name, tp, lead);

Sensor recordsLead = new SensorBuilder(metrics, name, () -> mkMap(mkEntry("topic", tp.topic()), mkEntry("partition", String.valueOf(tp.partition()))))
.withValue(metricsRegistry.partitionRecordsLead)
.withMin(metricsRegistry.partitionRecordsLeadMin)
.withAvg(metricsRegistry.partitionRecordsLeadAvg)
.build();

recordsLead.record(lead);
}
Expand All @@ -162,16 +172,22 @@ void maybeUpdateAssignment(SubscriptionState subscription) {
metrics.removeSensor(partitionRecordsLagMetricName(tp));
metrics.removeSensor(partitionRecordsLeadMetricName(tp));
metrics.removeMetric(partitionPreferredReadReplicaMetricName(tp));
// Remove deprecated metrics.
metrics.removeSensor(deprecatedMetricName(partitionRecordsLagMetricName(tp)));
metrics.removeSensor(deprecatedMetricName(partitionRecordsLeadMetricName(tp)));
metrics.removeMetric(deprecatedPartitionPreferredReadReplicaMetricName(tp));
}
}

for (TopicPartition tp : newAssignedPartitions) {
if (!this.assignedPartitions.contains(tp)) {
maybeRecordDeprecatedPreferredReadReplica(tp, subscription);

MetricName metricName = partitionPreferredReadReplicaMetricName(tp);
metrics.addMetricIfAbsent(
metricName,
null,
(Gauge<Integer>) (config, now) -> subscription.preferredReadReplica(tp, 0L).orElse(-1)
metricName,
null,
(Gauge<Integer>) (config, now) -> subscription.preferredReadReplica(tp, 0L).orElse(-1)
);
}
}
Expand All @@ -181,6 +197,67 @@ void maybeUpdateAssignment(SubscriptionState subscription) {
}
}

@Deprecated // To be removed in Kafka 5.0 release.
private void maybeRecordDeprecatedBytesFetched(String name, String topic, int bytes) {
if (shouldReportDeprecatedMetric(topic)) {
Sensor deprecatedBytesFetched = new SensorBuilder(metrics, deprecatedMetricName(name), () -> topicTags(topic))
.withAvg(metricsRegistry.topicFetchSizeAvg)
.withMax(metricsRegistry.topicFetchSizeMax)
.withMeter(metricsRegistry.topicBytesConsumedRate, metricsRegistry.topicBytesConsumedTotal)
.build();
deprecatedBytesFetched.record(bytes);
}
}

@Deprecated // To be removed in Kafka 5.0 release.
private void maybeRecordDeprecatedRecordsFetched(String name, String topic, int records) {
if (shouldReportDeprecatedMetric(topic)) {
Sensor deprecatedRecordsFetched = new SensorBuilder(metrics, deprecatedMetricName(name), () -> topicTags(topic))
.withAvg(metricsRegistry.topicRecordsPerRequestAvg)
.withMeter(metricsRegistry.topicRecordsConsumedRate, metricsRegistry.topicRecordsConsumedTotal)
.build();
deprecatedRecordsFetched.record(records);
}
}

@Deprecated // To be removed in Kafka 5.0 release.
private void maybeRecordDeprecatedPartitionLag(String name, TopicPartition tp, long lag) {
if (shouldReportDeprecatedMetric(tp.topic())) {
Sensor deprecatedRecordsLag = new SensorBuilder(metrics, deprecatedMetricName(name), () -> topicPartitionTags(tp))
.withValue(metricsRegistry.partitionRecordsLag)
.withMax(metricsRegistry.partitionRecordsLagMax)
.withAvg(metricsRegistry.partitionRecordsLagAvg)
.build();

deprecatedRecordsLag.record(lag);
}
}

@Deprecated // To be removed in Kafka 5.0 release.
private void maybeRecordDeprecatedPartitionLead(String name, TopicPartition tp, double lead) {
if (shouldReportDeprecatedMetric(tp.topic())) {
Sensor deprecatedRecordsLead = new SensorBuilder(metrics, deprecatedMetricName(name), () -> topicPartitionTags(tp))
.withValue(metricsRegistry.partitionRecordsLead)
.withMin(metricsRegistry.partitionRecordsLeadMin)
.withAvg(metricsRegistry.partitionRecordsLeadAvg)
.build();

deprecatedRecordsLead.record(lead);
}
}

@Deprecated // To be removed in Kafka 5.0 release.
private void maybeRecordDeprecatedPreferredReadReplica(TopicPartition tp, SubscriptionState subscription) {
if (shouldReportDeprecatedMetric(tp.topic())) {
MetricName metricName = deprecatedPartitionPreferredReadReplicaMetricName(tp);
metrics.addMetricIfAbsent(
metricName,
null,
(Gauge<Integer>) (config, now) -> subscription.preferredReadReplica(tp, 0L).orElse(-1)
);
}
}

private static String topicBytesFetchedMetricName(String topic) {
return "topic." + topic + ".bytes-fetched";
}
Expand All @@ -197,22 +274,34 @@ private static String partitionRecordsLagMetricName(TopicPartition tp) {
return tp + ".records-lag";
}

private static String deprecatedMetricName(String name) {
return name + ".deprecated";
}

private static boolean shouldReportDeprecatedMetric(String topic) {
return topic.contains(".");
}

private MetricName partitionPreferredReadReplicaMetricName(TopicPartition tp) {
Map<String, String> metricTags = mkMap(mkEntry("topic", tp.topic()), mkEntry("partition", String.valueOf(tp.partition())));
return this.metrics.metricInstance(metricsRegistry.partitionPreferredReadReplica, metricTags);
}

@Deprecated
private MetricName deprecatedPartitionPreferredReadReplicaMetricName(TopicPartition tp) {
Map<String, String> metricTags = topicPartitionTags(tp);
return this.metrics.metricInstance(metricsRegistry.partitionPreferredReadReplica, metricTags);
}

@Deprecated
static Map<String, String> topicTags(String topic) {
Map<String, String> metricTags = new HashMap<>(1);
metricTags.put("topic", topic.replace('.', '_'));
return metricTags;
return Map.of("topic", topic.replace('.', '_'));
}

@Deprecated
static Map<String, String> topicPartitionTags(TopicPartition tp) {
Map<String, String> metricTags = new HashMap<>(2);
metricTags.put("topic", tp.topic().replace('.', '_'));
metricTags.put("partition", String.valueOf(tp.partition()));
return metricTags;
return mkMap(mkEntry("topic", tp.topic().replace('.', '_')),
mkEntry("partition", String.valueOf(tp.partition())));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@

public class FetchMetricsRegistry {

private static final String DEPRECATED_TOPIC_METRICS_MESSAGE = "Note: For topic names with periods (.), an additional "
+ "metric with underscores is emitted. However, the periods replaced metric is deprecated. Please use the metric with actual topic name instead.";

public MetricNameTemplate fetchSizeAvg;
public MetricNameTemplate fetchSizeMax;
public MetricNameTemplate bytesConsumedRate;
Expand Down Expand Up @@ -110,39 +113,39 @@ public FetchMetricsRegistry(Set<String> tags, String metricGrpPrefix) {
topicTags.add("topic");

this.topicFetchSizeAvg = new MetricNameTemplate("fetch-size-avg", groupName,
"The average number of bytes fetched per request for a topic", topicTags);
"The average number of bytes fetched per request for a topic. " + DEPRECATED_TOPIC_METRICS_MESSAGE, topicTags);
this.topicFetchSizeMax = new MetricNameTemplate("fetch-size-max", groupName,
"The maximum number of bytes fetched per request for a topic", topicTags);
"The maximum number of bytes fetched per request for a topic. " + DEPRECATED_TOPIC_METRICS_MESSAGE, topicTags);
this.topicBytesConsumedRate = new MetricNameTemplate("bytes-consumed-rate", groupName,
"The average number of bytes consumed per second for a topic", topicTags);
"The average number of bytes consumed per second for a topic. " + DEPRECATED_TOPIC_METRICS_MESSAGE, topicTags);
this.topicBytesConsumedTotal = new MetricNameTemplate("bytes-consumed-total", groupName,
"The total number of bytes consumed for a topic", topicTags);
"The total number of bytes consumed for a topic. " + DEPRECATED_TOPIC_METRICS_MESSAGE, topicTags);

this.topicRecordsPerRequestAvg = new MetricNameTemplate("records-per-request-avg", groupName,
"The average number of records in each request for a topic", topicTags);
"The average number of records in each request for a topic. " + DEPRECATED_TOPIC_METRICS_MESSAGE, topicTags);
this.topicRecordsConsumedRate = new MetricNameTemplate("records-consumed-rate", groupName,
"The average number of records consumed per second for a topic", topicTags);
"The average number of records consumed per second for a topic. " + DEPRECATED_TOPIC_METRICS_MESSAGE, topicTags);
this.topicRecordsConsumedTotal = new MetricNameTemplate("records-consumed-total", groupName,
"The total number of records consumed for a topic", topicTags);
"The total number of records consumed for a topic. " + DEPRECATED_TOPIC_METRICS_MESSAGE, topicTags);

/* Partition level */
Set<String> partitionTags = new HashSet<>(topicTags);
partitionTags.add("partition");
this.partitionRecordsLag = new MetricNameTemplate("records-lag", groupName,
"The latest lag of the partition", partitionTags);
"The latest lag of the partition. " + DEPRECATED_TOPIC_METRICS_MESSAGE, partitionTags);
this.partitionRecordsLagMax = new MetricNameTemplate("records-lag-max", groupName,
"The max lag of the partition", partitionTags);
"The max lag of the partition. " + DEPRECATED_TOPIC_METRICS_MESSAGE, partitionTags);
this.partitionRecordsLagAvg = new MetricNameTemplate("records-lag-avg", groupName,
"The average lag of the partition", partitionTags);
"The average lag of the partition. " + DEPRECATED_TOPIC_METRICS_MESSAGE, partitionTags);
this.partitionRecordsLead = new MetricNameTemplate("records-lead", groupName,
"The latest lead of the partition", partitionTags);
"The latest lead of the partition. " + DEPRECATED_TOPIC_METRICS_MESSAGE, partitionTags);
this.partitionRecordsLeadMin = new MetricNameTemplate("records-lead-min", groupName,
"The min lead of the partition", partitionTags);
"The min lead of the partition. " + DEPRECATED_TOPIC_METRICS_MESSAGE, partitionTags);
this.partitionRecordsLeadAvg = new MetricNameTemplate("records-lead-avg", groupName,
"The average lead of the partition", partitionTags);
"The average lead of the partition. " + DEPRECATED_TOPIC_METRICS_MESSAGE, partitionTags);
this.partitionPreferredReadReplica = new MetricNameTemplate(
"preferred-read-replica", groupName,
"The current read replica for the partition, or -1 if reading from leader", partitionTags);
"The current read replica for the partition, or -1 if reading from leader. " + DEPRECATED_TOPIC_METRICS_MESSAGE, partitionTags);
}

public List<MetricNameTemplate> getAllTemplates() {
Expand Down
Loading

0 comments on commit f88cf57

Please sign in to comment.