Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] timeout when broker registry hangs and monitor broker registry (ExtensibleLoadManagerImpl only) #23382

Merged
merged 6 commits into from
Oct 3, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add health check verfiication
heesung-sn committed Oct 2, 2024
commit ead02461d9477a7306397cc9c5e207520eeb888f
Original file line number Diff line number Diff line change
@@ -48,6 +48,7 @@
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService.State;
@@ -368,20 +369,26 @@ public void isReady(@Suspended AsyncResponse asyncResponse) {
@ApiOperation(value = "Run a healthCheck against the broker")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Everything is OK"),
@ApiResponse(code = 307, message = "Current broker is not the target broker"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Cluster doesn't exist"),
@ApiResponse(code = 500, message = "Internal server error")})
public void healthCheck(@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "Topic Version")
@QueryParam("topicVersion") TopicVersion topicVersion) {
@QueryParam("topicVersion") TopicVersion topicVersion,
@QueryParam("brokerId") String brokerId) {
validateSuperUserAccessAsync()
.thenAccept(__ -> checkDeadlockedThreads())
.thenCompose(__ -> maybeRedirectToBroker(
StringUtils.isBlank(brokerId) ? pulsar().getBrokerId() : brokerId))
.thenCompose(__ -> internalRunHealthCheck(topicVersion))
.thenAccept(__ -> {
LOG.info("[{}] Successfully run health check.", clientAppId());
asyncResponse.resume(Response.ok("ok").build());
}).exceptionally(ex -> {
LOG.error("[{}] Fail to run health check.", clientAppId(), ex);
if (!isRedirectException(ex)) {
LOG.error("[{}] Fail to run health check.", clientAppId(), ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
Original file line number Diff line number Diff line change
@@ -54,6 +54,10 @@ public class BrokerRegistryImpl implements BrokerRegistry {

private final PulsarService pulsar;

private static final int MAX_REGISTER_RETRY = 10;

private static final int MAX_REGISTER_RETRY_DELAY_IN_MILLIS = 1000;

private final ServiceConfiguration conf;

private final BrokerLookupData brokerLookupData;
@@ -149,6 +153,35 @@ public CompletableFuture<Void> registerAsync() {
});
}

private void doRegisterAsyncWithRetries(int retry, CompletableFuture<Void> future) {
this.scheduler.schedule(() -> {
registerAsync().whenComplete((__, e) -> {
if (e != null) {
if (retry == MAX_REGISTER_RETRY) {
future.completeExceptionally(new PulsarServerException("Stopped registering self retries", e));
} else {
doRegisterAsyncWithRetries(retry + 1, future);
}
} else {
future.complete(null);
}
});
}, Math.min(MAX_REGISTER_RETRY_DELAY_IN_MILLIS, retry * retry * 50), TimeUnit.MILLISECONDS);
}

private CompletableFuture<Void> registerAsyncWithRetries() {
var future = registerAsync();
return future.handle((__, e) -> {
if (e != null) {
var retryFuture = new CompletableFuture<Void>();
doRegisterAsyncWithRetries(1, retryFuture);
return retryFuture.join();
} else {
return null;
}
});
}

@Override
public synchronized void unregister() throws MetadataStoreException {
if (state.compareAndSet(State.Registered, State.Unregistering)) {
@@ -235,17 +268,25 @@ private void handleMetadataStoreNotification(Notification t) {
// The registered node is an ephemeral node that could be deleted when the metadata store client's session
// is expired. In this case, we should register again.
final var brokerId = t.getPath().substring(LOADBALANCE_BROKERS_ROOT.length() + 1);

CompletableFuture<Void> register;
if (t.getType() == NotificationType.Deleted && getBrokerId().equals(brokerId)) {
registerAsync();
register = registerAsyncWithRetries();
} else {
register = CompletableFuture.completedFuture(null);
}
if (listeners.isEmpty()) {
return;
}
this.scheduler.submit(() -> {
for (BiConsumer<String, NotificationType> listener : listeners) {
listener.accept(brokerId, t.getType());
// Make sure to run the listeners after re-registered.
register.thenAccept(__ -> {
if (listeners.isEmpty()) {
return;
}
this.scheduler.submit(() -> {
for (BiConsumer<String, NotificationType> listener : listeners) {
listener.accept(brokerId, t.getType());
}
});
});

} catch (RejectedExecutionException e) {
// Executor is shutting down
}
Original file line number Diff line number Diff line change
@@ -86,11 +86,13 @@
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Reflections;
@@ -108,13 +110,16 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
private static final long MIN_CLEAN_UP_DELAY_TIME_IN_SECS = 0; // 0 secs to clean immediately
private static final long MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS = 10;
private static final long MAX_OWNED_BUNDLE_COUNT_DELAY_TIME_IN_MILLIS = 10 * 60 * 1000;
private static final long MAX_BROKER_HEALTH_CHECK_RETRY = 3;
private static final long MAX_BROKER_HEALTH_CHECK_DELAY_IN_MILLIS = 1000;
private final PulsarService pulsar;
private final ServiceConfiguration config;
private final Schema<ServiceUnitStateData> schema;
private final Map<String, CompletableFuture<String>> getOwnerRequests;
private final String brokerId;
private final Map<String, CompletableFuture<Void>> cleanupJobs;
private final StateChangeListeners stateChangeListeners;

private BrokerRegistry brokerRegistry;
private LeaderElectionService leaderElectionService;

@@ -350,6 +355,11 @@ protected LeaderElectionService getLeaderElectionService() {
.get().getLeaderElectionService();
}

@VisibleForTesting
protected PulsarAdmin getPulsarAdmin() throws PulsarServerException {
return pulsar.getAdminClient();
}

@Override
public synchronized void close() throws PulsarServerException {
channelState = Closed;
@@ -1255,20 +1265,24 @@ private MetadataState getMetadataState() {
}

private void handleBrokerCreationEvent(String broker) {
CompletableFuture<Void> future = cleanupJobs.remove(broker);
if (future != null) {
future.cancel(false);
totalInactiveBrokerCleanupCancelledCnt++;
log.info("Successfully cancelled the ownership cleanup for broker:{}."
+ " Active cleanup job count:{}",
broker, cleanupJobs.size());
} else {
if (debug()) {
log.info("No needs to cancel the ownership cleanup for broker:{}."
+ " There was no scheduled cleanup job. Active cleanup job count:{}",
broker, cleanupJobs.size());
}
}

healthCheckBrokerAsync(broker)
.thenAccept(__ -> {
CompletableFuture<Void> future = cleanupJobs.remove(broker);
if (future != null) {
future.cancel(false);
totalInactiveBrokerCleanupCancelledCnt++;
log.info("Successfully cancelled the ownership cleanup for broker:{}."
+ " Active cleanup job count:{}",
broker, cleanupJobs.size());
} else {
if (debug()) {
log.info("No needs to cancel the ownership cleanup for broker:{}."
+ " There was no scheduled cleanup job. Active cleanup job count:{}",
broker, cleanupJobs.size());
}
}
});
}

private void handleBrokerDeletionEvent(String broker) {
@@ -1431,6 +1445,37 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max
System.currentTimeMillis() - started);
}

private CompletableFuture<Void> healthCheckBrokerAsync(String brokerId) {
CompletableFuture<Void> future = new CompletableFuture<>();
doHealthCheckBrokerAsyncWithRetries(brokerId, 0, future);
return future;
}

private void doHealthCheckBrokerAsyncWithRetries(String brokerId, int retry, CompletableFuture<Void> future) {
try {
var admin = getPulsarAdmin();
admin.brokers().healthcheckAsync(TopicVersion.V2, Optional.of(brokerId))
.whenComplete((__, e) -> {
if (e == null) {
log.info("Completed health-check broker :{}", brokerId, e);
future.complete(null);
return;
}
if (retry == MAX_BROKER_HEALTH_CHECK_RETRY) {
log.error("Failed health-check broker :{}", brokerId, e);
future.completeExceptionally(FutureUtil.unwrapCompletionException(e));
} else {
pulsar.getExecutor()
.schedule(() -> doHealthCheckBrokerAsyncWithRetries(brokerId, retry + 1, future),
Math.min(MAX_BROKER_HEALTH_CHECK_DELAY_IN_MILLIS, retry * retry * 50),
MILLISECONDS);
}
});
} catch (PulsarServerException e) {
future.completeExceptionally(e);
}
}

private synchronized void doCleanup(String broker, boolean gracefully) {
try {
if (getChannelOwnerAsync().get(MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS, TimeUnit.SECONDS)
@@ -1444,6 +1489,23 @@ private synchronized void doCleanup(String broker, boolean gracefully) {
return;
}

// if not gracefully, verify the broker is inactive by health-check.
if (!gracefully) {
try {
healthCheckBrokerAsync(broker).get(
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
log.warn("Found that the broker to clean is healthy. Skip the broker:{}'s orphan bundle cleanup",
broker);
return;
} catch (Exception e) {
if (debug()) {
log.info("Failed to check broker:{} health", broker, e);
}
log.info("Checked the broker:{} health. Continue the orphan bundle cleanup", broker);
}
}


long startTime = System.nanoTime();
log.info("Started ownership cleanup for the inactive broker:{}", broker);
int orphanServiceUnitCleanupCnt = 0;
Loading