Skip to content

Commit

Permalink
Fix create partitioned topic with a substring of an existing topic na…
Browse files Browse the repository at this point in the history
…me. (apache#6478)

Fixes apache#6468

Fix create a partitioned topic with a substring of an existing topic name. And make create partitioned topic async.
  • Loading branch information
codelipenghui authored Mar 6, 2020
1 parent f2ec1b4 commit 19ccfd5
Show file tree
Hide file tree
Showing 10 changed files with 227 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import java.net.MalformedURLException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -36,6 +37,7 @@

import javax.servlet.ServletContext;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
Expand All @@ -46,6 +48,7 @@
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
Expand Down Expand Up @@ -255,35 +258,42 @@ protected List<String> getListOfNamespaces(String property) throws Exception {
return namespaces;
}

protected void tryCreatePartitionsAsync(int numPartitions) {
protected CompletableFuture<Void> tryCreatePartitionsAsync(int numPartitions) {
if (!topicName.isPersistent()) {
return;
return CompletableFuture.completedFuture(null);
}
List<CompletableFuture<Void>> futures = new ArrayList<>(numPartitions);
for (int i = 0; i < numPartitions; i++) {
tryCreatePartitionAsync(i);
futures.add(tryCreatePartitionAsync(i, null));
}
return FutureUtil.waitForAll(futures);
}

private void tryCreatePartitionAsync(final int partition) {
private CompletableFuture<Void> tryCreatePartitionAsync(final int partition, CompletableFuture<Void> reuseFuture) {
CompletableFuture<Void> result = reuseFuture == null ? new CompletableFuture<>() : reuseFuture;
zkCreateOptimisticAsync(localZk(), ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new byte[0],
(rc, s, o, s1) -> {
if (KeeperException.Code.OK.intValue() == rc) {
if (log.isDebugEnabled()) {
log.debug("[{}] Topic partition {} created.", clientAppId(),
topicName.getPartition(partition));
}
result.complete(null);
} else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
log.info("[{}] Topic partition {} is exists, doing nothing.", clientAppId(),
topicName.getPartition(partition));
result.completeExceptionally(KeeperException.create(KeeperException.Code.NODEEXISTS));
} else if (KeeperException.Code.BADVERSION.intValue() == rc) {
log.warn("[{}] Fail to create topic partition {} with concurrent modification, retry now.",
clientAppId(), topicName.getPartition(partition));
tryCreatePartitionAsync(partition);
tryCreatePartitionAsync(partition, result);
} else {
log.error("[{}] Fail to create topic partition {}", clientAppId(),
topicName.getPartition(partition), KeeperException.create(KeeperException.Code.get(rc)));
result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
}
});
return result;
}

protected NamespaceName namespaceName;
Expand Down Expand Up @@ -707,4 +717,98 @@ protected List<String> getPartitionedTopicList(TopicDomain topicDomain) {
partitionedTopics.sort(null);
return partitionedTopics;
}

protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions) {
try {
validateAdminAccessForTenant(topicName.getTenant());
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
if (numPartitions <= 0) {
asyncResponse.resume(new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0"));
return;
}
checkTopicExistsAsync(topicName).thenAccept(exists -> {
if (exists) {
log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
asyncResponse.resume(new RestException(Status.CONFLICT, "This topic already exists"));
} else {
try {
String path = ZkAdminPaths.partitionedTopicPath(topicName);
byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
zkCreateOptimisticAsync(globalZk(), path, data, (rc, s, o, s1) -> {
if (KeeperException.Code.OK.intValue() == rc) {
globalZk().sync(path, (rc2, s2, ctx) -> {
if (KeeperException.Code.OK.intValue() == rc2) {
log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
log.info("[{}] Successfully created partitions for topic {}", clientAppId(), topicName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(e -> {
log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName);
// The partitioned topic is created but there are some partitions create failed
asyncResponse.resume(new RestException(e));
return null;
});
} else {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc2)));
asyncResponse.resume(new RestException(KeeperException.create(KeeperException.Code.get(rc2))));
}
}, null);
} else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
asyncResponse.resume(new RestException(Status.CONFLICT, "Partitioned topic already exists"));
} else if (KeeperException.Code.BADVERSION.intValue() == rc) {
log.warn("[{}] Failed to create partitioned topic {}: concurrent modification", clientAppId(),
topicName);
asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
} else {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc)));
asyncResponse.resume(new RestException(KeeperException.create(KeeperException.Code.get(rc))));
}
});
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}
}).exceptionally(ex -> {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

/**
* Check the exists topics contains the given topic.
* Since there are topic partitions and non-partitioned topics in Pulsar, must ensure both partitions
* and non-partitioned topics are not duplicated. So, if compare with a partition name, we should compare
* to the partitioned name of this partition.
*
* @param topicName given topic name
*/
protected CompletableFuture<Boolean> checkTopicExistsAsync(TopicName topicName) {
return pulsar().getNamespaceService().getListOfTopics(topicName.getNamespaceObject(),
PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
.thenCompose(topics -> {
boolean exists = false;
for (String topic : topics) {
if (topicName.getPartitionedTopicName().equals(TopicName.get(topic).getPartitionedTopicName())) {
exists = true;
break;
}
}
return CompletableFuture.completedFuture(exists);
});
}

protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable throwable) {
if (throwable instanceof WebApplicationException) {
asyncResponse.resume(throwable);
} else {
asyncResponse.resume(new RestException(throwable));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import org.apache.pulsar.common.api.proto.PulsarApi;

import static org.apache.pulsar.common.util.Codec.decode;

import com.github.zafarkhaja.semver.Version;
Expand Down Expand Up @@ -390,46 +390,6 @@ protected void internalRevokePermissionsOnTopic(String role) {
revokePermissions(topicName.toString(), role);
}

protected void internalCreatePartitionedTopic(int numPartitions) {
validateAdminAccessForTenant(topicName.getTenant());
if (numPartitions <= 0) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
}
validatePartitionTopicName(topicName.getLocalName());
try {
boolean topicExist = pulsar().getNamespaceService()
.getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
.join()
.contains(topicName.toString());
if (topicExist) {
log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
throw new RestException(Status.CONFLICT, "This topic already exists");
}
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
throw new RestException(e);
}
try {
String path = ZkAdminPaths.partitionedTopicPath(topicName);
byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
zkCreateOptimistic(path, data);
tryCreatePartitionsAsync(numPartitions);
// Sync data to all quorums and the observers
zkSync(path);
log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
} catch (KeeperException.NodeExistsException e) {
log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
throw new RestException(Status.CONFLICT, "Partitioned topic already exists");
} catch (KeeperException.BadVersionException e) {
log.warn("[{}] Failed to create partitioned topic {}: concurrent modification", clientAppId(),
topicName);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
throw new RestException(e);
}
}

protected void internalCreateNonPartitionedTopic(boolean authoritative) {
validateAdminAccessForTenant(topicName.getTenant());
validateNonPartitionTopicName(topicName.getLocalName());
Expand Down Expand Up @@ -540,11 +500,22 @@ protected void internalUpdatePartitionedTopic(int numPartitions, boolean updateL
}
}

protected void internalCreateMissedPartitions() {
PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName, false, false);
if (metadata != null) {
tryCreatePartitionsAsync(metadata.partitions);
}
protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> {
if (metadata != null) {
tryCreatePartitionsAsync(metadata.partitions).thenAccept(v -> {
asyncResponse.resume(Response.noContent().build());
}).exceptionally(e -> {
log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName);
resumeAsyncResponseExceptionally(asyncResponse, e);
return null;
});
}
}).exceptionally(e -> {
log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName);
resumeAsyncResponseExceptionally(asyncResponse, e);
return null;
});
}

private CompletableFuture<Void> updatePartitionInOtherCluster(int numPartitions, Set<String> clusters) {
Expand Down Expand Up @@ -2071,40 +2042,6 @@ private void validatePartitionTopicUpdate(String topicName, int numberOfPartitio
}
}

/**
* Validate partitioned topic name.
* Validation will fail and throw RestException if
* 1) There's already a partitioned topic with same topic name and have some of its partition created.
* 2) There's already non partition topic with same name and contains partition suffix "-partition-"
* followed by numeric value. In this case internal created partition of partitioned topic could override
* the existing non partition topic.
*
* @param topicName
*/
private void validatePartitionTopicName(String topicName) {
List<String> existingTopicList = internalGetList();
String prefix = topicName + TopicName.PARTITIONED_TOPIC_SUFFIX;
for (String existingTopicName : existingTopicList) {
if (existingTopicName.contains(prefix)) {
try {
Long.parseLong(existingTopicName.substring(
existingTopicName.indexOf(TopicName.PARTITIONED_TOPIC_SUFFIX)
+ TopicName.PARTITIONED_TOPIC_SUFFIX.length()));
log.warn("[{}] Already have topic {} which contains partition " +
"suffix '-partition-' and end with numeric value. Creation of partitioned topic {}"
+ "could cause conflict.", clientAppId(), existingTopicName, topicName);
throw new RestException(Status.PRECONDITION_FAILED,
"Already have topic " + existingTopicName + " which contains partition suffix '-partition-' " +
"and end with numeric value, Creation of partitioned topic " + topicName +
" could cause conflict.");
} catch (NumberFormatException e) {
// Do nothing, if value after partition suffix is not pure numeric value,
// as it can't conflict with internal created partitioned topic's name.
}
}
}
}

/**
* Validate non partition topic name,
* Validation will fail and throw RestException if
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,41 +124,15 @@ public PersistentTopicInternalStats getInternalStats(@PathParam("property") Stri
@ApiOperation(hidden = true, value = "Create a partitioned topic.", notes = "It needs to be called before creating a producer on a partitioned topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 409, message = "Partitioned topic already exist") })
public void createPartitionedTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
int numPartitions) {
validateTopicName(property, cluster, namespace, encodedTopic);
validateAdminAccessForTenant(topicName.getTenant());
if (numPartitions <= 0) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
}
try {
boolean topicExist = pulsar().getNamespaceService()
.getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
.join()
.contains(topicName.toString());
if (topicExist) {
log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
throw new RestException(Status.CONFLICT, "This topic already exists");
}
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
throw new RestException(e);
}
try {
String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(),
topicName.getEncodedLocalName());
byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
zkCreateOptimistic(path, data);
// Sync data to all quorums and the observers
zkSync(path);
log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
} catch (KeeperException.NodeExistsException e) {
log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
throw new RestException(Status.CONFLICT, "Partitioned topic already exist");
validateTopicName(property, cluster, namespace, encodedTopic);
internalCreatePartitionedTopic(asyncResponse, numPartitions);
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
throw new RestException(e);
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}

Expand Down
Loading

0 comments on commit 19ccfd5

Please sign in to comment.