diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java index 677c04ad9a6f4..34826f8dab78a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java @@ -323,8 +323,7 @@ public ResourceGroup getNamespaceResourceGroup(NamespaceName namespaceName) { protected boolean incrementUsage(String tenantName, String nsName, ResourceGroupMonitoringClass monClass, BytesAndMessagesCount incStats) throws PulsarAdminException { - final String tenantAndNsString = tenantName + "/" + nsName; - final ResourceGroup nsRG = this.namespaceToRGsMap.get(tenantAndNsString); + final ResourceGroup nsRG = this.namespaceToRGsMap.get(NamespaceName.get(tenantName, nsName)); final ResourceGroup tenantRG = this.tenantToRGsMap.get(tenantName); if (tenantRG == null && nsRG == null) { return false; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java index 1ea83a00affe9..d3874fa2e1c0d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java @@ -517,11 +517,18 @@ private void testProduceConsumeUsageOnRG(String[] topicStrings) throws Exception Assert.assertEquals(recvdNumMsgs, TotalExpectedMessagesToReceive); Assert.assertEquals(numConsumerExceptions, 0); + boolean tenantRGEqualsNsRG = tenantRGEqualsNamespaceRG(topicStrings); + // If the tenant and NS are on different RGs, the bytes/messages get counted once on the + // tenant RG, and again on the namespace RG. This double-counting is avoided if tenant-RG == NS-RG. + // This is a known (and discussed) artifact in the implementation. + // 'ScaleFactor' is a way to incorporate that effect in the verification. + final int scaleFactor = tenantRGEqualsNsRG ? 1 : 2; + // Verify producer and consumer side stats. - this.verifyRGProdConsStats(topicStrings, sentNumBytes, sentNumMsgs, recvdNumBytes, recvdNumMsgs, true, true); + this.verifyRGProdConsStats(topicStrings, sentNumBytes, sentNumMsgs, recvdNumBytes, recvdNumMsgs, scaleFactor, true, true); // Verify the metrics corresponding to the operations in this test. - this.verifyRGMetrics(topicStrings, sentNumBytes, sentNumMsgs, recvdNumBytes, recvdNumMsgs, true, true); + this.verifyRGMetrics(sentNumBytes, sentNumMsgs, recvdNumBytes, recvdNumMsgs, scaleFactor, true, true); unRegisterTenantsAndNamespaces(topicStrings); // destroyTopics can be called after createTopics() is added back @@ -535,9 +542,9 @@ private void testProduceConsumeUsageOnRG(String[] topicStrings) throws Exception private void verifyRGProdConsStats(String[] topicStrings, int sentNumBytes, int sentNumMsgs, int recvdNumBytes, int recvdNumMsgs, - boolean checkProduce, boolean checkConsume) throws Exception { + int scaleFactor, boolean checkProduce, + boolean checkConsume) throws Exception { - boolean tenantRGEqualsNsRG = tenantRGEqualsNamespaceRG(topicStrings); BrokerService bs = pulsar.getBrokerService(); Map topicStatsMap = bs.getTopicStats(); @@ -557,12 +564,6 @@ private void verifyRGProdConsStats(String[] topicStrings, BytesAndMessagesCount totalNsRGConsCounts = new BytesAndMessagesCount(); BytesAndMessagesCount prodCounts, consCounts; - // If the tenant and NS are on different RGs, the bytes/messages get counted once on the - // tenant RG, and again on the namespace RG. This double-counting is avoided if tenant-RG == NS-RG. - // This is a known (and discussed) artifact in the implementation. - // 'ScaleFactor' is a way to incorporate that effect in the verification. - final int scaleFactor = tenantRGEqualsNsRG ? 1 : 2; - // Since the following walk is on topics, keep track of the RGs for which we have already gathered stats, // so that we do not double-accumulate stats if multiple topics refer to the same RG. HashSet RGsWithPublishStatsGathered = new HashSet<>(); @@ -643,24 +644,22 @@ private void verifyRGProdConsStats(String[] topicStrings, if (checkProduce) { prodCounts = ResourceGroup.accumulateBMCount(totalTenantRGProdCounts, totalNsRGProdCounts); - Assert.assertEquals(prodCounts.messages, sentNumMsgs); + Assert.assertEquals(prodCounts.messages, sentNumMsgs * scaleFactor); Assert.assertTrue(prodCounts.bytes >= ExpectedNumBytesSent); } if (checkConsume) { consCounts = ResourceGroup.accumulateBMCount(totalTenantRGConsCounts, totalNsRGConsCounts); - Assert.assertEquals(consCounts.messages, recvdNumMsgs); + Assert.assertEquals(consCounts.messages, recvdNumMsgs * scaleFactor); Assert.assertTrue(consCounts.bytes >= ExpectedNumBytesReceived); } } // Check the metrics for the RGs involved - private void verifyRGMetrics(String[] topicStrings, - int sentNumBytes, int sentNumMsgs, + private void verifyRGMetrics(int sentNumBytes, int sentNumMsgs, int recvdNumBytes, int recvdNumMsgs, - boolean checkProduce, boolean checkConsume) throws Exception { - - tenantRGEqualsNamespaceRG(topicStrings); + int scaleFactor, boolean checkProduce, + boolean checkConsume) throws Exception { final int ExpectedNumBytesSent = sentNumBytes + PER_MESSAGE_METADATA_OHEAD * sentNumMsgs; final int ExpectedNumBytesReceived = recvdNumBytes + PER_MESSAGE_METADATA_OHEAD * recvdNumMsgs; long totalTenantRegisters = 0; @@ -729,12 +728,12 @@ private void verifyRGMetrics(String[] topicStrings, // So, we take the residuals into account when comparing against the expected. if (checkProduce && mc == ResourceGroupMonitoringClass.Publish) { Assert.assertEquals(totalUsedMessages[mcIdx] - residualSentNumMessages, - sentNumMsgs); + sentNumMsgs * scaleFactor); Assert.assertTrue(totalUsedBytes[mcIdx] - residualSentNumBytes >= ExpectedNumBytesSent); } else if (checkConsume && mc == ResourceGroupMonitoringClass.Dispatch) { Assert.assertEquals(totalUsedMessages[mcIdx] - residualRecvdNumMessages, - recvdNumMsgs); + recvdNumMsgs * scaleFactor); Assert.assertTrue(totalUsedBytes[mcIdx] - residualRecvdNumBytes >= ExpectedNumBytesReceived); } @@ -745,9 +744,9 @@ private void verifyRGMetrics(String[] topicStrings, // Update the residuals for next round of tests. residualSentNumBytes += sentNumBytes; - residualSentNumMessages += sentNumMsgs; + residualSentNumMessages += sentNumMsgs * scaleFactor; residualRecvdNumBytes += recvdNumBytes; - residualRecvdNumMessages += recvdNumMsgs; + residualRecvdNumMessages += recvdNumMsgs * scaleFactor; Assert.assertEquals(totalUpdates, 0); // currently, we don't update the RGs in this UT diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterTest.java index a7b42d0bb9055..395f0869c287b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterTest.java @@ -45,7 +45,6 @@ public class ResourceGroupRateLimiterTest extends BrokerTestBase { new org.apache.pulsar.common.policies.data.ResourceGroup(); final String namespaceName = "prop/ns-abc"; final String persistentTopicString = "persistent://prop/ns-abc/test-topic"; - final String nonPersistentTopicString = "non-persistent://prop/ns-abc/test-topic"; final int MESSAGE_SIZE = 10; @BeforeClass diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java index a0350b0f86e88..4b84760573532 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java @@ -131,10 +131,9 @@ public void measureOpsTime() throws PulsarAdminException { rgs.unRegisterNameSpace(rgName, tenantAndNamespaceName); // The overhead of a RG lookup - ResourceGroup retRG; mSecsStart = System.currentTimeMillis(); for (int ix = 0; ix < numPerfTestIterations; ix++) { - retRG = rgs.resourceGroupGet(rg.resourceGroupName); + rgs.resourceGroupGet(rg.resourceGroupName); } mSecsEnd = System.currentTimeMillis(); diffMsecs = mSecsEnd - mSecsStart; @@ -221,7 +220,6 @@ public void testResourceGroupOps() throws PulsarAdminException, InterruptedExcep // maxUsageReportSuppressRounds iterations. So, if we run for maxUsageReportSuppressRounds iterations, // we should see needToReportLocalUsage() return true at least once. Set myBoolSet = new HashSet<>(); - myBoolSet.clear(); for (int idx = 0; idx < ResourceGroupService.MaxUsageReportSuppressRounds; idx++) { needToReport = retRG.setUsageInMonitoredEntity(monClass, nwUsage); myBoolSet.add(needToReport);