Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [broker] add timeout for health check read. #21990

Merged
merged 6 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -34,6 +35,7 @@
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
Expand Down Expand Up @@ -80,6 +82,12 @@ public class BrokersBase extends AdminResource {
// log a full thread dump when a deadlock is detected in healthcheck once every 10 minutes
// to prevent excessive logging
private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 600000L;
// there is a timeout of 60 seconds default in the client(readTimeoutMs), so we need to set the timeout
// a bit shorter than 60 seconds to avoid the client timeout exception thrown before the server timeout exception.
// or we can't propagate the server timeout exception to the client.
private static final Duration HEALTH_CHECK_READ_TIMEOUT = Duration.ofSeconds(58);
private static final TimeoutException HEALTH_CHECK_TIMEOUT_EXCEPTION =
FutureUtil.createTimeoutException("Timeout", BrokersBase.class, "healthCheckRecursiveReadNext(...)");
private volatile long threadDumpLoggedTimestamp;

@GET
Expand Down Expand Up @@ -434,7 +442,10 @@ private CompletableFuture<Void> internalRunHealthCheck(TopicVersion topicVersion
});
throw FutureUtil.wrapToCompletionException(createException);
}).thenCompose(reader -> producer.sendAsync(messageStr)
.thenCompose(__ -> healthCheckRecursiveReadNext(reader, messageStr))
.thenCompose(__ -> FutureUtil.addTimeoutHandling(
healthCheckRecursiveReadNext(reader, messageStr),
HEALTH_CHECK_READ_TIMEOUT, pulsar().getBrokerService().executor(),
() -> HEALTH_CHECK_TIMEOUT_EXCEPTION))
.whenComplete((__, ex) -> {
closeAndReCheck(producer, reader, topicOptional.get(), subscriptionName)
.whenComplete((unused, innerEx) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.testng.Assert.assertTrue;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -31,13 +32,21 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.compaction.Compactor;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.springframework.util.CollectionUtils;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -236,4 +245,58 @@ public void testHealthCheckupV2() throws Exception {
))
);
}

class DummyProducerBuilder<T> extends ProducerBuilderImpl<T> {
// This is a dummy producer builder to test the health check timeout
// the producer constructed by this builder will not send any message
public DummyProducerBuilder(PulsarClientImpl client, Schema schema) {
super(client, schema);
}

@Override
public CompletableFuture<Producer<T>> createAsync() {
CompletableFuture<Producer<T>> future = new CompletableFuture<>();
super.createAsync().thenAccept(producer -> {
Producer<T> spyProducer = Mockito.spy(producer);
Mockito.doReturn(CompletableFuture.completedFuture(MessageId.earliest))
.when(spyProducer).sendAsync(Mockito.any());
future.complete(spyProducer);
}).exceptionally(ex -> {
future.completeExceptionally(ex);
return null;
});
return future;
}
}

@Test
public void testHealthCheckTimeOut() throws Exception {
final String testHealthCheckTopic = String.format("persistent://pulsar/localhost:%s/healthcheck",
pulsar.getConfig().getWebServicePort().get());
PulsarClient client = pulsar.getClient();
PulsarClient spyClient = Mockito.spy(client);
Mockito.doReturn(new DummyProducerBuilder<>((PulsarClientImpl) spyClient, Schema.BYTES))
.when(spyClient).newProducer(Schema.STRING);
// use reflection to replace the client in the broker
Field field = PulsarService.class.getDeclaredField("client");
field.setAccessible(true);
field.set(pulsar, spyClient);
try {
admin.brokers().healthcheck(TopicVersion.V2);
throw new Exception("Should not reach here");
} catch (PulsarAdminException e) {
log.info("Exception caught", e);
assertTrue(e.getMessage().contains("LowOverheadTimeoutException"));
}
// To ensure we don't have any subscription, the producers and readers are closed.
Awaitility.await().untilAsserted(() ->
assertTrue(CollectionUtils.isEmpty(admin.topics()
.getSubscriptions(testHealthCheckTopic).stream()
// All system topics are using compaction, even though is not explicitly set in the policies.
.filter(v -> !v.equals(Compactor.COMPACTION_SUBSCRIPTION))
.collect(Collectors.toList())
))
);
}

}
Loading