Skip to content

Commit

Permalink
[broker] fix GetTopicsOfNamespace with binary lookup service not ch…
Browse files Browse the repository at this point in the history
…eck auth (#11172)

(cherry picked from commit 64b44a5)
  • Loading branch information
freeznet authored and michaeljmarshall committed Dec 10, 2021
1 parent 8e97d58 commit deb9b0b
Show file tree
Hide file tree
Showing 2 changed files with 559 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandNewTxn;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.CommandUtils;
Expand Down Expand Up @@ -1612,30 +1613,97 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
});
}

private CompletableFuture<Boolean> isNamespaceOperationAllowed(NamespaceName namespaceName,
NamespaceOperation operation) {
CompletableFuture<Boolean> isProxyAuthorizedFuture;
CompletableFuture<Boolean> isAuthorizedFuture;
if (service.isAuthorizationEnabled()) {
if (originalPrincipal != null) {
isProxyAuthorizedFuture = service.getAuthorizationService().allowNamespaceOperationAsync(
namespaceName, operation, originalPrincipal, getAuthenticationData());
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
}
isAuthorizedFuture = service.getAuthorizationService().allowNamespaceOperationAsync(
namespaceName, operation, authRole, authenticationData);
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
isAuthorizedFuture = CompletableFuture.completedFuture(true);
}
return isProxyAuthorizedFuture.thenCombine(isAuthorizedFuture, (isProxyAuthorized, isAuthorized) -> {
if (!isProxyAuthorized) {
log.warn("OriginalRole {} is not authorized to perform operation {} on namespace {}",
originalPrincipal, operation, namespaceName);
}
if (!isAuthorized) {
log.warn("Role {} is not authorized to perform operation {} on namespace {}",
authRole, operation, namespaceName);
}
return isProxyAuthorized && isAuthorized;
});
}

@Override
protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
final long requestId = commandGetTopicsOfNamespace.getRequestId();
final String namespace = commandGetTopicsOfNamespace.getNamespace();
final CommandGetTopicsOfNamespace.Mode mode = commandGetTopicsOfNamespace.getMode();
final NamespaceName namespaceName = NamespaceName.get(namespace);

getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
.thenAccept(topics -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Received CommandGetTopicsOfNamespace for namespace [//{}] by {}, size:{}",
remoteAddress, namespace, requestId, topics.size());
}
commandSender.sendGetTopicsOfNamespaceResponse(topics, requestId);
})
.exceptionally(ex -> {
log.warn("[{}] Error GetTopicsOfNamespace for namespace [//{}] by {}",
remoteAddress, namespace, requestId);
commandSender.sendErrorResponse(requestId,
BrokerServiceException.getClientErrorCode(new ServerMetadataException(ex)),
ex.getMessage());

return null;
});
final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
if (invalidOriginalPrincipal(originalPrincipal)) {
final String msg = "Valid Proxy Client role should be provided for getTopicsOfNamespaceRequest ";
log.warn("[{}] {} with role {} and proxyClientAuthRole {} on namespace {}", remoteAddress, msg,
authRole, originalPrincipal, namespaceName);
commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, msg);
lookupSemaphore.release();
return;
}
isNamespaceOperationAllowed(namespaceName, NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> {
if (isAuthorized) {
getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
.thenAccept(topics -> {
if (log.isDebugEnabled()) {
log.debug(
"[{}] Received CommandGetTopicsOfNamespace for namespace [//{}] by {}, size:{}",
remoteAddress, namespace, requestId, topics.size());
}
commandSender.sendGetTopicsOfNamespaceResponse(topics, requestId);
lookupSemaphore.release();
})
.exceptionally(ex -> {
log.warn("[{}] Error GetTopicsOfNamespace for namespace [//{}] by {}",
remoteAddress, namespace, requestId);
commandSender.sendErrorResponse(requestId,
BrokerServiceException.getClientErrorCode(new ServerMetadataException(ex)),
ex.getMessage());
lookupSemaphore.release();
return null;
});
} else {
final String msg = "Proxy Client is not authorized to GetTopicsOfNamespace";
log.warn("[{}] {} with role {} on namespace {}", remoteAddress, msg, getPrincipal(), namespaceName);
commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, msg);
lookupSemaphore.release();
}
return null;
}).exceptionally(ex -> {
logNamespaceNameAuthException(remoteAddress, "GetTopicsOfNamespace", getPrincipal(),
Optional.of(namespaceName), ex);
final String msg = "Exception occurred while trying to authorize GetTopicsOfNamespace";
commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, msg);
lookupSemaphore.release();
return null;
});
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed GetTopicsOfNamespace lookup due to too many lookup-requests {}", remoteAddress,
namespaceName);
}
commandSender.sendErrorResponse(requestId, ServerError.TooManyRequests,
"Failed due to too many pending lookup requests");
}
}

@Override
Expand Down Expand Up @@ -2270,4 +2338,16 @@ private static void logAuthException(SocketAddress remoteAddress, String operati
remoteAddress, operation, principal, topicString, ex);
}
}

private static void logNamespaceNameAuthException(SocketAddress remoteAddress, String operation,
String principal, Optional<NamespaceName> namespaceName, Throwable ex) {
String namespaceNameString = namespaceName.map(t -> ", namespace=" + t.toString()).orElse("");
if (ex instanceof AuthenticationException) {
log.info("[{}] Failed to authenticate: operation={}, principal={}{}, reason={}",
remoteAddress, operation, principal, namespaceNameString, ex.getMessage());
} else {
log.error("[{}] Error trying to authenticate: operation={}, principal={}{}",
remoteAddress, operation, principal, namespaceNameString, ex);
}
}
}
Loading

0 comments on commit deb9b0b

Please sign in to comment.