Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][broker][branch-3.3] PIP-321 Introduce allowed-cluster at the namespace level (#22378) #22961

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -691,9 +691,21 @@ protected CompletableFuture<Void> internalSetNamespaceReplicationClusters(List<S
"Invalid cluster id: " + clusterId);
}
return validatePeerClusterConflictAsync(clusterId, replicationClusterSet)
.thenCompose(__ ->
validateClusterForTenantAsync(
namespaceName.getTenant(), clusterId));
.thenCompose(__ -> getNamespacePoliciesAsync(this.namespaceName)
.thenCompose(nsPolicies -> {
if (nsPolicies.allowed_clusters.isEmpty()) {
return validateClusterForTenantAsync(
namespaceName.getTenant(), clusterId);
}
if (!nsPolicies.allowed_clusters.contains(clusterId)) {
String msg = String.format("Cluster [%s] is not in the "
+ "list of allowed clusters list for namespace "
+ "[%s]", clusterId, namespaceName.toString());
log.info(msg);
throw new RestException(Status.FORBIDDEN, msg);
}
return CompletableFuture.completedFuture(null);
}));
}).collect(Collectors.toList());
return FutureUtil.waitForAll(futures).thenApply(__ -> replicationClusterSet);
}))
Expand Down Expand Up @@ -2710,4 +2722,65 @@ protected CompletableFuture<Boolean> internalGetDispatcherPauseOnAckStatePersist
return policiesOpt.map(p -> p.dispatcherPauseOnAckStatePersistentEnabled).orElse(false);
});
}

protected CompletableFuture<Void> internalSetNamespaceAllowedClusters(List<String> clusterIds) {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ALLOW_CLUSTERS, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
// Allowed clusters in the namespace policy should be included in the allowed clusters in the tenant
// policy.
.thenCompose(__ -> FutureUtil.waitForAll(clusterIds.stream().map(clusterId ->
validateClusterForTenantAsync(namespaceName.getTenant(), clusterId))
.collect(Collectors.toList())))
// Allowed clusters should include all the existed replication clusters and could not contain global
// cluster.
.thenCompose(__ -> {
checkNotNull(clusterIds, "ClusterIds should not be null");
if (clusterIds.contains("global")) {
throw new RestException(Status.PRECONDITION_FAILED,
"Cannot specify global in the list of allowed clusters");
}
return getNamespacePoliciesAsync(this.namespaceName).thenApply(namespacePolicies -> {
namespacePolicies.replication_clusters.forEach(replicationCluster -> {
if (!clusterIds.contains(replicationCluster)) {
throw new RestException(Status.BAD_REQUEST,
String.format("Allowed clusters do not contain the replication cluster %s. "
+ "Please remove the replication cluster if the cluster is not allowed "
+ "for this namespace", replicationCluster));
}
});
return Sets.newHashSet(clusterIds);
});
})
// Verify the allowed clusters are valid and they do not contain the peer clusters.
.thenCompose(allowedClusters -> clustersAsync()
.thenCompose(clusters -> {
List<CompletableFuture<Void>> futures =
allowedClusters.stream().map(clusterId -> {
if (!clusters.contains(clusterId)) {
throw new RestException(Status.FORBIDDEN,
"Invalid cluster id: " + clusterId);
}
return validatePeerClusterConflictAsync(clusterId, allowedClusters);
}).collect(Collectors.toList());
return FutureUtil.waitForAll(futures).thenApply(__ -> allowedClusters);
}))
// Update allowed clusters into policies.
.thenCompose(allowedClusterSet -> updatePoliciesAsync(namespaceName, policies -> {
policies.allowed_clusters = allowedClusterSet;
return policies;
}));
}

protected CompletableFuture<Set<String>> internalGetNamespaceAllowedClustersAsync() {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ALLOW_CLUSTERS, PolicyOperation.READ)
.thenAccept(__ -> {
if (!namespaceName.isGlobal()) {
throw new RestException(Status.PRECONDITION_FAILED,
"Cannot get the allowed clusters for a non-global namespace");
}
}).thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenApply(policies -> policies.allowed_clusters);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -3041,5 +3041,52 @@ public void getDispatcherPauseOnAckStatePersistent(@Suspended final AsyncRespons
});
}


@POST
@Path("/{tenant}/{namespace}/allowedClusters")
@ApiOperation(value = "Set the allowed clusters for a namespace.")
@ApiResponses(value = {
@ApiResponse(code = 400, message = "The list of allowed clusters should include all replication clusters."),
@ApiResponse(code = 403, message = "The requester does not have admin permissions."),
@ApiResponse(code = 404, message = "The specified tenant, cluster, or namespace does not exist."),
@ApiResponse(code = 409, message = "A peer-cluster cannot be part of an allowed-cluster."),
@ApiResponse(code = 412, message = "The namespace is not global or the provided cluster IDs are invalid.")})
public void setNamespaceAllowedClusters(@Suspended AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@ApiParam(value = "List of allowed clusters", required = true)
List<String> clusterIds) {
validateNamespaceName(tenant, namespace);
internalSetNamespaceAllowedClusters(clusterIds)
.thenAccept(asyncResponse::resume)
.exceptionally(e -> {
log.error("[{}] Failed to set namespace allowed clusters on namespace {}",
clientAppId(), namespace, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return null;
});
}

@GET
@Path("/{tenant}/{namespace}/allowedClusters")
@ApiOperation(value = "Get the allowed clusters for a namespace.",
response = String.class, responseContainer = "List")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 412, message = "Namespace is not global")})
public void getNamespaceAllowedClusters(@Suspended AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalGetNamespaceAllowedClustersAsync()
.thenAccept(asyncResponse::resume)
.exceptionally(e -> {
log.error("[{}] Failed to get namespace allowed clusters on namespace {}", clientAppId(),
namespace, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return null;
});
}

private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1873,52 +1873,78 @@ public CompletableFuture<Void> checkReplication() {
if (log.isDebugEnabled()) {
log.debug("[{}] Checking replication status", name);
}

List<String> configuredClusters = topicPolicies.getReplicationClusters().get();
if (CollectionUtils.isEmpty(configuredClusters)) {
log.warn("[{}] No replication clusters configured", name);
return CompletableFuture.completedFuture(null);
}

int newMessageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get();

String localCluster = brokerService.pulsar().getConfiguration().getClusterName();

// if local cluster is removed from global namespace cluster-list : then delete topic forcefully
// because pulsar doesn't serve global topic without local repl-cluster configured.
if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) {
log.info("Deleting topic [{}] because local cluster is not part of "
+ " global namespace repl list {}", topic, configuredClusters);
return deleteForcefully();
}

removeTerminatedReplicators(replicators);
List<CompletableFuture<Void>> futures = new ArrayList<>();

// Check for missing replicators
for (String cluster : configuredClusters) {
if (cluster.equals(localCluster)) {
continue;
return checkAllowedCluster(localCluster).thenCompose(success -> {
if (!success) {
// if local cluster is removed from global namespace cluster-list : then delete topic forcefully
// because pulsar doesn't serve global topic without local repl-cluster configured.
return deleteForcefully();
}
if (!replicators.containsKey(cluster)) {
futures.add(startReplicator(cluster));
}
}

// Check for replicators to be stopped
replicators.forEach((cluster, replicator) -> {
// Update message TTL
((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLInSeconds);
if (!cluster.equals(localCluster)) {
if (!configuredClusters.contains(cluster)) {
futures.add(removeReplicator(cluster));
int newMessageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get();

removeTerminatedReplicators(replicators);
List<CompletableFuture<Void>> futures = new ArrayList<>();

// The replication clusters at namespace level will get local cluster when creating a namespace.
// If there are only one cluster in the replication clusters, it means the replication is not enabled.
// If the cluster 1 and cluster 2 use the same configuration store and the namespace is created in cluster1
// without enabling geo-replication, then the replication clusters always has cluster1.
//
// When a topic under the namespace is load in the cluster2, the `cluster1` may be identified as
// remote cluster and start geo-replication. This check is to avoid the above case.
if (!(configuredClusters.size() == 1 && replicators.isEmpty())) {
// Check for missing replicators
for (String cluster : configuredClusters) {
if (cluster.equals(localCluster)) {
continue;
}
if (!replicators.containsKey(cluster)) {
futures.add(startReplicator(cluster));
}
}
// Check for replicators to be stopped
replicators.forEach((cluster, replicator) -> {
// Update message TTL
((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLInSeconds);
if (!cluster.equals(localCluster)) {
if (!configuredClusters.contains(cluster)) {
futures.add(removeReplicator(cluster));
}
}
});
}
});

futures.add(checkShadowReplication());
futures.add(checkShadowReplication());

return FutureUtil.waitForAll(futures);
return FutureUtil.waitForAll(futures);
});
}

private CompletableFuture<Boolean> checkAllowedCluster(String localCluster) {
List<String> replicationClusters = topicPolicies.getReplicationClusters().get();
return brokerService.pulsar().getPulsarResources().getNamespaceResources()
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject()).thenCompose(policiesOptional -> {
Set<String> allowedClusters = Set.of();
if (policiesOptional.isPresent()) {
allowedClusters = policiesOptional.get().allowed_clusters;
}
if (TopicName.get(topic).isGlobal() && !replicationClusters.contains(localCluster)
&& !allowedClusters.contains(localCluster)) {
log.warn("Local cluster {} is not part of global namespace repl list {} and allowed list {}",
localCluster, replicationClusters, allowedClusters);
return CompletableFuture.completedFuture(false);
} else {
return CompletableFuture.completedFuture(true);
}
});
}

private CompletableFuture<Void> checkShadowReplication() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -901,14 +901,16 @@ public static CompletableFuture<ClusterDataImpl> checkLocalOrGetPeerReplicationC
log.warn(msg);
validationFuture.completeExceptionally(new RestException(Status.NOT_FOUND,
"Namespace is deleted"));
} else if (policies.replication_clusters.isEmpty()) {
} else if (policies.replication_clusters.isEmpty() && policies.allowed_clusters.isEmpty()) {
String msg = String.format(
"Namespace does not have any clusters configured : local_cluster=%s ns=%s",
localCluster, namespace.toString());
log.warn(msg);
validationFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED, msg));
} else if (!policies.replication_clusters.contains(localCluster)) {
getOwnerFromPeerClusterListAsync(pulsarService, policies.replication_clusters)
} else if (!policies.replication_clusters.contains(localCluster) && !policies.allowed_clusters
.contains(localCluster)) {
getOwnerFromPeerClusterListAsync(pulsarService, policies.replication_clusters,
policies.allowed_clusters)
.thenAccept(ownerPeerCluster -> {
if (ownerPeerCluster != null) {
// found a peer that own this namespace
Expand Down Expand Up @@ -948,9 +950,9 @@ public static CompletableFuture<ClusterDataImpl> checkLocalOrGetPeerReplicationC
}

private static CompletableFuture<ClusterDataImpl> getOwnerFromPeerClusterListAsync(PulsarService pulsar,
Set<String> replicationClusters) {
Set<String> replicationClusters, Set<String> allowedClusters) {
String currentCluster = pulsar.getConfiguration().getClusterName();
if (replicationClusters == null || replicationClusters.isEmpty() || isBlank(currentCluster)) {
if (replicationClusters.isEmpty() && allowedClusters.isEmpty() || isBlank(currentCluster)) {
return CompletableFuture.completedFuture(null);
}

Expand All @@ -960,7 +962,8 @@ private static CompletableFuture<ClusterDataImpl> getOwnerFromPeerClusterListAsy
return CompletableFuture.completedFuture(null);
}
for (String peerCluster : cluster.get().getPeerClusterNames()) {
if (replicationClusters.contains(peerCluster)) {
if (replicationClusters.contains(peerCluster)
|| allowedClusters.contains(peerCluster)) {
return pulsar.getPulsarResources().getClusterResources().getClusterAsync(peerCluster)
.thenApply(ret -> {
if (!ret.isPresent()) {
Expand Down
Loading
Loading