Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to create partitioned topic with 1 partition #4764

Merged
merged 2 commits into from
Jul 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,8 @@ protected void internalRevokePermissionsOnTopic(String role) {

protected void internalCreatePartitionedTopic(int numPartitions) {
validateAdminAccessForTenant(topicName.getTenant());
if (numPartitions <= 1) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 1");
if (numPartitions <= 0) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
}
try {
String path = ZkAdminPaths.partitionedTopicPath(topicName);
Expand Down Expand Up @@ -431,8 +431,8 @@ protected void internalUpdatePartitionedTopic(int numPartitions) {
topicName);
throw new RestException(Status.FORBIDDEN, "Update forbidden on global namespace");
}
if (numPartitions <= 1) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 1");
if (numPartitions <= 0) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
}
try {
updatePartitionedTopic(topicName, numPartitions).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ public void createPartitionedTopic(@PathParam("property") String property, @Path
int numPartitions) {
validateTopicName(property, cluster, namespace, encodedTopic);
validateAdminAccessForTenant(topicName.getTenant());
if (numPartitions <= 1) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 1");
if (numPartitions <= 0) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
}
try {
String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ public void createPartitionedTopic(
validateGlobalNamespaceOwnership(tenant,namespace);
validateTopicName(tenant, namespace, encodedTopic);
validateAdminAccessForTenant(topicName.getTenant());
if (numPartitions <= 1) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 1");
if (numPartitions <= 0) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
}
try {
String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public void createNonPartitionedTopic(
@ApiResponse(code = 401, message = "Don't have permission to adminisActions to be grantedtrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant does not exist"),
@ApiResponse(code = 406, message = "The number of partitions should be more than 1"),
@ApiResponse(code = 406, message = "The number of partitions should be more than 0"),
@ApiResponse(code = 409, message = "Partitioned topic does not exist"),
@ApiResponse(code = 412, message = "Partitioned topic name is invalid"),
@ApiResponse(code = 500, message = "Internal server error")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void createPartitionedTopic(String topic, int numPartitions) throws Pulsa

@Override
public CompletableFuture<Void> createPartitionedTopicAsync(String topic, int numPartitions) {
checkArgument(numPartitions > 1, "Number of partitions should be more than 1");
checkArgument(numPartitions > 0, "Number of partitions should be more than 0");
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "partitions");
return asyncPutRequest(path, Entity.entity(numPartitions, MediaType.APPLICATION_JSON));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,17 +217,17 @@ public void createNonPartitionedTopic(String topic) throws PulsarAdminException
throw new PulsarAdminException(e);
}
}

@Override
public CompletableFuture<Void> createNonPartitionedTopicAsync(String topic){
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn);
return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
}

@Override
public CompletableFuture<Void> createPartitionedTopicAsync(String topic, int numPartitions) {
checkArgument(numPartitions > 1, "Number of partitions should be more than 1");
checkArgument(numPartitions > 0, "Number of partitions should be more than 0");
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "partitions");
return asyncPutRequest(path, Entity.entity(numPartitions, MediaType.APPLICATION_JSON));
Expand All @@ -247,7 +247,7 @@ public void updatePartitionedTopic(String topic, int numPartitions) throws Pulsa

@Override
public CompletableFuture<Void> updatePartitionedTopicAsync(String topic, int numPartitions) {
checkArgument(numPartitions > 1, "Number of partitions must be more than 1");
checkArgument(numPartitions > 0, "Number of partitions must be more than 0");
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "partitions");
return asyncPostRequest(path, Entity.entity(numPartitions, MediaType.APPLICATION_JSON));
Expand Down Expand Up @@ -569,7 +569,7 @@ public void failed(Throwable throwable) {
});
return future;
}

@Override
public void deleteSubscription(String topic, String subName) throws PulsarAdminException {
try {
Expand Down