Skip to content

Commit

Permalink
[Issue 5903] Support compact all partitions of a partitioned topic (#…
Browse files Browse the repository at this point in the history
…6537)

* Resolve conflict.

* Add a unit test to cover the compact logic.
  • Loading branch information
murong00 authored Mar 19, 2020
1 parent 83efd44 commit 9634eb7
Show file tree
Hide file tree
Showing 7 changed files with 192 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1976,7 +1976,82 @@ private void internalExpireMessagesForSinglePartition(String subName, int expire
}
}

protected void internalTriggerCompaction(boolean authoritative) {
protected void internalTriggerCompaction(AsyncResponse asyncResponse, boolean authoritative) {
log.info("[{}] Trigger compaction on topic {}", clientAppId(), topicName);
try {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
} catch (Exception e) {
log.error("[{}] Failed to trigger compaction on topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}

// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
try {
internalTriggerCompactionNonPartitionedTopic(authoritative);
} catch (Exception e) {
log.error("[{}] Failed to trigger compaction on topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
asyncResponse.resume(Response.noContent().build());
} else {
getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
final int numPartitions = partitionMetadata.partitions;
if (numPartitions > 0) {
final List<CompletableFuture<Void>> futures = Lists.newArrayList();

for (int i = 0; i < numPartitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics().triggerCompactionAsync(topicNamePartition.toString()));
} catch (Exception e) {
log.error("[{}] Failed to trigger compaction on topic {}", clientAppId(), topicNamePartition, e);
asyncResponse.resume(new RestException(e));
return;
}
}

FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Throwable th = exception.getCause();
if (th instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, th.getMessage()));
return null;
} else if (th instanceof WebApplicationException) {
asyncResponse.resume(th);
return null;
} else {
log.error("[{}] Failed to trigger compaction on topic {}", clientAppId(), topicName, exception);
asyncResponse.resume(new RestException(exception));
return null;
}
}
asyncResponse.resume(Response.noContent().build());
return null;
});
} else {
try {
internalTriggerCompactionNonPartitionedTopic(authoritative);
} catch (Exception e) {
log.error("[{}] Failed to trigger compaction on topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
asyncResponse.resume(Response.noContent().build());
}
}).exceptionally(ex -> {
log.error("[{}] Failed to trigger compaction on topic {}", clientAppId(), topicName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
}

protected void internalTriggerCompactionNonPartitionedTopic(boolean authoritative) {
validateWriteOperationOnTopic(authoritative);

PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
Expand All @@ -1985,6 +2060,7 @@ protected void internalTriggerCompaction(boolean authoritative) {
} catch (AlreadyRunningException e) {
throw new RestException(Status.CONFLICT, e.getMessage());
} catch (Exception e) {
log.error("[{}] Failed to trigger compaction on topic {}", clientAppId(), topicName, e);
throw new RestException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -582,11 +582,18 @@ public MessageId terminate(@PathParam("property") String property, @PathParam("c
@ApiResponse(code = 405, message = "Operation not allowed on persistent topic"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 409, message = "Compaction already running")})
public void compact(@PathParam("property") String property, @PathParam("cluster") String cluster,
public void compact(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(property, cluster, namespace, encodedTopic);
internalTriggerCompaction(authoritative);
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalTriggerCompaction(asyncResponse, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,7 @@ public MessageId terminate(
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
public void compact(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
Expand All @@ -981,8 +982,14 @@ public void compact(
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
internalTriggerCompaction(authoritative);
try {
validateTopicName(tenant, namespace, encodedTopic);
internalTriggerCompaction(asyncResponse, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2234,6 +2234,49 @@ public void testTriggerCompaction() throws Exception {
verify(compactor, times(2)).compact(topicName);
}

@Test
public void testTriggerCompactionPartitionedTopic() throws Exception {
String topicName = "persistent://prop-xyz/ns1/test-part";
int numPartitions = 2;
admin.topics().createPartitionedTopic(topicName, numPartitions);

// create a partitioned topic by creating a producer
pulsarClient.newProducer(Schema.BYTES).topic(topicName).create().close();
assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));

// mock actual compaction, we don't need to really run it
CompletableFuture<Long> promise = new CompletableFuture<>();
Compactor compactor = pulsar.getCompactor();
doReturn(promise).when(compactor).compact(topicName + "-partition-0");

CompletableFuture<Long> promise1 = new CompletableFuture<>();
doReturn(promise1).when(compactor).compact(topicName + "-partition-1");
admin.topics().triggerCompaction(topicName);

// verify compact called once by each partition topic
verify(compactor).compact(topicName + "-partition-0");
verify(compactor).compact(topicName + "-partition-1");
try {
admin.topics().triggerCompaction(topicName);

fail("Shouldn't be able to run while already running");
} catch (PulsarAdminException e) {
// expected
}
// compact shouldn't have been called again
verify(compactor).compact(topicName + "-partition-0");
verify(compactor).compact(topicName + "-partition-1");

// complete first compaction, and trigger again
promise.complete(1L);
promise1.complete(1L);
admin.topics().triggerCompaction(topicName);

// verify compact was called again
verify(compactor, times(2)).compact(topicName + "-partition-0");
verify(compactor, times(2)).compact(topicName + "-partition-1");
}

@Test
public void testCompactionStatus() throws Exception {
String topicName = "persistent://prop-xyz/ns1/topic1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,35 @@ public void testRevokePartitionedTopic() {
}
}

@Test
public void testTriggerCompactionTopic() {
final String partitionTopicName = "test-part";
final String nonPartitionTopicName = "test-non-part";

// trigger compaction on non-existing topic
AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.compact(response, testTenant, testNamespace, "non-existing-topic", true);
ArgumentCaptor<RestException> errCaptor = ArgumentCaptor.forClass(RestException.class);
verify(response, timeout(5000).times(1)).resume(errCaptor.capture());
Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode());

// create non partitioned topic and compaction on it
response = mock(AsyncResponse.class);
persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, nonPartitionTopicName, true);
persistentTopics.compact(response, testTenant, testNamespace, nonPartitionTopicName, true);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

// create partitioned topic and compaction on it
response = mock(AsyncResponse.class);
persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionTopicName, 2);
persistentTopics.compact(response, testTenant, testNamespace, partitionTopicName, true);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
}

@Test()
public void testGetLastMessageId() throws Exception {
TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1069,6 +1069,14 @@ void createSubscription(String topic, String subscriptionName, MessageId message
*/
void triggerCompaction(String topic) throws PulsarAdminException;

/**
* Trigger compaction to run for a topic asynchronously.
*
* @param topic
* The topic on which to trigger compaction
*/
CompletableFuture<Void> triggerCompactionAsync(String topic);

/**
* Check the status of an ongoing compaction for a topic.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -904,17 +904,26 @@ public void failed(Throwable throwable) {
}

@Override
public void triggerCompaction(String topic)
throws PulsarAdminException {
public void triggerCompaction(String topic) throws PulsarAdminException {
try {
TopicName tn = validateTopic(topic);
request(topicPath(tn, "compaction"))
.put(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
triggerCompactionAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<Void> triggerCompactionAsync(String topic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "compaction");
return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
}

@Override
public LongRunningProcessStatus compactionStatus(String topic)
throws PulsarAdminException {
Expand Down

0 comments on commit 9634eb7

Please sign in to comment.