Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-12469: Deprecated and corrected topic metrics for consumer (KIP-1109) #18232

Merged
merged 7 commits into from
Dec 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
import org.apache.kafka.common.metrics.stats.WindowedCount;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;

/**
* The {@link FetchMetricsManager} class provides wrapper methods to record lag, lead, latency, and fetch metrics.
* It keeps an internal ID of the assigned set of partitions which is updated to ensure the set of metrics it
Expand Down Expand Up @@ -101,32 +103,38 @@ void recordRecordsFetched(int records) {

void recordBytesFetched(String topic, int bytes) {
String name = topicBytesFetchedMetricName(topic);
Sensor bytesFetched = new SensorBuilder(metrics, name, () -> topicTags(topic))
.withAvg(metricsRegistry.topicFetchSizeAvg)
.withMax(metricsRegistry.topicFetchSizeMax)
.withMeter(metricsRegistry.topicBytesConsumedRate, metricsRegistry.topicBytesConsumedTotal)
.build();
maybeRecordDeprecatedBytesFetched(name, topic, bytes);

Sensor bytesFetched = new SensorBuilder(metrics, name, () -> Map.of("topic", topic))
.withAvg(metricsRegistry.topicFetchSizeAvg)
.withMax(metricsRegistry.topicFetchSizeMax)
.withMeter(metricsRegistry.topicBytesConsumedRate, metricsRegistry.topicBytesConsumedTotal)
.build();
bytesFetched.record(bytes);
}

void recordRecordsFetched(String topic, int records) {
String name = topicRecordsFetchedMetricName(topic);
Sensor recordsFetched = new SensorBuilder(metrics, name, () -> topicTags(topic))
.withAvg(metricsRegistry.topicRecordsPerRequestAvg)
.withMeter(metricsRegistry.topicRecordsConsumedRate, metricsRegistry.topicRecordsConsumedTotal)
.build();
maybeRecordDeprecatedRecordsFetched(name, topic, records);

Sensor recordsFetched = new SensorBuilder(metrics, name, () -> Map.of("topic", topic))
.withAvg(metricsRegistry.topicRecordsPerRequestAvg)
.withMeter(metricsRegistry.topicRecordsConsumedRate, metricsRegistry.topicRecordsConsumedTotal)
.build();
recordsFetched.record(records);
}

void recordPartitionLag(TopicPartition tp, long lag) {
this.recordsLag.record(lag);

String name = partitionRecordsLagMetricName(tp);
Sensor recordsLag = new SensorBuilder(metrics, name, () -> topicPartitionTags(tp))
.withValue(metricsRegistry.partitionRecordsLag)
.withMax(metricsRegistry.partitionRecordsLagMax)
.withAvg(metricsRegistry.partitionRecordsLagAvg)
.build();
maybeRecordDeprecatedPartitionLag(name, tp, lag);

Sensor recordsLag = new SensorBuilder(metrics, name, () -> mkMap(mkEntry("topic", tp.topic()), mkEntry("partition", String.valueOf(tp.partition()))))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we do the same for the deprecated metrics too for better consistency?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why you're still using the Utils.mkMap methods instead of Map.of from Java 9+?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ijuma please see my previous comment #18232 (comment)

Copy link
Member

@ijuma ijuma Jan 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, if we want insertion ordering to be preserved, we should change SensorBuilder to use a LinkedHashMap (SequencedMap would be better, but that requires Java 21). Having this implicit requirement and hoping people get it right is very error prone.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah agree a explicit requirement to send an ordered map would be better in builder itself.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have opened https://issues.apache.org/jira/browse/KAFKA-18390 to address @ijuma's comment. The fix is targeted for version 4.1.0, as it may introduce significant changes to the codebase.

.withValue(metricsRegistry.partitionRecordsLag)
.withMax(metricsRegistry.partitionRecordsLagMax)
.withAvg(metricsRegistry.partitionRecordsLagAvg)
.build();

recordsLag.record(lag);
}
Expand All @@ -135,11 +143,13 @@ void recordPartitionLead(TopicPartition tp, long lead) {
this.recordsLead.record(lead);

String name = partitionRecordsLeadMetricName(tp);
Sensor recordsLead = new SensorBuilder(metrics, name, () -> topicPartitionTags(tp))
.withValue(metricsRegistry.partitionRecordsLead)
.withMin(metricsRegistry.partitionRecordsLeadMin)
.withAvg(metricsRegistry.partitionRecordsLeadAvg)
.build();
maybeRecordDeprecatedPartitionLead(name, tp, lead);

Sensor recordsLead = new SensorBuilder(metrics, name, () -> mkMap(mkEntry("topic", tp.topic()), mkEntry("partition", String.valueOf(tp.partition()))))
.withValue(metricsRegistry.partitionRecordsLead)
.withMin(metricsRegistry.partitionRecordsLeadMin)
.withAvg(metricsRegistry.partitionRecordsLeadAvg)
.build();

recordsLead.record(lead);
}
Expand All @@ -162,16 +172,22 @@ void maybeUpdateAssignment(SubscriptionState subscription) {
metrics.removeSensor(partitionRecordsLagMetricName(tp));
metrics.removeSensor(partitionRecordsLeadMetricName(tp));
metrics.removeMetric(partitionPreferredReadReplicaMetricName(tp));
// Remove deprecated metrics.
metrics.removeSensor(deprecatedMetricName(partitionRecordsLagMetricName(tp)));
metrics.removeSensor(deprecatedMetricName(partitionRecordsLeadMetricName(tp)));
metrics.removeMetric(deprecatedPartitionPreferredReadReplicaMetricName(tp));
}
}

for (TopicPartition tp : newAssignedPartitions) {
if (!this.assignedPartitions.contains(tp)) {
maybeRecordDeprecatedPreferredReadReplica(tp, subscription);

MetricName metricName = partitionPreferredReadReplicaMetricName(tp);
metrics.addMetricIfAbsent(
metricName,
null,
(Gauge<Integer>) (config, now) -> subscription.preferredReadReplica(tp, 0L).orElse(-1)
metricName,
null,
(Gauge<Integer>) (config, now) -> subscription.preferredReadReplica(tp, 0L).orElse(-1)
);
}
}
Expand All @@ -181,6 +197,67 @@ void maybeUpdateAssignment(SubscriptionState subscription) {
}
}

@Deprecated // To be removed in Kafka 5.0 release.
private void maybeRecordDeprecatedBytesFetched(String name, String topic, int bytes) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be useful to follow the approach used in https://github.com/apache/kafka/pull/11302/files to add "deprecated" in the description of the metric name and update the ops doc about the deprecation.

Copy link
Collaborator Author

@apoorvmittal10 apoorvmittal10 Dec 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As the metrics with a specfic tag has been deprectaed hence it's hard to specify same metric as deprecated. Hence I have added Note to consumer topic metrics. For consumer metrics ops need not to ne updated as ops.html uses metrics description to generate details on kafka-site, I have verified that the added Note can be viewed on kafka-site locally. Please let me know if you think it should be handled differently.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally speaking, we should always include a note explaining the deprecation. This should include the first version it was deprecated, when it's expected to be removed and what should be used instead.

Copy link
Collaborator Author

@apoorvmittal10 apoorvmittal10 Jan 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, so currently below Note has been added to metrics.

Current:

"Note: For topic names with periods (.), an additional metric with underscores is emitted.
However, the periods replaced metric is deprecated.
Please use the metric with actual topic name instead."

Example:

Screenshot 2025-01-02 at 11 21 27

Should we change it to:

"Note: For topic names with periods (.), an additional metric with underscores is emitted.
However, the periods replaced metric is deprecated since Kafka 4.1 and shall be removed in Kafka 5.0.
Please use the metric with actual topic name instead."

wdyt?

if (shouldReportDeprecatedMetric(topic)) {
Sensor deprecatedBytesFetched = new SensorBuilder(metrics, deprecatedMetricName(name), () -> topicTags(topic))
.withAvg(metricsRegistry.topicFetchSizeAvg)
.withMax(metricsRegistry.topicFetchSizeMax)
.withMeter(metricsRegistry.topicBytesConsumedRate, metricsRegistry.topicBytesConsumedTotal)
.build();
deprecatedBytesFetched.record(bytes);
}
}

@Deprecated // To be removed in Kafka 5.0 release.
private void maybeRecordDeprecatedRecordsFetched(String name, String topic, int records) {
if (shouldReportDeprecatedMetric(topic)) {
Sensor deprecatedRecordsFetched = new SensorBuilder(metrics, deprecatedMetricName(name), () -> topicTags(topic))
.withAvg(metricsRegistry.topicRecordsPerRequestAvg)
.withMeter(metricsRegistry.topicRecordsConsumedRate, metricsRegistry.topicRecordsConsumedTotal)
.build();
deprecatedRecordsFetched.record(records);
}
}

@Deprecated // To be removed in Kafka 5.0 release.
private void maybeRecordDeprecatedPartitionLag(String name, TopicPartition tp, long lag) {
if (shouldReportDeprecatedMetric(tp.topic())) {
Sensor deprecatedRecordsLag = new SensorBuilder(metrics, deprecatedMetricName(name), () -> topicPartitionTags(tp))
.withValue(metricsRegistry.partitionRecordsLag)
.withMax(metricsRegistry.partitionRecordsLagMax)
.withAvg(metricsRegistry.partitionRecordsLagAvg)
.build();

deprecatedRecordsLag.record(lag);
}
}

@Deprecated // To be removed in Kafka 5.0 release.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please open the jira to ensure we will remove them from 5.0 :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private void maybeRecordDeprecatedPartitionLead(String name, TopicPartition tp, double lead) {
if (shouldReportDeprecatedMetric(tp.topic())) {
Sensor deprecatedRecordsLead = new SensorBuilder(metrics, deprecatedMetricName(name), () -> topicPartitionTags(tp))
.withValue(metricsRegistry.partitionRecordsLead)
.withMin(metricsRegistry.partitionRecordsLeadMin)
.withAvg(metricsRegistry.partitionRecordsLeadAvg)
.build();

deprecatedRecordsLead.record(lead);
}
}

@Deprecated // To be removed in Kafka 5.0 release.
private void maybeRecordDeprecatedPreferredReadReplica(TopicPartition tp, SubscriptionState subscription) {
if (shouldReportDeprecatedMetric(tp.topic())) {
MetricName metricName = deprecatedPartitionPreferredReadReplicaMetricName(tp);
metrics.addMetricIfAbsent(
metricName,
null,
(Gauge<Integer>) (config, now) -> subscription.preferredReadReplica(tp, 0L).orElse(-1)
);
}
}

private static String topicBytesFetchedMetricName(String topic) {
return "topic." + topic + ".bytes-fetched";
}
Expand All @@ -197,22 +274,34 @@ private static String partitionRecordsLagMetricName(TopicPartition tp) {
return tp + ".records-lag";
}

private static String deprecatedMetricName(String name) {
return name + ".deprecated";
}

private static boolean shouldReportDeprecatedMetric(String topic) {
return topic.contains(".");
}

private MetricName partitionPreferredReadReplicaMetricName(TopicPartition tp) {
Map<String, String> metricTags = mkMap(mkEntry("topic", tp.topic()), mkEntry("partition", String.valueOf(tp.partition())));
return this.metrics.metricInstance(metricsRegistry.partitionPreferredReadReplica, metricTags);
}

@Deprecated
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should never deprecate something without including a @deprecated javadoc with the details I outlined in a separate comment.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I have added details here #18232 (comment), please let me know your thoughts.

private MetricName deprecatedPartitionPreferredReadReplicaMetricName(TopicPartition tp) {
Map<String, String> metricTags = topicPartitionTags(tp);
return this.metrics.metricInstance(metricsRegistry.partitionPreferredReadReplica, metricTags);
}

@Deprecated
static Map<String, String> topicTags(String topic) {
Map<String, String> metricTags = new HashMap<>(1);
metricTags.put("topic", topic.replace('.', '_'));
return metricTags;
return Map.of("topic", topic.replace('.', '_'));
}

@Deprecated
static Map<String, String> topicPartitionTags(TopicPartition tp) {
Map<String, String> metricTags = new HashMap<>(2);
metricTags.put("topic", tp.topic().replace('.', '_'));
metricTags.put("partition", String.valueOf(tp.partition()));
return metricTags;
return mkMap(mkEntry("topic", tp.topic().replace('.', '_')),
mkEntry("partition", String.valueOf(tp.partition())));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@

public class FetchMetricsRegistry {

private static final String DEPRECATED_TOPIC_METRICS_MESSAGE = "Note: For topic names with periods (.), an additional "
+ "metric with underscores is emitted. However, the periods replaced metric is deprecated. Please use the metric with actual topic name instead.";

public MetricNameTemplate fetchSizeAvg;
public MetricNameTemplate fetchSizeMax;
public MetricNameTemplate bytesConsumedRate;
Expand Down Expand Up @@ -110,39 +113,39 @@ public FetchMetricsRegistry(Set<String> tags, String metricGrpPrefix) {
topicTags.add("topic");

this.topicFetchSizeAvg = new MetricNameTemplate("fetch-size-avg", groupName,
"The average number of bytes fetched per request for a topic", topicTags);
"The average number of bytes fetched per request for a topic. " + DEPRECATED_TOPIC_METRICS_MESSAGE, topicTags);
this.topicFetchSizeMax = new MetricNameTemplate("fetch-size-max", groupName,
"The maximum number of bytes fetched per request for a topic", topicTags);
"The maximum number of bytes fetched per request for a topic. " + DEPRECATED_TOPIC_METRICS_MESSAGE, topicTags);
this.topicBytesConsumedRate = new MetricNameTemplate("bytes-consumed-rate", groupName,
"The average number of bytes consumed per second for a topic", topicTags);
"The average number of bytes consumed per second for a topic. " + DEPRECATED_TOPIC_METRICS_MESSAGE, topicTags);
this.topicBytesConsumedTotal = new MetricNameTemplate("bytes-consumed-total", groupName,
"The total number of bytes consumed for a topic", topicTags);
"The total number of bytes consumed for a topic. " + DEPRECATED_TOPIC_METRICS_MESSAGE, topicTags);

this.topicRecordsPerRequestAvg = new MetricNameTemplate("records-per-request-avg", groupName,
"The average number of records in each request for a topic", topicTags);
"The average number of records in each request for a topic. " + DEPRECATED_TOPIC_METRICS_MESSAGE, topicTags);
this.topicRecordsConsumedRate = new MetricNameTemplate("records-consumed-rate", groupName,
"The average number of records consumed per second for a topic", topicTags);
"The average number of records consumed per second for a topic. " + DEPRECATED_TOPIC_METRICS_MESSAGE, topicTags);
this.topicRecordsConsumedTotal = new MetricNameTemplate("records-consumed-total", groupName,
"The total number of records consumed for a topic", topicTags);
"The total number of records consumed for a topic. " + DEPRECATED_TOPIC_METRICS_MESSAGE, topicTags);

/* Partition level */
Set<String> partitionTags = new HashSet<>(topicTags);
partitionTags.add("partition");
this.partitionRecordsLag = new MetricNameTemplate("records-lag", groupName,
"The latest lag of the partition", partitionTags);
"The latest lag of the partition. " + DEPRECATED_TOPIC_METRICS_MESSAGE, partitionTags);
this.partitionRecordsLagMax = new MetricNameTemplate("records-lag-max", groupName,
"The max lag of the partition", partitionTags);
"The max lag of the partition. " + DEPRECATED_TOPIC_METRICS_MESSAGE, partitionTags);
this.partitionRecordsLagAvg = new MetricNameTemplate("records-lag-avg", groupName,
"The average lag of the partition", partitionTags);
"The average lag of the partition. " + DEPRECATED_TOPIC_METRICS_MESSAGE, partitionTags);
this.partitionRecordsLead = new MetricNameTemplate("records-lead", groupName,
"The latest lead of the partition", partitionTags);
"The latest lead of the partition. " + DEPRECATED_TOPIC_METRICS_MESSAGE, partitionTags);
this.partitionRecordsLeadMin = new MetricNameTemplate("records-lead-min", groupName,
"The min lead of the partition", partitionTags);
"The min lead of the partition. " + DEPRECATED_TOPIC_METRICS_MESSAGE, partitionTags);
this.partitionRecordsLeadAvg = new MetricNameTemplate("records-lead-avg", groupName,
"The average lead of the partition", partitionTags);
"The average lead of the partition. " + DEPRECATED_TOPIC_METRICS_MESSAGE, partitionTags);
this.partitionPreferredReadReplica = new MetricNameTemplate(
"preferred-read-replica", groupName,
"The current read replica for the partition, or -1 if reading from leader", partitionTags);
"The current read replica for the partition, or -1 if reading from leader. " + DEPRECATED_TOPIC_METRICS_MESSAGE, partitionTags);
}

public List<MetricNameTemplate> getAllTemplates() {
Expand Down
Loading
Loading