Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Update TransferShedder underloaded broker check to consider max loaded broker's msgThroughputEMA and update IsExtensibleLoadBalancerImpl check (#22321) #22416

Merged
merged 1 commit into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ public void closeMetadataServiceSession() throws Exception {
}

private void closeLeaderElectionService() throws Exception {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this)) {
ExtensibleLoadManagerImpl.get(loadManager.get()).getLeaderElectionService().close();
} else {
if (this.leaderElectionService != null) {
Expand Down Expand Up @@ -1124,7 +1124,7 @@ protected void closeLocalMetadataStore() throws Exception {
}

protected void startLeaderElectionService() {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this)) {
LOG.info("The load manager extension is enabled. Skipping PulsarService LeaderElectionService.");
return;
}
Expand Down Expand Up @@ -1239,7 +1239,7 @@ protected void startLoadManagementService() throws PulsarServerException {
LOG.info("Starting load management service ...");
this.loadManager.get().start();

if (config.isLoadBalancerEnabled() && !ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
if (config.isLoadBalancerEnabled() && !ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this)) {
LOG.info("Starting load balancer");
if (this.loadReportTask == null) {
long loadReportMinInterval = config.getLoadBalancerReportUpdateMinIntervalMillis();
Expand Down Expand Up @@ -1333,7 +1333,7 @@ public boolean isRunning() {
* @return a reference of the current <code>LeaderElectionService</code> instance.
*/
public LeaderElectionService getLeaderElectionService() {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this)) {
return ExtensibleLoadManagerImpl.get(loadManager.get()).getLeaderElectionService();
} else {
return this.leaderElectionService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -992,13 +992,13 @@ public CompletableFuture<Void> setNamespaceBundleAffinityAsync(String bundleRang
return CompletableFuture.completedFuture(null);
})
.thenCompose(__ -> {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config())) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar())) {
return CompletableFuture.completedFuture(null);
}
return validateLeaderBrokerAsync();
})
.thenAccept(__ -> {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config())) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar())) {
return;
}
// For ExtensibleLoadManager, this operation will be ignored.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,6 @@ public ExtensibleLoadManagerImpl() {
this.brokerSelectionStrategy = new LeastResourceUsageWithWeight();
}

public static boolean isLoadManagerExtensionEnabled(ServiceConfiguration conf) {
return ExtensibleLoadManagerImpl.class.getName().equals(conf.getLoadManagerClassName());
}

public static boolean isLoadManagerExtensionEnabled(PulsarService pulsar) {
return pulsar.getLoadManager().get() instanceof ExtensibleLoadManagerWrapper;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ public Set<UnloadDecision> findBundlesForUnloading(LoadManagerContext context,
final double targetStd = conf.getLoadBalancerBrokerLoadTargetStd();
boolean transfer = conf.isLoadBalancerTransferEnabled();
if (stats.std() > targetStd
|| isUnderLoaded(context, stats.peekMinBroker(), stats.avg)
|| isUnderLoaded(context, stats.peekMinBroker(), stats)
|| isOverLoaded(context, stats.peekMaxBroker(), stats.avg)) {
unloadConditionHitCount++;
} else {
Expand Down Expand Up @@ -390,7 +390,7 @@ public Set<UnloadDecision> findBundlesForUnloading(LoadManagerContext context,
UnloadDecision.Reason reason;
if (stats.std() > targetStd) {
reason = Overloaded;
} else if (isUnderLoaded(context, stats.peekMinBroker(), stats.avg)) {
} else if (isUnderLoaded(context, stats.peekMinBroker(), stats)) {
reason = Underloaded;
if (debugMode) {
log.info(String.format("broker:%s is underloaded:%s although "
Expand Down Expand Up @@ -669,19 +669,27 @@ public Set<UnloadDecision> findBundlesForUnloading(LoadManagerContext context,
}


private boolean isUnderLoaded(LoadManagerContext context, String broker, double avgLoad) {
private boolean isUnderLoaded(LoadManagerContext context, String broker, LoadStats stats) {
var brokerLoadDataOptional = context.brokerLoadDataStore().get(broker);
if (brokerLoadDataOptional.isEmpty()) {
return false;
}
var brokerLoadData = brokerLoadDataOptional.get();
if (brokerLoadData.getMsgThroughputEMA() < 1) {

var underLoadedMultiplier =
Math.min(0.5, Math.max(0.0, context.brokerConfiguration().getLoadBalancerBrokerLoadTargetStd() / 2.0));

if (brokerLoadData.getWeightedMaxEMA() < stats.avg * underLoadedMultiplier) {
return true;
}

return brokerLoadData.getWeightedMaxEMA()
< avgLoad * Math.min(0.5, Math.max(0.0,
context.brokerConfiguration().getLoadBalancerBrokerLoadTargetStd() / 2));
var maxBrokerLoadDataOptional = context.brokerLoadDataStore().get(stats.peekMaxBroker());
if (maxBrokerLoadDataOptional.isEmpty()) {
return false;
}

return brokerLoadData.getMsgThroughputEMA()
< maxBrokerLoadDataOptional.get().getMsgThroughputEMA() * underLoadedMultiplier;
}

private boolean isOverLoaded(LoadManagerContext context, String broker, double avgLoad) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicN
pulsar.getBrokerId(), optResult.get(), topic);
return CompletableFuture.completedFuture(optResult);
}
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return loadManager.get().findBrokerServiceUrl(Optional.of(topic), bundle);
} else {
// TODO: Add unit tests cover it.
Expand Down Expand Up @@ -306,9 +306,9 @@ private CompletableFuture<Optional<URL>> internalGetWebServiceUrl(Optional<Servi
return CompletableFuture.completedFuture(Optional.empty());
}
CompletableFuture<Optional<LookupResult>> future =
ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)
? loadManager.get().findBrokerServiceUrl(topic, bundle) :
findBrokerServiceUrl(bundle, options);
ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)
? loadManager.get().findBrokerServiceUrl(topic, bundle) :
findBrokerServiceUrl(bundle, options);

return future.thenApply(lookupResult -> {
if (lookupResult.isPresent()) {
Expand Down Expand Up @@ -371,7 +371,7 @@ public boolean registerNamespace(NamespaceName nsname, boolean ensureOwned) thro
nsFullBundle = bundleFactory.getFullBundle(nsname);
// v2 namespace will always use full bundle object
final NamespaceEphemeralData otherData;
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
ExtensibleLoadManagerImpl loadManager = ExtensibleLoadManagerImpl.get(this.loadManager.get());
otherData = loadManager.tryAcquiringOwnership(nsFullBundle).get();
} else {
Expand Down Expand Up @@ -783,7 +783,7 @@ public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
long timeout,
TimeUnit timeoutUnit,
boolean closeWithoutWaitingClientDisconnect) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return ExtensibleLoadManagerImpl.get(loadManager.get())
.unloadNamespaceBundleAsync(bundle, destinationBroker);
}
Expand All @@ -805,7 +805,7 @@ public CompletableFuture<Map<String, NamespaceOwnershipStatus>> getOwnedNameSpac
.getIsolationDataPoliciesAsync(pulsar.getConfiguration().getClusterName())
.thenApply(nsIsolationPoliciesOpt -> nsIsolationPoliciesOpt.orElseGet(NamespaceIsolationPolicies::new))
.thenCompose(namespaceIsolationPolicies -> {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
ExtensibleLoadManagerImpl extensibleLoadManager =
ExtensibleLoadManagerImpl.get(loadManager.get());
var statusMap = extensibleLoadManager.getOwnedServiceUnits().stream()
Expand Down Expand Up @@ -883,7 +883,7 @@ public boolean isNamespaceBundleDisabled(NamespaceBundle bundle) throws Exceptio
public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, boolean unload,
NamespaceBundleSplitAlgorithm splitAlgorithm,
List<Long> boundaries) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return ExtensibleLoadManagerImpl.get(loadManager.get())
.splitNamespaceBundleAsync(bundle, splitAlgorithm, boundaries);
}
Expand Down Expand Up @@ -1127,7 +1127,7 @@ public OwnershipCache getOwnershipCache() {
}

public Set<NamespaceBundle> getOwnedServiceUnits() {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
return extensibleLoadManager.getOwnedServiceUnits();
}
Expand All @@ -1149,7 +1149,7 @@ public CompletableFuture<Boolean> isServiceUnitOwnedAsync(ServiceUnitId suName)
}

if (suName instanceof NamespaceBundle) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return loadManager.get().checkOwnershipAsync(Optional.empty(), suName);
}
// TODO: Add unit tests cover it.
Expand Down Expand Up @@ -1177,7 +1177,7 @@ public boolean isServiceUnitActive(TopicName topicName) {

public CompletableFuture<Boolean> isServiceUnitActiveAsync(TopicName topicName) {
// TODO: Add unit tests cover it.
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return getBundleAsync(topicName)
.thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle));
}
Expand All @@ -1192,7 +1192,7 @@ public CompletableFuture<Boolean> isServiceUnitActiveAsync(TopicName topicName)

private CompletableFuture<Boolean> isNamespaceOwnedAsync(NamespaceName fqnn) {
// TODO: Add unit tests cover it.
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return getFullBundleAsync(fqnn)
.thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.empty(), bundle));
}
Expand All @@ -1202,7 +1202,7 @@ private CompletableFuture<Boolean> isNamespaceOwnedAsync(NamespaceName fqnn) {

private CompletableFuture<Boolean> isTopicOwnedAsync(TopicName topic) {
// TODO: Add unit tests cover it.
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return getBundleAsync(topic)
.thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topic), bundle));
}
Expand All @@ -1211,7 +1211,7 @@ private CompletableFuture<Boolean> isTopicOwnedAsync(TopicName topic) {

public CompletableFuture<Boolean> checkTopicOwnership(TopicName topicName) {
// TODO: Add unit tests cover it.
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return getBundleAsync(topicName)
.thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle));
}
Expand All @@ -1221,7 +1221,7 @@ public CompletableFuture<Boolean> checkTopicOwnership(TopicName topicName) {

public CompletableFuture<Void> removeOwnedServiceUnitAsync(NamespaceBundle nsBundle) {
CompletableFuture<Void> future;
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
future = extensibleLoadManager.unloadNamespaceBundleAsync(nsBundle, Optional.empty());
} else {
Expand Down Expand Up @@ -1509,7 +1509,7 @@ public PulsarClientImpl getNamespaceClient(ClusterDataImpl cluster) {
}

public CompletableFuture<Optional<NamespaceEphemeralData>> getOwnerAsync(NamespaceBundle bundle) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
return extensibleLoadManager.getOwnershipWithLookupDataAsync(bundle)
.thenCompose(lookupData -> {
Expand All @@ -1530,7 +1530,7 @@ public boolean checkOwnershipPresent(NamespaceBundle bundle) throws Exception {
}

public CompletableFuture<Boolean> checkOwnershipPresentAsync(NamespaceBundle bundle) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
return extensibleLoadManager.getOwnershipAsync(Optional.empty(), bundle)
.thenApply(Optional::isPresent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ public CompletableFuture<Void> validateBundleOwnershipAsync(NamespaceBundle bund
.host(webUrl.get().getHost())
.port(webUrl.get().getPort())
.replaceQueryParam("authoritative", newAuthoritative);
if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config())) {
if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
uriBuilder.replaceQueryParam("destinationBroker", null);
}
URI redirect = uriBuilder.build();
Expand Down Expand Up @@ -1010,7 +1010,7 @@ protected boolean isLeaderBroker() {

protected static boolean isLeaderBroker(PulsarService pulsar) {
// For extensible load manager, it doesn't have leader election service on pulsar broker.
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar.getConfig())) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return true;
}
return pulsar.getLeaderElectionService().isLeader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1104,16 +1104,17 @@ public void testUnloadBundlesGreaterThanTargetThroughputAfterSplit() throws Ille
assertEquals(stats.std(), 2.5809568279517847E-8);
}


@Test
public void testMinBrokerWithZeroTraffic() throws IllegalAccessException {
public void testMinBrokerWithLowTraffic() throws IllegalAccessException {
UnloadCounter counter = new UnloadCounter();
TransferShedder transferShedder = new TransferShedder(counter);
var ctx = setupContext();
var brokerLoadDataStore = ctx.brokerLoadDataStore();

var load = getCpuLoad(ctx, 4, "broker2:8080");
FieldUtils.writeDeclaredField(load,"msgThroughputEMA", 0, true);
var load = getCpuLoad(ctx, 4, "broker2:8080");
FieldUtils.writeDeclaredField(load, "msgThroughputEMA", 10, true);


brokerLoadDataStore.pushAsync("broker2:8080", load);
brokerLoadDataStore.pushAsync("broker4:8080", getCpuLoad(ctx, 55, "broker4:8080"));
brokerLoadDataStore.pushAsync("broker5:8080", getCpuLoad(ctx, 65, "broker5:8080"));
Expand Down Expand Up @@ -1268,10 +1269,10 @@ public void testOverloadOutlier() {
Assertions.assertThat(res).isIn(
Set.of(new UnloadDecision(
new Unload("broker99:8080", "my-tenant/my-namespace99/0x00000000_0x0FFFFFFF",
Optional.of("broker52:8080")), Success, Overloaded)),
Optional.of("broker52:8080")), Success, Underloaded)),
Set.of(new UnloadDecision(
new Unload("broker99:8080", "my-tenant/my-namespace99/0x00000000_0x0FFFFFFF",
Optional.of("broker83:8080")), Success, Overloaded))
Optional.of("broker83:8080")), Success, Underloaded))
);
assertEquals(counter.getLoadAvg(), 0.019900000000000008, 0.00001);
assertEquals(counter.getLoadStd(), 0.09850375627355534, 0.00001);
Expand Down
Loading