Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix collection get bug in ResourceGroupService #12499

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -323,8 +323,7 @@ public ResourceGroup getNamespaceResourceGroup(NamespaceName namespaceName) {
protected boolean incrementUsage(String tenantName, String nsName,
ResourceGroupMonitoringClass monClass,
BytesAndMessagesCount incStats) throws PulsarAdminException {
final String tenantAndNsString = tenantName + "/" + nsName;
final ResourceGroup nsRG = this.namespaceToRGsMap.get(tenantAndNsString);
final ResourceGroup nsRG = this.namespaceToRGsMap.get(NamespaceName.get(tenantName, nsName));
final ResourceGroup tenantRG = this.tenantToRGsMap.get(tenantName);
if (tenantRG == null && nsRG == null) {
return false;
Original file line number Diff line number Diff line change
@@ -517,11 +517,18 @@ private void testProduceConsumeUsageOnRG(String[] topicStrings) throws Exception
Assert.assertEquals(recvdNumMsgs, TotalExpectedMessagesToReceive);
Assert.assertEquals(numConsumerExceptions, 0);

boolean tenantRGEqualsNsRG = tenantRGEqualsNamespaceRG(topicStrings);
// If the tenant and NS are on different RGs, the bytes/messages get counted once on the
// tenant RG, and again on the namespace RG. This double-counting is avoided if tenant-RG == NS-RG.
// This is a known (and discussed) artifact in the implementation.
// 'ScaleFactor' is a way to incorporate that effect in the verification.
final int scaleFactor = tenantRGEqualsNsRG ? 1 : 2;

// Verify producer and consumer side stats.
this.verifyRGProdConsStats(topicStrings, sentNumBytes, sentNumMsgs, recvdNumBytes, recvdNumMsgs, true, true);
this.verifyRGProdConsStats(topicStrings, sentNumBytes, sentNumMsgs, recvdNumBytes, recvdNumMsgs, scaleFactor, true, true);

// Verify the metrics corresponding to the operations in this test.
this.verifyRGMetrics(topicStrings, sentNumBytes, sentNumMsgs, recvdNumBytes, recvdNumMsgs, true, true);
this.verifyRGMetrics(sentNumBytes, sentNumMsgs, recvdNumBytes, recvdNumMsgs, scaleFactor, true, true);

unRegisterTenantsAndNamespaces(topicStrings);
// destroyTopics can be called after createTopics() is added back
@@ -535,9 +542,9 @@ private void testProduceConsumeUsageOnRG(String[] topicStrings) throws Exception
private void verifyRGProdConsStats(String[] topicStrings,
int sentNumBytes, int sentNumMsgs,
int recvdNumBytes, int recvdNumMsgs,
boolean checkProduce, boolean checkConsume) throws Exception {
int scaleFactor, boolean checkProduce,
boolean checkConsume) throws Exception {

boolean tenantRGEqualsNsRG = tenantRGEqualsNamespaceRG(topicStrings);
BrokerService bs = pulsar.getBrokerService();
Map<String, TopicStatsImpl> topicStatsMap = bs.getTopicStats();

@@ -557,12 +564,6 @@ private void verifyRGProdConsStats(String[] topicStrings,
BytesAndMessagesCount totalNsRGConsCounts = new BytesAndMessagesCount();
BytesAndMessagesCount prodCounts, consCounts;

// If the tenant and NS are on different RGs, the bytes/messages get counted once on the
// tenant RG, and again on the namespace RG. This double-counting is avoided if tenant-RG == NS-RG.
// This is a known (and discussed) artifact in the implementation.
// 'ScaleFactor' is a way to incorporate that effect in the verification.
final int scaleFactor = tenantRGEqualsNsRG ? 1 : 2;

// Since the following walk is on topics, keep track of the RGs for which we have already gathered stats,
// so that we do not double-accumulate stats if multiple topics refer to the same RG.
HashSet<String> RGsWithPublishStatsGathered = new HashSet<>();
@@ -643,24 +644,22 @@ private void verifyRGProdConsStats(String[] topicStrings,

if (checkProduce) {
prodCounts = ResourceGroup.accumulateBMCount(totalTenantRGProdCounts, totalNsRGProdCounts);
Assert.assertEquals(prodCounts.messages, sentNumMsgs);
Assert.assertEquals(prodCounts.messages, sentNumMsgs * scaleFactor);
Assert.assertTrue(prodCounts.bytes >= ExpectedNumBytesSent);
}

if (checkConsume) {
consCounts = ResourceGroup.accumulateBMCount(totalTenantRGConsCounts, totalNsRGConsCounts);
Assert.assertEquals(consCounts.messages, recvdNumMsgs);
Assert.assertEquals(consCounts.messages, recvdNumMsgs * scaleFactor);
Assert.assertTrue(consCounts.bytes >= ExpectedNumBytesReceived);
}
}

// Check the metrics for the RGs involved
private void verifyRGMetrics(String[] topicStrings,
int sentNumBytes, int sentNumMsgs,
private void verifyRGMetrics(int sentNumBytes, int sentNumMsgs,
int recvdNumBytes, int recvdNumMsgs,
boolean checkProduce, boolean checkConsume) throws Exception {

tenantRGEqualsNamespaceRG(topicStrings);
int scaleFactor, boolean checkProduce,
boolean checkConsume) throws Exception {
final int ExpectedNumBytesSent = sentNumBytes + PER_MESSAGE_METADATA_OHEAD * sentNumMsgs;
final int ExpectedNumBytesReceived = recvdNumBytes + PER_MESSAGE_METADATA_OHEAD * recvdNumMsgs;
long totalTenantRegisters = 0;
@@ -729,12 +728,12 @@ private void verifyRGMetrics(String[] topicStrings,
// So, we take the residuals into account when comparing against the expected.
if (checkProduce && mc == ResourceGroupMonitoringClass.Publish) {
Assert.assertEquals(totalUsedMessages[mcIdx] - residualSentNumMessages,
sentNumMsgs);
sentNumMsgs * scaleFactor);
Assert.assertTrue(totalUsedBytes[mcIdx] - residualSentNumBytes
>= ExpectedNumBytesSent);
} else if (checkConsume && mc == ResourceGroupMonitoringClass.Dispatch) {
Assert.assertEquals(totalUsedMessages[mcIdx] - residualRecvdNumMessages,
recvdNumMsgs);
recvdNumMsgs * scaleFactor);
Assert.assertTrue(totalUsedBytes[mcIdx] - residualRecvdNumBytes
>= ExpectedNumBytesReceived);
}
@@ -745,9 +744,9 @@ private void verifyRGMetrics(String[] topicStrings,

// Update the residuals for next round of tests.
residualSentNumBytes += sentNumBytes;
residualSentNumMessages += sentNumMsgs;
residualSentNumMessages += sentNumMsgs * scaleFactor;
residualRecvdNumBytes += recvdNumBytes;
residualRecvdNumMessages += recvdNumMsgs;
residualRecvdNumMessages += recvdNumMsgs * scaleFactor;

Assert.assertEquals(totalUpdates, 0); // currently, we don't update the RGs in this UT

Original file line number Diff line number Diff line change
@@ -45,7 +45,6 @@ public class ResourceGroupRateLimiterTest extends BrokerTestBase {
new org.apache.pulsar.common.policies.data.ResourceGroup();
final String namespaceName = "prop/ns-abc";
final String persistentTopicString = "persistent://prop/ns-abc/test-topic";
final String nonPersistentTopicString = "non-persistent://prop/ns-abc/test-topic";
final int MESSAGE_SIZE = 10;

@BeforeClass
Original file line number Diff line number Diff line change
@@ -131,10 +131,9 @@ public void measureOpsTime() throws PulsarAdminException {
rgs.unRegisterNameSpace(rgName, tenantAndNamespaceName);

// The overhead of a RG lookup
ResourceGroup retRG;
mSecsStart = System.currentTimeMillis();
for (int ix = 0; ix < numPerfTestIterations; ix++) {
retRG = rgs.resourceGroupGet(rg.resourceGroupName);
rgs.resourceGroupGet(rg.resourceGroupName);
}
mSecsEnd = System.currentTimeMillis();
diffMsecs = mSecsEnd - mSecsStart;
@@ -221,7 +220,6 @@ public void testResourceGroupOps() throws PulsarAdminException, InterruptedExcep
// maxUsageReportSuppressRounds iterations. So, if we run for maxUsageReportSuppressRounds iterations,
// we should see needToReportLocalUsage() return true at least once.
Set<Boolean> myBoolSet = new HashSet<>();
myBoolSet.clear();
for (int idx = 0; idx < ResourceGroupService.MaxUsageReportSuppressRounds; idx++) {
needToReport = retRG.setUsageInMonitoredEntity(monClass, nwUsage);
myBoolSet.add(needToReport);