From fbf526848a15153727d12ab1ae4956847ec3f50f Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 26 Aug 2024 10:05:21 +0800 Subject: [PATCH] [improve] [broker] Part 2 of PIP-370: add metrics "pulsar_replication_disconnected_count" (#23213) (cherry picked from commit 09a16c26974408de270bcaaf6162b0e2a9a6d203) --- .../prometheus/AggregatedNamespaceStats.java | 1 + .../AggregatedReplicationStats.java | 3 + .../prometheus/NamespaceStatsAggregator.java | 8 +- .../broker/stats/prometheus/TopicStats.java | 2 + .../broker/service/OneWayReplicatorTest.java | 121 ++++++++++++++++++ .../OneWayReplicatorUsingGlobalZKTest.java | 6 + .../AggregatedNamespaceStatsTest.java | 2 + 7 files changed, 142 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java index 3975cd89cfa6b..85ff15c915aa7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java @@ -133,6 +133,7 @@ void updateStats(TopicStats stats) { replStats.replicationBacklog += as.replicationBacklog; replStats.msgRateExpired += as.msgRateExpired; replStats.connectedCount += as.connectedCount; + replStats.disconnectedCount += as.disconnectedCount; replStats.replicationDelayInSeconds += as.replicationDelayInSeconds; }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedReplicationStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedReplicationStats.java index 78f33f874e998..82668de6c35f7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedReplicationStats.java +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedReplicationStats.java @@ -41,6 +41,9 @@ public class AggregatedReplicationStats { /** The count of replication-subscriber up and running to replicate to remote cluster. */ public long connectedCount; + /** The count of replication-subscriber that is not connected to the remote cluster. */ + public long disconnectedCount; + /** Time in seconds from the time a message was produced to the time when it is about to be replicated. */ public long replicationDelayInSeconds; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index 3728c3edd1e8b..d25af8d289c27 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -301,7 +301,11 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include aggReplStats.msgThroughputOut += replStats.msgThroughputOut; aggReplStats.replicationBacklog += replStats.replicationBacklog; aggReplStats.msgRateExpired += replStats.msgRateExpired; - aggReplStats.connectedCount += replStats.connected ? 
1 : 0; + if (replStats.connected) { + aggReplStats.connectedCount += 1; + } else { + aggReplStats.disconnectedCount += 1; + } aggReplStats.replicationDelayInSeconds += replStats.replicationDelayInSeconds; }); @@ -497,6 +501,8 @@ private static void printNamespaceStats(PrometheusMetricStreams stream, Aggregat replStats -> replStats.replicationBacklog, cluster, namespace); writeReplicationStat(stream, "pulsar_replication_connected_count", stats, replStats -> replStats.connectedCount, cluster, namespace); + writeReplicationStat(stream, "pulsar_replication_disconnected_count", stats, + replStats -> replStats.disconnectedCount, cluster, namespace); writeReplicationStat(stream, "pulsar_replication_rate_expired", stats, replStats -> replStats.msgRateExpired, cluster, namespace); writeReplicationStat(stream, "pulsar_replication_delay_in_seconds", stats, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java index 27288291d2969..e907760d9d939 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java @@ -389,6 +389,8 @@ public static void printTopicStats(PrometheusMetricStreams stream, TopicStats st cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel); writeMetric(stream, "pulsar_replication_connected_count", replStats.connectedCount, cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_replication_disconnected_count", replStats.disconnectedCount, + cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel); writeMetric(stream, "pulsar_replication_rate_expired", replStats.msgRateExpired, cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel); writeMetric(stream, "pulsar_replication_delay_in_seconds", 
replStats.replicationDelayInSeconds, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 4f0f28d0e9d9f..627a9d82f13ea 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -29,6 +29,7 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import io.netty.util.concurrent.FastThreadLocalThread; import java.lang.reflect.Field; @@ -64,6 +65,7 @@ import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -89,6 +91,8 @@ import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; import org.junit.Assert; +import org.glassfish.jersey.client.JerseyClient; +import org.glassfish.jersey.client.JerseyClientBuilder; import org.mockito.Mockito; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -1128,4 +1132,121 @@ public void testDifferentTopicCreationRule(ReplicationMode replicationMode) thro admin1.namespaces().deleteNamespace(ns); admin2.namespaces().deleteNamespace(ns); } + + @Test + public void testReplicationCountMetrics() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); + // 1.Create topic, does not enable replication now. 
+ admin1.topics().createNonPartitionedTopic(topicName); + PersistentTopic persistentTopic = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + + // We inject an error to make the internal producer fail to connect. + final AtomicInteger createProducerCounter = new AtomicInteger(); + final AtomicBoolean failedCreateProducer = new AtomicBoolean(true); + Runnable taskToClearInjection = injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> { + if (topicName.equals(producerCnf.getTopicName())) { + // There is a switch to determine create producer successfully or not. + if (failedCreateProducer.get()) { + log.info("Retry create replicator.producer count: {}", createProducerCounter); + // Release producer and fail callback. + originalProducer.closeAsync(); + throw new RuntimeException("mock error"); + } + return originalProducer; + } + return originalProducer; + }); + + // 2.Enable replication. + admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); + + // Verify: metrics. 
+ // Cluster level: + // - pulsar_replication_connected_count + // - pulsar_replication_disconnected_count + // Namespace level: + // - pulsar_replication_connected_count + // - pulsar_replication_disconnected_count + // Topic level: + // - pulsar_replication_connected_count + // - pulsar_replication_disconnected_count + JerseyClient httpClient = JerseyClientBuilder.createClient(); + Awaitility.await().untilAsserted(() -> { + int topicConnected = 0; + int topicDisconnected = 0; + + String response = httpClient.target(pulsar1.getWebServiceAddress()).path("/metrics/") + .request().get(String.class); + Multimap<String, PrometheusMetricsClient.Metric> metricMap = PrometheusMetricsClient.parseMetrics(response); + if (!metricMap.containsKey("pulsar_replication_disconnected_count")) { + fail("Expected 1 disconnected replicator."); + } + for (PrometheusMetricsClient.Metric metric : metricMap.get("pulsar_replication_connected_count")) { + if (cluster1.equals(metric.tags.get("cluster")) + && nonReplicatedNamespace.equals(metric.tags.get("namespace")) + && topicName.equals(metric.tags.get("topic"))) { + topicConnected += Double.valueOf(metric.value).intValue(); + } + } + for (PrometheusMetricsClient.Metric metric : metricMap.get("pulsar_replication_disconnected_count")) { + if (cluster1.equals(metric.tags.get("cluster")) + && nonReplicatedNamespace.equals(metric.tags.get("namespace")) + && topicName.equals(metric.tags.get("topic"))) { + topicDisconnected += Double.valueOf(metric.value).intValue(); + } + } + log.info("{}, {},", topicConnected, topicDisconnected); + assertEquals(topicConnected, 0); + assertEquals(topicDisconnected, 1); + }); + + // Let replicator connect successfully. + failedCreateProducer.set(false); + // Verify: metrics. 
+ // Cluster level: + // - pulsar_replication_connected_count + // - pulsar_replication_disconnected_count + // Namespace level: + // - pulsar_replication_connected_count + // - pulsar_replication_disconnected_count + // Topic level: + // - pulsar_replication_connected_count + // - pulsar_replication_disconnected_count + Awaitility.await().atMost(Duration.ofSeconds(130)).untilAsserted(() -> { + int topicConnected = 0; + int topicDisconnected = 0; + + String response = httpClient.target(pulsar1.getWebServiceAddress()).path("/metrics/") + .request().get(String.class); + Multimap<String, PrometheusMetricsClient.Metric> metricMap = PrometheusMetricsClient.parseMetrics(response); + if (!metricMap.containsKey("pulsar_replication_disconnected_count")) { + fail("Expected 1 disconnected replicator."); + } + for (PrometheusMetricsClient.Metric metric : metricMap.get("pulsar_replication_connected_count")) { + if (cluster1.equals(metric.tags.get("cluster")) + && nonReplicatedNamespace.equals(metric.tags.get("namespace")) + && topicName.equals(metric.tags.get("topic"))) { + topicConnected += Double.valueOf(metric.value).intValue(); + } + } + for (PrometheusMetricsClient.Metric metric : metricMap.get("pulsar_replication_disconnected_count")) { + if (cluster1.equals(metric.tags.get("cluster")) + && nonReplicatedNamespace.equals(metric.tags.get("namespace")) + && topicName.equals(metric.tags.get("topic"))) { + topicDisconnected += Double.valueOf(metric.value).intValue(); + } + } + log.info("{}, {}", topicConnected, topicDisconnected); + assertEquals(topicConnected, 1); + assertEquals(topicDisconnected, 0); + }); + + // cleanup. 
+ taskToClearInjection.run(); + admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); + waitReplicatorStopped(topicName); + admin1.topics().delete(topicName, false); + admin2.topics().delete(topicName, false); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java index 34810bbe9057b..d99969fbaa7e5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java @@ -167,4 +167,10 @@ public void testConfigReplicationStartAt() throws Exception { public void testDifferentTopicCreationRule(ReplicationMode replicationMode) throws Exception { super.testDifferentTopicCreationRule(replicationMode); } + + @Test(enabled = false) + @Override + public void testReplicationCountMetrics() throws Exception { + super.testReplicationCountMetrics(); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java index 0e12d75f74fa0..11358eb1e2c1c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java @@ -101,6 +101,7 @@ public void testSimpleAggregation() { replStats2.msgThroughputOut = 1536.0; replStats2.replicationBacklog = 99; replStats2.connectedCount = 1; + replStats2.disconnectedCount = 2; replStats2.msgRateExpired = 3.0; replStats2.replicationDelayInSeconds = 20; topicStats2.replicationStats.put(namespace, replStats2); @@ -148,6 +149,7 @@ public void testSimpleAggregation() { 
assertEquals(nsReplStats.msgThroughputOut, 1792.0); assertEquals(nsReplStats.replicationBacklog, 100); assertEquals(nsReplStats.connectedCount, 1); + assertEquals(nsReplStats.disconnectedCount, 2); assertEquals(nsReplStats.msgRateExpired, 6.0); assertEquals(nsReplStats.replicationDelayInSeconds, 40);