Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix unload operation stuck when use ExtensibleLoadManager #21332

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ public void start() throws PulsarServerException {
}
});
});
this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
this.serviceUnitStateChannel = ServiceUnitStateChannelImpl.newInstance(pulsar);
this.brokerRegistry.start();
this.splitManager = new SplitManager(splitCounter);
this.unloadManager = new UnloadManager(unloadCounter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,18 @@ enum MetadataState {
Unstable
}

public static ServiceUnitStateChannelImpl newInstance(PulsarService pulsar) {
return new ServiceUnitStateChannelImpl(pulsar);
}

public ServiceUnitStateChannelImpl(PulsarService pulsar) {
this(pulsar, MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS, OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS);
}

@VisibleForTesting
public ServiceUnitStateChannelImpl(PulsarService pulsar,
long inFlightStateWaitingTimeInMillis,
long ownershipMonitorDelayTimeInSecs) {
this.pulsar = pulsar;
this.config = pulsar.getConfig();
this.lookupServiceAddress = pulsar.getLookupServiceAddress();
Expand All @@ -210,8 +221,8 @@ public ServiceUnitStateChannelImpl(PulsarService pulsar) {
this.stateChangeListeners = new StateChangeListeners();
this.semiTerminalStateWaitingTimeInMillis = config.getLoadBalancerServiceUnitStateTombstoneDelayTimeInSeconds()
* 1000;
this.inFlightStateWaitingTimeInMillis = MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS;
this.ownershipMonitorDelayTimeInSecs = OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS;
this.inFlightStateWaitingTimeInMillis = inFlightStateWaitingTimeInMillis;
this.ownershipMonitorDelayTimeInSecs = ownershipMonitorDelayTimeInSecs;
if (semiTerminalStateWaitingTimeInMillis < inFlightStateWaitingTimeInMillis) {
throw new IllegalArgumentException(
"Invalid Config: loadBalancerServiceUnitStateCleanUpDelayTimeInSeconds < "
Expand Down Expand Up @@ -837,7 +848,7 @@ private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit) {
} finally {
var future = requested.getValue();
if (future != null) {
future.orTimeout(inFlightStateWaitingTimeInMillis, TimeUnit.MILLISECONDS)
future.orTimeout(inFlightStateWaitingTimeInMillis + 5 * 1000, TimeUnit.MILLISECONDS)
.whenComplete((v, e) -> {
if (e != null) {
getOwnerRequests.remove(serviceUnit, future);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ public CompletableFuture<Void> waitAsync(CompletableFuture<Void> eventPubFuture,

@Override
public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) {
if (t != null && inFlightUnloadRequest.containsKey(serviceUnit)) {
if (log.isDebugEnabled()) {
log.debug("Handling {} for service unit {} with exception.", data, serviceUnit, t);
}
this.complete(serviceUnit, t);
return;
}
ServiceUnitState state = ServiceUnitStateData.state(data);
switch (state) {
case Free, Owned -> this.complete(serviceUnit, t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1558,10 +1558,6 @@ public boolean checkOwnershipPresent(NamespaceBundle bundle) throws Exception {

public CompletableFuture<Boolean> checkOwnershipPresentAsync(NamespaceBundle bundle) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
if (bundle.getNamespaceObject().equals(SYSTEM_NAMESPACE)) {
return FutureUtil.failedFuture(new UnsupportedOperationException(
"Ownership check for system namespace is not supported"));
}
ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
return extensibleLoadManager.getOwnershipAsync(Optional.empty(), bundle)
.thenApply(Optional::isPresent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2196,6 +2196,21 @@ private CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit
if (serviceUnit.includes(topicName)) {
// Topic needs to be unloaded
log.info("[{}] Unloading topic", topicName);
if (topicFuture.isCompletedExceptionally()) {
lhotari marked this conversation as resolved.
Show resolved Hide resolved
try {
topicFuture.get();
} catch (InterruptedException | ExecutionException ex) {
if (ex.getCause() instanceof ServiceUnitNotReadyException) {
// Topic was already unloaded
if (log.isDebugEnabled()) {
log.debug("[{}] Topic was already unloaded", topicName);
}
return;
} else {
log.warn("[{}] Got exception when closing topic", topicName, ex);
}
}
}
closeFutures.add(topicFuture
.thenCompose(t -> t.isPresent() ? t.get().close(closeWithoutWaitingClientDisconnect)
: CompletableFuture.completedFuture(null)));
Expand Down
Loading