Skip to content

Commit

Permalink
[feat][broker] PIP-321 Introduce allowed-cluster at the namespace lev…
Browse files Browse the repository at this point in the history
…el (#22378) (#22961)

Co-authored-by: Xiangying Meng <[email protected]>
  • Loading branch information
Demogorgon314 and liangyepianzhou authored Jun 26, 2024
1 parent e8b70bf commit 4bac125
Show file tree
Hide file tree
Showing 12 changed files with 527 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -691,9 +691,21 @@ protected CompletableFuture<Void> internalSetNamespaceReplicationClusters(List<S
"Invalid cluster id: " + clusterId);
}
return validatePeerClusterConflictAsync(clusterId, replicationClusterSet)
.thenCompose(__ ->
validateClusterForTenantAsync(
namespaceName.getTenant(), clusterId));
.thenCompose(__ -> getNamespacePoliciesAsync(this.namespaceName)
.thenCompose(nsPolicies -> {
if (nsPolicies.allowed_clusters.isEmpty()) {
return validateClusterForTenantAsync(
namespaceName.getTenant(), clusterId);
}
if (!nsPolicies.allowed_clusters.contains(clusterId)) {
String msg = String.format("Cluster [%s] is not in the "
+ "list of allowed clusters list for namespace "
+ "[%s]", clusterId, namespaceName.toString());
log.info(msg);
throw new RestException(Status.FORBIDDEN, msg);
}
return CompletableFuture.completedFuture(null);
}));
}).collect(Collectors.toList());
return FutureUtil.waitForAll(futures).thenApply(__ -> replicationClusterSet);
}))
Expand Down Expand Up @@ -2710,4 +2722,65 @@ protected CompletableFuture<Boolean> internalGetDispatcherPauseOnAckStatePersist
return policiesOpt.map(p -> p.dispatcherPauseOnAckStatePersistentEnabled).orElse(false);
});
}

protected CompletableFuture<Void> internalSetNamespaceAllowedClusters(List<String> clusterIds) {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ALLOW_CLUSTERS, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
// Allowed clusters in the namespace policy should be included in the allowed clusters in the tenant
// policy.
.thenCompose(__ -> FutureUtil.waitForAll(clusterIds.stream().map(clusterId ->
validateClusterForTenantAsync(namespaceName.getTenant(), clusterId))
.collect(Collectors.toList())))
// Allowed clusters should include all the existed replication clusters and could not contain global
// cluster.
.thenCompose(__ -> {
checkNotNull(clusterIds, "ClusterIds should not be null");
if (clusterIds.contains("global")) {
throw new RestException(Status.PRECONDITION_FAILED,
"Cannot specify global in the list of allowed clusters");
}
return getNamespacePoliciesAsync(this.namespaceName).thenApply(namespacePolicies -> {
namespacePolicies.replication_clusters.forEach(replicationCluster -> {
if (!clusterIds.contains(replicationCluster)) {
throw new RestException(Status.BAD_REQUEST,
String.format("Allowed clusters do not contain the replication cluster %s. "
+ "Please remove the replication cluster if the cluster is not allowed "
+ "for this namespace", replicationCluster));
}
});
return Sets.newHashSet(clusterIds);
});
})
// Verify the allowed clusters are valid and they do not contain the peer clusters.
.thenCompose(allowedClusters -> clustersAsync()
.thenCompose(clusters -> {
List<CompletableFuture<Void>> futures =
allowedClusters.stream().map(clusterId -> {
if (!clusters.contains(clusterId)) {
throw new RestException(Status.FORBIDDEN,
"Invalid cluster id: " + clusterId);
}
return validatePeerClusterConflictAsync(clusterId, allowedClusters);
}).collect(Collectors.toList());
return FutureUtil.waitForAll(futures).thenApply(__ -> allowedClusters);
}))
// Update allowed clusters into policies.
.thenCompose(allowedClusterSet -> updatePoliciesAsync(namespaceName, policies -> {
policies.allowed_clusters = allowedClusterSet;
return policies;
}));
}

protected CompletableFuture<Set<String>> internalGetNamespaceAllowedClustersAsync() {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ALLOW_CLUSTERS, PolicyOperation.READ)
.thenAccept(__ -> {
if (!namespaceName.isGlobal()) {
throw new RestException(Status.PRECONDITION_FAILED,
"Cannot get the allowed clusters for a non-global namespace");
}
}).thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenApply(policies -> policies.allowed_clusters);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -3041,5 +3041,52 @@ public void getDispatcherPauseOnAckStatePersistent(@Suspended final AsyncRespons
});
}


@POST
@Path("/{tenant}/{namespace}/allowedClusters")
@ApiOperation(value = "Set the allowed clusters for a namespace.")
@ApiResponses(value = {
@ApiResponse(code = 400, message = "The list of allowed clusters should include all replication clusters."),
@ApiResponse(code = 403, message = "The requester does not have admin permissions."),
@ApiResponse(code = 404, message = "The specified tenant, cluster, or namespace does not exist."),
@ApiResponse(code = 409, message = "A peer-cluster cannot be part of an allowed-cluster."),
@ApiResponse(code = 412, message = "The namespace is not global or the provided cluster IDs are invalid.")})
public void setNamespaceAllowedClusters(@Suspended AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@ApiParam(value = "List of allowed clusters", required = true)
List<String> clusterIds) {
validateNamespaceName(tenant, namespace);
internalSetNamespaceAllowedClusters(clusterIds)
.thenAccept(asyncResponse::resume)
.exceptionally(e -> {
log.error("[{}] Failed to set namespace allowed clusters on namespace {}",
clientAppId(), namespace, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return null;
});
}

@GET
@Path("/{tenant}/{namespace}/allowedClusters")
@ApiOperation(value = "Get the allowed clusters for a namespace.",
response = String.class, responseContainer = "List")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 412, message = "Namespace is not global")})
public void getNamespaceAllowedClusters(@Suspended AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalGetNamespaceAllowedClustersAsync()
.thenAccept(asyncResponse::resume)
.exceptionally(e -> {
log.error("[{}] Failed to get namespace allowed clusters on namespace {}", clientAppId(),
namespace, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return null;
});
}

private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1873,52 +1873,78 @@ public CompletableFuture<Void> checkReplication() {
if (log.isDebugEnabled()) {
log.debug("[{}] Checking replication status", name);
}

List<String> configuredClusters = topicPolicies.getReplicationClusters().get();
if (CollectionUtils.isEmpty(configuredClusters)) {
log.warn("[{}] No replication clusters configured", name);
return CompletableFuture.completedFuture(null);
}

int newMessageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get();

String localCluster = brokerService.pulsar().getConfiguration().getClusterName();

// if local cluster is removed from global namespace cluster-list : then delete topic forcefully
// because pulsar doesn't serve global topic without local repl-cluster configured.
if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) {
log.info("Deleting topic [{}] because local cluster is not part of "
+ " global namespace repl list {}", topic, configuredClusters);
return deleteForcefully();
}

removeTerminatedReplicators(replicators);
List<CompletableFuture<Void>> futures = new ArrayList<>();

// Check for missing replicators
for (String cluster : configuredClusters) {
if (cluster.equals(localCluster)) {
continue;
return checkAllowedCluster(localCluster).thenCompose(success -> {
if (!success) {
// if local cluster is removed from global namespace cluster-list : then delete topic forcefully
// because pulsar doesn't serve global topic without local repl-cluster configured.
return deleteForcefully();
}
if (!replicators.containsKey(cluster)) {
futures.add(startReplicator(cluster));
}
}

// Check for replicators to be stopped
replicators.forEach((cluster, replicator) -> {
// Update message TTL
((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLInSeconds);
if (!cluster.equals(localCluster)) {
if (!configuredClusters.contains(cluster)) {
futures.add(removeReplicator(cluster));
int newMessageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get();

removeTerminatedReplicators(replicators);
List<CompletableFuture<Void>> futures = new ArrayList<>();

// The replication clusters at namespace level will get local cluster when creating a namespace.
// If there are only one cluster in the replication clusters, it means the replication is not enabled.
// If the cluster 1 and cluster 2 use the same configuration store and the namespace is created in cluster1
// without enabling geo-replication, then the replication clusters always has cluster1.
//
// When a topic under the namespace is load in the cluster2, the `cluster1` may be identified as
// remote cluster and start geo-replication. This check is to avoid the above case.
if (!(configuredClusters.size() == 1 && replicators.isEmpty())) {
// Check for missing replicators
for (String cluster : configuredClusters) {
if (cluster.equals(localCluster)) {
continue;
}
if (!replicators.containsKey(cluster)) {
futures.add(startReplicator(cluster));
}
}
// Check for replicators to be stopped
replicators.forEach((cluster, replicator) -> {
// Update message TTL
((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLInSeconds);
if (!cluster.equals(localCluster)) {
if (!configuredClusters.contains(cluster)) {
futures.add(removeReplicator(cluster));
}
}
});
}
});

futures.add(checkShadowReplication());
futures.add(checkShadowReplication());

return FutureUtil.waitForAll(futures);
return FutureUtil.waitForAll(futures);
});
}

private CompletableFuture<Boolean> checkAllowedCluster(String localCluster) {
List<String> replicationClusters = topicPolicies.getReplicationClusters().get();
return brokerService.pulsar().getPulsarResources().getNamespaceResources()
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject()).thenCompose(policiesOptional -> {
Set<String> allowedClusters = Set.of();
if (policiesOptional.isPresent()) {
allowedClusters = policiesOptional.get().allowed_clusters;
}
if (TopicName.get(topic).isGlobal() && !replicationClusters.contains(localCluster)
&& !allowedClusters.contains(localCluster)) {
log.warn("Local cluster {} is not part of global namespace repl list {} and allowed list {}",
localCluster, replicationClusters, allowedClusters);
return CompletableFuture.completedFuture(false);
} else {
return CompletableFuture.completedFuture(true);
}
});
}

private CompletableFuture<Void> checkShadowReplication() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -901,14 +901,16 @@ public static CompletableFuture<ClusterDataImpl> checkLocalOrGetPeerReplicationC
log.warn(msg);
validationFuture.completeExceptionally(new RestException(Status.NOT_FOUND,
"Namespace is deleted"));
} else if (policies.replication_clusters.isEmpty()) {
} else if (policies.replication_clusters.isEmpty() && policies.allowed_clusters.isEmpty()) {
String msg = String.format(
"Namespace does not have any clusters configured : local_cluster=%s ns=%s",
localCluster, namespace.toString());
log.warn(msg);
validationFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED, msg));
} else if (!policies.replication_clusters.contains(localCluster)) {
getOwnerFromPeerClusterListAsync(pulsarService, policies.replication_clusters)
} else if (!policies.replication_clusters.contains(localCluster) && !policies.allowed_clusters
.contains(localCluster)) {
getOwnerFromPeerClusterListAsync(pulsarService, policies.replication_clusters,
policies.allowed_clusters)
.thenAccept(ownerPeerCluster -> {
if (ownerPeerCluster != null) {
// found a peer that own this namespace
Expand Down Expand Up @@ -948,9 +950,9 @@ public static CompletableFuture<ClusterDataImpl> checkLocalOrGetPeerReplicationC
}

private static CompletableFuture<ClusterDataImpl> getOwnerFromPeerClusterListAsync(PulsarService pulsar,
Set<String> replicationClusters) {
Set<String> replicationClusters, Set<String> allowedClusters) {
String currentCluster = pulsar.getConfiguration().getClusterName();
if (replicationClusters == null || replicationClusters.isEmpty() || isBlank(currentCluster)) {
if (replicationClusters.isEmpty() && allowedClusters.isEmpty() || isBlank(currentCluster)) {
return CompletableFuture.completedFuture(null);
}

Expand All @@ -960,7 +962,8 @@ private static CompletableFuture<ClusterDataImpl> getOwnerFromPeerClusterListAsy
return CompletableFuture.completedFuture(null);
}
for (String peerCluster : cluster.get().getPeerClusterNames()) {
if (replicationClusters.contains(peerCluster)) {
if (replicationClusters.contains(peerCluster)
|| allowedClusters.contains(peerCluster)) {
return pulsar.getPulsarResources().getClusterResources().getClusterAsync(peerCluster)
.thenApply(ret -> {
if (!ret.isPresent()) {
Expand Down
Loading

0 comments on commit 4bac125

Please sign in to comment.