From 15f876a3525c8e7e54e276cc049a2c5d98f06864 Mon Sep 17 00:00:00 2001 From: Masahiro Sakamoto Date: Tue, 28 Jan 2020 18:37:21 +0900 Subject: [PATCH] Add timeout to search for web service URLs to avoid web threads getting stuck (#6124) --- .../pulsar/broker/namespace/NamespaceService.java | 13 ++++++++----- .../apache/pulsar/broker/web/PulsarWebResource.java | 9 +++++++++ 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 91eea0717b697..0770a4d8bde52 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -196,22 +196,25 @@ private NamespaceBundle getFullBundle(NamespaceName fqnn) throws Exception { * * If the service unit is not owned, return an empty optional */ - public Optional getWebServiceUrl(ServiceUnitId suName, boolean authoritative, boolean isRequestHttps, boolean readOnly) - throws Exception { + public Optional getWebServiceUrl(ServiceUnitId suName, boolean authoritative, boolean isRequestHttps, + boolean readOnly) throws Exception { if (suName instanceof TopicName) { TopicName name = (TopicName) suName; if (LOG.isDebugEnabled()) { LOG.debug("Getting web service URL of topic: {} - auth: {}", name, authoritative); } - return this.internalGetWebServiceUrl(getBundle(name), authoritative, isRequestHttps, readOnly).get(); + return this.internalGetWebServiceUrl(getBundle(name), authoritative, isRequestHttps, readOnly) + .get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS); } if (suName instanceof NamespaceName) { - return this.internalGetWebServiceUrl(getFullBundle((NamespaceName) suName), authoritative, isRequestHttps, readOnly).get(); + return this.internalGetWebServiceUrl(getFullBundle((NamespaceName) suName), authoritative, isRequestHttps, + readOnly).get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS); } if (suName instanceof NamespaceBundle) { - return this.internalGetWebServiceUrl((NamespaceBundle) suName, authoritative, isRequestHttps, readOnly).get(); + return this.internalGetWebServiceUrl((NamespaceBundle) suName, authoritative, isRequestHttps, readOnly) + .get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS); } throw new IllegalArgumentException("Unrecognized class of NamespaceBundle: " + suName.getClass().getName()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 166551c6f6034..941df28254fbe 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -31,6 +31,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import javax.servlet.ServletContext; import javax.servlet.http.HttpServletRequest; @@ -542,6 +543,10 @@ public void validateBundleOwnership(NamespaceBundle bundle, boolean authoritativ log.debug("Redirecting the rest call to {}", redirect); throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); } + } catch (TimeoutException te) { + String msg = String.format("Finding owner for ServiceUnit %s timed out", bundle); + log.error(msg, te); + throw new RestException(Status.INTERNAL_SERVER_ERROR, msg); } catch (IllegalArgumentException iae) { // namespace format is not valid log.debug(String.format("Failed to find owner for ServiceUnit %s", bundle), iae); @@ -590,6 +595,10 @@ protected void validateTopicOwnership(TopicName topicName, boolean authoritative log.debug("Redirecting the rest call to {}", redirect); throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); } + } catch (TimeoutException te) { + String msg = String.format("Finding owner for topic %s timed out", topicName); + log.error(msg, te); + throw new RestException(Status.INTERNAL_SERVER_ERROR, msg); } catch (IllegalArgumentException iae) { // namespace format is not valid log.debug(String.format("Failed to find owner for topic :%s", topicName), iae);