Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][flaky-test]NamespaceServiceTest.flaky/testModularLoadManagerRemoveBundleAndLoad #17487

Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import java.util.function.Predicate;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.commons.collections4.CollectionUtils;
Expand Down Expand Up @@ -85,6 +87,7 @@
import org.apache.pulsar.policies.data.loadbalancer.BundleData;
import org.awaitility.Awaitility;
import org.mockito.stubbing.Answer;
import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand Down Expand Up @@ -746,19 +749,18 @@ public void testModularLoadManagerRemoveInactiveBundleFromLoadData() throws Exce
public void testModularLoadManagerRemoveBundleAndLoad() throws Exception {
final String BUNDLE_DATA_PATH = "/loadbalance/bundle-data";
final String namespace = "prop/ns-abc";
final String bundleName = namespace + "/0x00000000_0xffffffff";
final String topic1 = "persistent://" + namespace + "/topic1";
final String topic2 = "persistent://" + namespace + "/topic2";

// configure broker with ModularLoadManager
conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
conf.setForceDeleteNamespaceAllowed(true);
// Make sure LoadReportUpdaterTask has a 100% chance to write ZK.
conf.setLoadBalancerReportUpdateMaxIntervalMinutes(-1);
restartBroker();

LoadManager loadManager = spy(pulsar.getLoadManager().get());
Field loadManagerField = NamespaceService.class.getDeclaredField("loadManager");
loadManagerField.setAccessible(true);
doReturn(true).when(loadManager).isCentralized();
loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager));
LoadManager loadManager = pulsar.getLoadManager().get();
NamespaceName nsname = NamespaceName.get(namespace);

@Cleanup
Expand All @@ -778,10 +780,10 @@ public void testModularLoadManagerRemoveBundleAndLoad() throws Exception {

//create znode for bundle-data
pulsar.getBrokerService().updateRates();
loadManager.writeLoadReportOnZookeeper();
loadManager.writeResourceQuotasToZooKeeper();

String path = BUNDLE_DATA_PATH + "/" + nsname.toString() + "/0x00000000_0xffffffff";
waitResourceDataUpdateToZK(loadManager,
loadData -> loadData.getBundleData().containsKey(bundleName));
String path = BUNDLE_DATA_PATH + "/" + bundleName;

Optional<GetResult> getResult = pulsar.getLocalMetadataStore().get(path).get();
assertTrue(getResult.isPresent());
Expand All @@ -792,12 +794,46 @@ public void testModularLoadManagerRemoveBundleAndLoad() throws Exception {
TimeUnit.SECONDS.sleep(5);

// update broker bundle report to zk
loadManager.writeLoadReportOnZookeeper();
loadManager.writeResourceQuotasToZooKeeper();
waitResourceDataUpdateToZK(loadManager,
loadData -> !loadData.getBundleData().containsKey(bundleName));

getResult = pulsar.getLocalMetadataStore().get(path).get();
assertFalse(getResult.isPresent());
}

private void waitResourceDataUpdateToZK(
LoadManager loadManager, Predicate<LoadData> utilChecker) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems the param utilChecker is useless.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was left over from the previous solution and I forgot to delete it. Already deleted, Thanks

CompletableFuture<Void> waitForBrokerChangeNotice = registryBrokerDataChangeNotice();
// Manually trigger "LoadReportUpdaterTask"
loadManager.writeLoadReportOnZookeeper();
waitForBrokerChangeNotice.join();
// Wait until "ModularLoadManager" completes processing the ZK notification.
ModularLoadManagerWrapper modularLoadManagerWrapper = (ModularLoadManagerWrapper) loadManager;
ModularLoadManagerImpl modularLoadManager = (ModularLoadManagerImpl) modularLoadManagerWrapper.getLoadManager();
ScheduledExecutorService scheduler = Whitebox.getInternalState(modularLoadManager, "scheduler");
CompletableFuture<Void> waitForNoticeHandleFinishByLoadManager = new CompletableFuture<>();
scheduler.execute(() -> {
waitForNoticeHandleFinishByLoadManager.complete(null);
});
waitForNoticeHandleFinishByLoadManager.join();
// Manually trigger "LoadResourceQuotaUpdaterTask"
loadManager.writeResourceQuotasToZooKeeper();
}

public CompletableFuture<Void> registryBrokerDataChangeNotice() {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":"
+ (conf.getWebServicePort().isPresent() ? conf.getWebServicePort().get()
: conf.getWebServicePortTls().get());
String brokerDataPath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
pulsar.getLocalMetadataStore().registerListener(notice -> {
if (brokerDataPath.equals(notice.getPath())){
if (!completableFuture.isDone()) {
completableFuture.complete(null);
}
}
});
return completableFuture;
}

@SuppressWarnings("unchecked")
Expand Down