Skip to content

Commit

Permalink
[improve][broker] PIP-383: Support granting/revoking permissions for …
Browse files Browse the repository at this point in the history
…multiple topics (#23372)
  • Loading branch information
Technoboy- committed Oct 10, 2024
1 parent dd380ec commit e2712b1
Show file tree
Hide file tree
Showing 11 changed files with 413 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
Expand Down Expand Up @@ -223,6 +226,16 @@ CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName namespac
CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> actions, String role,
String authDataJson);

default CompletableFuture<Void> grantPermissionAsync(List<GrantTopicPermissionOptions> options) {
return FutureUtil.failedFuture(new IllegalStateException(
String.format("grantPermissionAsync is not supported by the Authorization")));
}

default CompletableFuture<Void> revokePermissionAsync(List<RevokeTopicPermissionOptions> options) {
return FutureUtil.failedFuture(new IllegalStateException(
String.format("revokePermissionAsync is not supported by the Authorization")));
}


/**
* Revoke authorization-action permission on a topic to the given client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static java.util.concurrent.TimeUnit.SECONDS;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -32,6 +33,8 @@
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationParameters;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
Expand Down Expand Up @@ -181,6 +184,14 @@ public CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<Aut
return provider.grantPermissionAsync(topicName, actions, role, authDataJson);
}

public CompletableFuture<Void> grantPermissionAsync(List<GrantTopicPermissionOptions> options) {
return provider.grantPermissionAsync(options);
}

public CompletableFuture<Void> revokePermissionAsync(List<RevokeTopicPermissionOptions> options) {
return provider.revokePermissionAsync(options);
}

/**
* Revoke authorization-action permission on a topic to the given client.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
Expand Down Expand Up @@ -251,6 +255,80 @@ public CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<Aut
});
}

public CompletableFuture<Void> grantPermissionAsync(List<GrantTopicPermissionOptions> options) {
return checkNamespace(options.stream().map(o -> TopicName.get(o.getTopic()).getNamespace()))
.thenCompose(__ -> getPoliciesReadOnlyAsync())
.thenCompose(readonly -> {
if (readonly) {
if (log.isDebugEnabled()) {
log.debug("Policies are read-only. Broker cannot do read-write operations");
}
throw new IllegalStateException("policies are in readonly mode");
}
TopicName topicName = TopicName.get(options.get(0).getTopic());
return pulsarResources.getNamespaceResources()
.setPoliciesAsync(topicName.getNamespaceObject(), policies -> {
options.stream().forEach(o -> {
final String topicUri = TopicName.get(o.getTopic()).toString();
policies.auth_policies.getTopicAuthentication()
.computeIfAbsent(topicUri, __ -> new HashMap<>())
.put(o.getRole(), o.getActions());
});
return policies;
}).whenComplete((__, ex) -> {
if (ex != null) {
log.error("Failed to grant permissions for {}", options);
} else {
log.info("Successfully granted access for {}", options);
}
});
});
}

@Override
public CompletableFuture<Void> revokePermissionAsync(List<RevokeTopicPermissionOptions> options) {
return checkNamespace(options.stream().map(o -> TopicName.get(o.getTopic()).getNamespace()))
.thenCompose(__ -> getPoliciesReadOnlyAsync())
.thenCompose(readonly -> {
if (readonly) {
if (log.isDebugEnabled()) {
log.debug("Policies are read-only. Broker cannot do read-write operations");
}
throw new IllegalStateException("policies are in readonly mode");
}
TopicName topicName = TopicName.get(options.get(0).getTopic());
return pulsarResources.getNamespaceResources()
.setPoliciesAsync(topicName.getNamespaceObject(), policies -> {
options.stream().forEach(o -> {
final String topicUri = TopicName.get(o.getTopic()).toString();
policies.auth_policies.getTopicAuthentication()
.computeIfPresent(topicUri, (topicNameUri, roles) -> {
roles.remove(o.getRole());
if (roles.isEmpty()) {
return null;
}
return roles;
});
});
return policies;
}).whenComplete((__, ex) -> {
if (ex != null) {
log.error("Failed to revoke permissions for {}", options, ex);
} else {
log.info("Successfully revoke permissions for {}", options);
}
});
});
}

private CompletableFuture<Void> checkNamespace(Stream<String> namespaces) {
boolean sameNamespace = namespaces.distinct().count() == 1;
if (!sameNamespace) {
throw new IllegalArgumentException("The namespace should be the same");
}
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<Void> revokePermissionAsync(TopicName topicName, String role) {
return getPoliciesReadOnlyAsync().thenCompose(readonly -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -935,4 +935,15 @@ protected void validateOffloadPolicies(OffloadPoliciesImpl offloadPolicies) {
"The bucket must be specified for namespace offload.");
}
}

protected CompletableFuture<Void> internalCheckTopicExists(TopicName topicName) {
return pulsar().getNamespaceService().checkTopicExists(topicName)
.thenAccept(info -> {
boolean exists = info.isExists();
info.recycle();
if (!exists) {
throw new RestException(Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString()));
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
Expand All @@ -65,8 +66,10 @@
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.NamedEntity;
Expand Down Expand Up @@ -613,6 +616,78 @@ protected CompletableFuture<Void> internalGrantPermissionOnNamespaceAsync(String
});
}

protected CompletableFuture<Void> internalGrantPermissionOnTopicsAsync(List<GrantTopicPermissionOptions> options) {
return checkNamespace(options.stream().map(o -> TopicName.get(o.getTopic()).getNamespace()))
.thenCompose(__ -> validateAdminAccessForTenantAsync(
TopicName.get(options.get(0).getTopic()).getTenant())
).thenCompose(__ -> internalCheckTopicExists(options.stream().map(o -> TopicName.get(o.getTopic()))))
.thenCompose(__ -> getAuthorizationService().grantPermissionAsync(options))
.thenAccept(unused -> log.info("[{}] Successfully granted access for {}", clientAppId(), options))
.exceptionally(ex -> {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
//The IllegalArgumentException and the IllegalStateException were historically thrown by the
// grantPermissionAsync method, so we catch them here to ensure backwards compatibility.
if (realCause instanceof MetadataStoreException.NotFoundException
|| realCause instanceof IllegalArgumentException) {
log.warn("[{}] Failed to grant permissions for namespace {}: does not exist", clientAppId(),
namespaceName, ex);
throw new RestException(Status.NOT_FOUND, "Topic's namespace does not exist");
} else if (realCause instanceof MetadataStoreException.BadVersionException
|| realCause instanceof IllegalStateException) {
log.warn("[{}] Failed to grant permissions for namespace {}: {}",
clientAppId(), namespaceName, ex.getCause().getMessage(), ex);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} else {
log.error("[{}] Failed to grant permissions for namespace {}",
clientAppId(), namespaceName, ex);
throw new RestException(realCause);
}
});
}

protected CompletableFuture<Void> internalRevokePermissionOnTopicsAsync(
List<RevokeTopicPermissionOptions> options) {
return checkNamespace(options.stream().map(o -> TopicName.get(o.getTopic()).getNamespace()))
.thenCompose(__ -> validateAdminAccessForTenantAsync(
TopicName.get(options.get(0).getTopic()).getTenant()))
.thenCompose(__ -> internalCheckTopicExists(options.stream().map(o -> TopicName.get(o.getTopic()))))
.thenCompose(__ -> getAuthorizationService().revokePermissionAsync(options))
.thenAccept(unused -> log.info("[{}] Successfully revoke access for {}", clientAppId(), options))
.exceptionally(ex -> {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
//The IllegalArgumentException and the IllegalStateException were historically thrown by the
// grantPermissionAsync method, so we catch them here to ensure backwards compatibility.
if (realCause instanceof MetadataStoreException.NotFoundException
|| realCause instanceof IllegalArgumentException) {
log.warn("[{}] Failed to revoke permissions for namespace {}: does not exist", clientAppId(),
namespaceName, ex);
throw new RestException(Status.NOT_FOUND, "Topic's namespace does not exist");
} else if (realCause instanceof MetadataStoreException.BadVersionException
|| realCause instanceof IllegalStateException) {
log.warn("[{}] Failed to revoke permissions for namespace {}: {}",
clientAppId(), namespaceName, ex.getCause().getMessage(), ex);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} else {
log.error("[{}] Failed to revoke permissions for namespace {}",
clientAppId(), namespaceName, ex);
throw new RestException(realCause);
}
});
}

private CompletableFuture<Void> checkNamespace(Stream<String> namespaces) {
boolean sameNamespace = namespaces.distinct().count() == 1;
if (!sameNamespace) {
throw new RestException(Status.BAD_REQUEST, "The namespace should be the same");
}
return CompletableFuture.completedFuture(null);
}

private CompletableFuture<Void> internalCheckTopicExists(Stream<TopicName> topicNameStream) {
List<TopicName> topicNames = topicNameStream.collect(Collectors.toList());
return CompletableFuture.allOf(topicNames.stream().map(topic -> internalCheckTopicExists(topic))
.toArray(CompletableFuture[]::new));
}

protected CompletableFuture<Void> internalGrantPermissionOnSubscriptionAsync(String subscription,
Set<String> roles) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.admin.impl.OffloaderObjectsScannerUtils;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -314,6 +316,48 @@ public void grantPermissionOnNamespace(@Suspended AsyncResponse asyncResponse,
});
}

@POST
@Path("/grantPermissionsOnTopics")
@ApiOperation(value = "Grant new permissions to a role on multi-topics.")
@ApiResponses(value = {
@ApiResponse(code = 204, message = "Operation successful"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
@ApiResponse(code = 500, message = "Internal server error") })
public void grantPermissionsOnTopics(@Suspended final AsyncResponse asyncResponse,
List<GrantTopicPermissionOptions> options) {
internalGrantPermissionOnTopicsAsync(options)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
log.error("[{}] Failed to grant permissions {}",
clientAppId(), options, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
@Path("/revokePermissionsOnTopics")
@ApiOperation(value = "Revoke new permissions to a role on multi-topics.")
@ApiResponses(value = {
@ApiResponse(code = 204, message = "Operation successful"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
@ApiResponse(code = 500, message = "Internal server error") })
public void revokePermissionsOnTopics(@Suspended final AsyncResponse asyncResponse,
List<RevokeTopicPermissionOptions> options) {
internalRevokePermissionOnTopicsAsync(options)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
log.error("[{}] Failed to revoke permissions {}",
clientAppId(), options, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
@Path("/{property}/{namespace}/permissions/subscription/{subscription}")
@ApiOperation(hidden = true, value = "Grant a new permission to roles for a subscription."
Expand Down
Loading

0 comments on commit e2712b1

Please sign in to comment.