diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java index f8ec52bfe3c5a..541a645f18bf3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java @@ -458,14 +458,13 @@ protected boolean setUsageInMonitoredEntity(ResourceGroupMonitoringClass monClas bytesUsed = monEntity.usedLocallySinceLastReport.bytes; messagesUsed = monEntity.usedLocallySinceLastReport.messages; - + monEntity.usedLocallySinceLastReport.bytes = monEntity.usedLocallySinceLastReport.messages = 0; if (sendReport) { p.setBytesPerPeriod(bytesUsed); p.setMessagesPerPeriod(messagesUsed); monEntity.lastReportedValues.bytes = bytesUsed; monEntity.lastReportedValues.messages = messagesUsed; monEntity.numSuppressedUsageReports = 0; - monEntity.usedLocallySinceLastReport.bytes = monEntity.usedLocallySinceLastReport.messages = 0; monEntity.totalUsedLocally.bytes += bytesUsed; monEntity.totalUsedLocally.messages += messagesUsed; monEntity.lastResourceUsageFillTimeMSecsSinceEpoch = System.currentTimeMillis(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java index f47f7686743b6..1468a2d5db85d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.resourcegroup; +import java.util.UUID; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass; import org.apache.pulsar.broker.resourcegroup.ResourceGroup.PerMonitoringClassFields; @@ -257,6 +258,67 @@ public void testResourceGroupOps() throws PulsarAdminException, InterruptedExcep Assert.assertEquals(rgs.getNumResourceGroups(), 0); } + @Test + public void testResourceGroupResetUsedLocallySinceLastReport() throws PulsarAdminException { + org.apache.pulsar.common.policies.data.ResourceGroup rgConfig = + new org.apache.pulsar.common.policies.data.ResourceGroup(); + final String rgName = UUID.randomUUID().toString(); + rgConfig.setPublishRateInBytes(15000L); + rgConfig.setPublishRateInMsgs(100); + rgConfig.setDispatchRateInBytes(40000L); + rgConfig.setDispatchRateInMsgs(500); + + this.pulsar.getResourceGroupServiceManager().resourceGroupCreate(rgName, rgConfig); + + ResourceGroup retRG = this.pulsar.getResourceGroupServiceManager().resourceGroupGet(rgName); + + PerMonitoringClassFields monClassFields = null; + // Case1: Suppress report ResourceUsage. + for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) { + monClassFields = retRG.monitoringClassFields[value.ordinal()]; + monClassFields.usedLocallySinceLastReport.bytes = monClassFields.lastReportedValues.bytes = 10; + monClassFields.usedLocallySinceLastReport.messages = monClassFields.lastReportedValues.messages = 10; + monClassFields.lastResourceUsageFillTimeMSecsSinceEpoch = System.currentTimeMillis(); + } + + ResourceUsage resourceUsage = new ResourceUsage(); + retRG.rgFillResourceUsage(resourceUsage); + Assert.assertFalse(resourceUsage.hasDispatch()); + Assert.assertFalse(resourceUsage.hasPublish()); + + for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) { + monClassFields = retRG.monitoringClassFields[value.ordinal()]; + Assert.assertEquals(monClassFields.usedLocallySinceLastReport.messages, 0L); + Assert.assertEquals(monClassFields.usedLocallySinceLastReport.bytes, 0L); + } + + // Case2: Report ResourceUsage. + for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) { + monClassFields = retRG.monitoringClassFields[value.ordinal()]; + monClassFields.usedLocallySinceLastReport.bytes = monClassFields.lastReportedValues.bytes * 2; + monClassFields.usedLocallySinceLastReport.messages = monClassFields.lastReportedValues.messages * 2; + } + + resourceUsage = new ResourceUsage(); + retRG.rgFillResourceUsage(resourceUsage); + Assert.assertTrue(resourceUsage.hasDispatch()); + NetworkUsage dispatch = resourceUsage.getDispatch(); + Assert.assertNotNull(monClassFields); + Assert.assertEquals(dispatch.getBytesPerPeriod(), monClassFields.lastReportedValues.bytes); + Assert.assertEquals(dispatch.getMessagesPerPeriod(), monClassFields.lastReportedValues.messages); + + Assert.assertTrue(resourceUsage.hasPublish()); + NetworkUsage publish = resourceUsage.getPublish(); + Assert.assertEquals(publish.getBytesPerPeriod(), monClassFields.lastReportedValues.bytes); + Assert.assertEquals(publish.getMessagesPerPeriod(), monClassFields.lastReportedValues.messages); + + for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) { + monClassFields = retRG.monitoringClassFields[value.ordinal()]; + Assert.assertEquals(monClassFields.usedLocallySinceLastReport.messages, 0L); + Assert.assertEquals(monClassFields.usedLocallySinceLastReport.bytes, 0L); + } + } + @Test public void testClose() throws Exception { ResourceGroupService service = new ResourceGroupService(pulsar, TimeUnit.MILLISECONDS, null, null);