Skip to content

Commit

Permalink
[improve][broker] improved the leadership change idempotency (ExtensibleLoadManagerImpl only)
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed Aug 8, 2024
1 parent 3e7dbb4 commit 3cf89c4
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS

private static final String ELECTION_ROOT = "/loadbalance/extension/leader";

private static final Set<String> INTERNAL_TOPICS =
public static final Set<String> INTERNAL_TOPICS =
Set.of(BROKER_LOAD_DATA_STORE_TOPIC, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TOPIC);

@VisibleForTesting
Expand All @@ -146,7 +146,10 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
@Getter
private IsolationPoliciesHelper isolationPoliciesHelper;

@Getter
private LoadDataStore<BrokerLoadData> brokerLoadDataStore;

@Getter
private LoadDataStore<TopBundlesLoadData> topBundlesLoadDataStore;

private LoadManagerScheduler unloadScheduler;
Expand Down Expand Up @@ -259,6 +262,7 @@ public enum Role {
Follower
}

@Getter
private volatile Role role;

/**
Expand Down Expand Up @@ -840,6 +844,10 @@ public static boolean isInternalTopic(String topic) {

@VisibleForTesting
synchronized void playLeader() {
if (role == Leader) {
log.info("This broker:{} is already the leader.", pulsar.getBrokerId());
return;
}
log.info("This broker:{} is setting the role from {} to {}",
pulsar.getBrokerId(), role, Leader);
int retry = 0;
Expand Down Expand Up @@ -903,6 +911,7 @@ synchronized void playFollower() {
}
unloadScheduler.close();
serviceUnitStateChannel.cancelOwnershipMonitor();
closeInternalTopics();
brokerLoadDataStore.init();
topBundlesLoadDataStore.close();
topBundlesLoadDataStore.startProducer();
Expand Down Expand Up @@ -1006,12 +1015,13 @@ public void disableBroker() throws Exception {
private void closeInternalTopics() {
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (String name : INTERNAL_TOPICS) {
futures.add(pulsar.getBrokerService().getTopicIfExists(name)
.thenAccept(topicOptional -> topicOptional.ifPresent(topic -> topic.close(true)))
.exceptionally(__ -> {
log.warn("Failed to close internal topic:{}", name);
return null;
}));
pulsar.getBrokerService()
.getTopicReference(name)
.ifPresent(topic -> futures.add(topic.close(true)
.exceptionally(__ -> {
log.warn("Failed to close internal topic:{}", name);
return null;
})));
}
try {
FutureUtil.waitForAll(futures)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
Expand All @@ -122,7 +121,6 @@
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.TableViewImpl;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
Expand All @@ -134,6 +132,7 @@
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -1186,55 +1185,49 @@ private void assertLookupSLANamespaceOwner(PulsarService pulsar,
assertEquals(result, expectedBrokerServiceUrl);
}

@Test(priority = 10)
public void testTopBundlesLoadDataStoreTableViewFromChannelOwner() throws Exception {
var topBundlesLoadDataStorePrimary =
(LoadDataStore) FieldUtils.readDeclaredField(primaryLoadManager, "topBundlesLoadDataStore", true);
var serviceUnitStateChannelPrimary =
(ServiceUnitStateChannelImpl) FieldUtils.readDeclaredField(primaryLoadManager,
"serviceUnitStateChannel", true);
var tvPrimary =
(TableViewImpl) FieldUtils.readDeclaredField(topBundlesLoadDataStorePrimary, "tableView", true);

var topBundlesLoadDataStoreSecondary =
(LoadDataStore) FieldUtils.readDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore", true);
var tvSecondary =
(TableViewImpl) FieldUtils.readDeclaredField(topBundlesLoadDataStoreSecondary, "tableView", true);

if (serviceUnitStateChannelPrimary.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
assertNotNull(tvPrimary);
assertNull(tvSecondary);
} else {
assertNull(tvPrimary);
assertNotNull(tvSecondary);

private void makePrimaryAsLeader() throws Exception {
log.info("makePrimaryAsLeader");
if (channel2.isChannelOwner()) {
pulsar2.getLeaderElectionService().close();
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
assertTrue(channel1.isChannelOwner());
});
pulsar2.getLeaderElectionService().start();
}

restartBroker();
pulsar1 = pulsar;
setPrimaryLoadManager();
admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
Sets.newHashSet(this.conf.getClusterName()));

var serviceUnitStateChannelPrimaryNew =
(ServiceUnitStateChannelImpl) FieldUtils.readDeclaredField(primaryLoadManager,
"serviceUnitStateChannel", true);
var topBundlesLoadDataStorePrimaryNew =
(LoadDataStore) FieldUtils.readDeclaredField(primaryLoadManager, "topBundlesLoadDataStore"
, true);
Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
assertFalse(serviceUnitStateChannelPrimaryNew.isChannelOwnerAsync().get(5, TimeUnit.SECONDS));
assertNotNull(FieldUtils.readDeclaredField(topBundlesLoadDataStoreSecondary, "tableView"
, true));
assertNull(FieldUtils.readDeclaredField(topBundlesLoadDataStorePrimaryNew, "tableView"
, true));
}
);
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
assertTrue(channel1.isChannelOwner());
});
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
assertFalse(channel2.isChannelOwner());
});
}

@Test
public void testRoleChange() throws Exception {
var topBundlesLoadDataStorePrimary = (LoadDataStore<TopBundlesLoadData>)
FieldUtils.readDeclaredField(primaryLoadManager, "topBundlesLoadDataStore", true);
/**
 * Test helper that blocks until {@code channel2} reports channel ownership and
 * {@code channel1} does not.
 *
 * <p>If {@code channel1} currently reports ownership, the leader election service of
 * {@code pulsar1} is closed, the helper waits (up to 30 seconds) for {@code channel2} to
 * report ownership, and the service is then started again. Afterwards both ownership
 * conditions are awaited, each with its own 30-second budget.
 *
 * @throws Exception if closing or starting the leader election service fails
 */
private void makeSecondaryAsLeader() throws Exception {
    log.info("makeSecondaryAsLeader");

    final boolean primaryOwnsChannel = channel1.isChannelOwner();
    if (primaryOwnsChannel) {
        // Take the primary out of the election until the secondary has picked up ownership.
        pulsar1.getLeaderElectionService().close();
        Awaitility.await()
                .atMost(30, TimeUnit.SECONDS)
                .untilAsserted(() -> assertTrue(channel2.isChannelOwner()));
        pulsar1.getLeaderElectionService().start();
    }

    // Final state check: secondary owns the channel, primary does not.
    Awaitility.await()
            .atMost(30, TimeUnit.SECONDS)
            .untilAsserted(() -> assertTrue(channel2.isChannelOwner()));
    Awaitility.await()
            .atMost(30, TimeUnit.SECONDS)
            .untilAsserted(() -> assertFalse(channel1.isChannelOwner()));
}

@Test(timeOut = 30 * 1000)
public void testRoleChangeIdempotency() throws Exception {

makePrimaryAsLeader();

var topBundlesLoadDataStorePrimary = primaryLoadManager.getTopBundlesLoadDataStore();
var topBundlesLoadDataStorePrimarySpy = spy(topBundlesLoadDataStorePrimary);
AtomicInteger countPri = new AtomicInteger(3);
AtomicInteger countPri2 = new AtomicInteger(3);
Expand All @@ -1255,8 +1248,7 @@ public void testRoleChange() throws Exception {
return null;
}).when(topBundlesLoadDataStorePrimarySpy).closeTableView();

var topBundlesLoadDataStoreSecondary = (LoadDataStore<TopBundlesLoadData>)
FieldUtils.readDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore", true);
var topBundlesLoadDataStoreSecondary = secondaryLoadManager.getTopBundlesLoadDataStore();
var topBundlesLoadDataStoreSecondarySpy = spy(topBundlesLoadDataStoreSecondary);
AtomicInteger countSec = new AtomicInteger(3);
AtomicInteger countSec2 = new AtomicInteger(3);
Expand Down Expand Up @@ -1284,58 +1276,129 @@ public void testRoleChange() throws Exception {
topBundlesLoadDataStoreSecondarySpy, true);


if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
primaryLoadManager.playLeader();
secondaryLoadManager.playFollower();
verify(topBundlesLoadDataStorePrimarySpy, times(3)).startTableView();
verify(topBundlesLoadDataStorePrimarySpy, times(5)).closeTableView();
verify(topBundlesLoadDataStoreSecondarySpy, times(0)).startTableView();
verify(topBundlesLoadDataStoreSecondarySpy, times(3)).closeTableView();
} else {
primaryLoadManager.playFollower();
secondaryLoadManager.playLeader();
verify(topBundlesLoadDataStoreSecondarySpy, times(3)).startTableView();
verify(topBundlesLoadDataStoreSecondarySpy, times(5)).closeTableView();
verify(topBundlesLoadDataStorePrimarySpy, times(0)).startTableView();
verify(topBundlesLoadDataStorePrimarySpy, times(3)).closeTableView();
}

primaryLoadManager.playLeader();
secondaryLoadManager.playFollower();
verify(topBundlesLoadDataStorePrimarySpy, times(0)).startTableView();
verify(topBundlesLoadDataStorePrimarySpy, times(0)).closeTableView();
verify(topBundlesLoadDataStoreSecondarySpy, times(0)).startTableView();
verify(topBundlesLoadDataStoreSecondarySpy, times(3)).closeTableView();


primaryLoadManager.playFollower();
secondaryLoadManager.playFollower();
assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
primaryLoadManager.getRole());
assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
secondaryLoadManager.getRole());

if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
FieldUtils.readDeclaredField(primaryLoadManager, "role", true));
assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
FieldUtils.readDeclaredField(secondaryLoadManager, "role", true));
} else {
assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
FieldUtils.readDeclaredField(primaryLoadManager, "role", true));
assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
FieldUtils.readDeclaredField(secondaryLoadManager, "role", true));
}

primaryLoadManager.playLeader();
secondaryLoadManager.playLeader();
assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
primaryLoadManager.getRole());
assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
secondaryLoadManager.getRole());

if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
FieldUtils.readDeclaredField(primaryLoadManager, "role", true));
assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
FieldUtils.readDeclaredField(secondaryLoadManager, "role", true));
} else {
assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
FieldUtils.readDeclaredField(primaryLoadManager, "role", true));
assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
FieldUtils.readDeclaredField(secondaryLoadManager, "role", true));
}
} finally {
FieldUtils.writeDeclaredField(primaryLoadManager, "topBundlesLoadDataStore",
topBundlesLoadDataStorePrimary, true);
FieldUtils.writeDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore",
topBundlesLoadDataStoreSecondary, true);
}
}
/**
 * Exercises a leader hand-over between the primary and the secondary load manager and checks,
 * before and after the hand-over, that:
 * <ul>
 *   <li>the {@code tableView} field of the top-bundles store is non-null on the leader and
 *       null on the follower;</li>
 *   <li>every topic in {@link ExtensibleLoadManagerImpl#INTERNAL_TOPICS} has a topic reference
 *       on the leader's broker service and none on the follower's, and its service unit is
 *       reported as owned by the leader's namespace service only;</li>
 *   <li>load data pushed through the stores can be read back.</li>
 * </ul>
 *
 * <p>NOTE(review): the method-level {@code timeOut} is 30 seconds while each Awaitility wait in
 * this method is also allowed up to 30 seconds - confirm the budgets are intended.
 */
@Test(timeOut = 30 * 1000)
public void testRoleChange() throws Exception {
    // ---- Phase 1: primary is treated as leader, secondary as follower ----
    makePrimaryAsLeader();

    var leader = primaryLoadManager;
    var follower = secondaryLoadManager;

    // Expected broker load: only the CPU usage is populated.
    BrokerLoadData brokerLoadExpected = new BrokerLoadData();
    SystemResourceUsage resourceUsage = new SystemResourceUsage();
    String brokerKey = "b1";
    resourceUsage.setCpu(new ResourceUsage(1.0, 100.0));
    brokerLoadExpected.update(resourceUsage, 0, 0, 0, 0, 0, 0, conf);

    // Expected top-bundles load: a single bundle entry with default stats.
    String bundleName = "public/default/0x00000000_0xffffffff";
    TopBundlesLoadData topBundlesExpected = new TopBundlesLoadData();
    topBundlesExpected.getTopBundlesLoadData().clear();
    topBundlesExpected.getTopBundlesLoadData()
            .add(new TopBundlesLoadData.BundleLoadData(bundleName, new NamespaceBundleStats()));

    // The returned futures are not awaited here.
    follower.getBrokerLoadDataStore().pushAsync(brokerKey, brokerLoadExpected);
    follower.getTopBundlesLoadDataStore().pushAsync(bundleName, topBundlesExpected);

    Awaitility.await()
            .atMost(30, TimeUnit.SECONDS)
            .untilAsserted(() -> {
                var leaderTableView =
                        FieldUtils.readDeclaredField(leader.getTopBundlesLoadDataStore(), "tableView", true);
                assertNotNull(leaderTableView);
                var followerTableView =
                        FieldUtils.readDeclaredField(follower.getTopBundlesLoadDataStore(), "tableView", true);
                assertNull(followerTableView);

                for (String topic : ExtensibleLoadManagerImpl.INTERNAL_TOPICS) {
                    assertTrue(leader.pulsar.getBrokerService().getTopicReference(topic).isPresent());
                    assertFalse(follower.pulsar.getBrokerService().getTopicReference(topic).isPresent());

                    var topicName = TopicName.get(topic);
                    assertTrue(leader.pulsar.getNamespaceService()
                            .isServiceUnitOwnedAsync(topicName).get());
                    assertFalse(follower.pulsar.getNamespaceService()
                            .isServiceUnitOwnedAsync(topicName).get());
                }

                // In this phase an entry is only compared when it is present; a missing entry
                // does not fail the assertion block.
                var leaderBrokerLoad = leader.getBrokerLoadDataStore().get(brokerKey);
                if (leaderBrokerLoad.isPresent()) {
                    assertEquals(leaderBrokerLoad.get(), brokerLoadExpected);
                }

                var leaderTopBundles = leader.getTopBundlesLoadDataStore().get(bundleName);
                if (leaderTopBundles.isPresent()) {
                    assertEquals(leaderTopBundles.get(), topBundlesExpected);
                }

                var followerBrokerLoad = follower.getBrokerLoadDataStore().get(brokerKey);
                if (followerBrokerLoad.isPresent()) {
                    assertEquals(followerBrokerLoad.get(), brokerLoadExpected);
                }
            });

    // ---- Phase 2: roles swapped, secondary is treated as leader ----
    makeSecondaryAsLeader();

    var leader2 = secondaryLoadManager;
    var follower2 = primaryLoadManager;

    // Mutate the expected values so that phase 2 observes fresh data.
    brokerLoadExpected.update(resourceUsage, 1, 0, 0, 0, 0, 0, conf);
    topBundlesExpected.getTopBundlesLoadData().get(0).stats().msgRateIn = 1;

    // NOTE(review): `follower` is still secondaryLoadManager, i.e. the same object as
    // `leader2` at this point - confirm whether `follower2` was intended here.
    follower.getBrokerLoadDataStore().pushAsync(brokerKey, brokerLoadExpected);
    follower.getTopBundlesLoadDataStore().pushAsync(bundleName, topBundlesExpected);

    // ignoreExceptions(): exceptions thrown inside the block (for instance from the unchecked
    // Optional.get() calls below) are retried until the timeout instead of aborting the wait.
    Awaitility.await()
            .atMost(30, TimeUnit.SECONDS)
            .ignoreExceptions()
            .untilAsserted(() -> {
                var leaderTableView =
                        FieldUtils.readDeclaredField(leader2.getTopBundlesLoadDataStore(), "tableView", true);
                assertNotNull(leaderTableView);
                var followerTableView =
                        FieldUtils.readDeclaredField(follower2.getTopBundlesLoadDataStore(), "tableView", true);
                assertNull(followerTableView);

                for (String topic : ExtensibleLoadManagerImpl.INTERNAL_TOPICS) {
                    assertTrue(leader2.pulsar.getBrokerService().getTopicReference(topic).isPresent());
                    assertFalse(follower2.pulsar.getBrokerService().getTopicReference(topic).isPresent());

                    var topicName = TopicName.get(topic);
                    assertTrue(leader2.pulsar.getNamespaceService()
                            .isServiceUnitOwnedAsync(topicName).get());
                    assertFalse(follower2.pulsar.getNamespaceService()
                            .isServiceUnitOwnedAsync(topicName).get());
                }

                // Unlike phase 1, the entries must be present here.
                assertEquals(leader2.getBrokerLoadDataStore().get(brokerKey).get(), brokerLoadExpected);
                assertEquals(leader2.getTopBundlesLoadDataStore().get(bundleName).get(), topBundlesExpected);
                assertEquals(follower2.getBrokerLoadDataStore().get(brokerKey).get(), brokerLoadExpected);
            });
}

@Test
public void testGetMetrics() throws Exception {
Expand Down

0 comments on commit 3cf89c4

Please sign in to comment.