Skip to content

Commit

Permalink
Enable users to set subscription expiration time for each namespace (#…
Browse files Browse the repository at this point in the history
…6851)

## Motivation

We can automatically delete inactive subscriptions by setting `subscriptionExpirationTimeMinutes` in broker.conf to a value greater than 0.
```sh
# How long to delete inactive subscriptions from last consuming
# When it is 0, inactive subscriptions are not deleted automatically
subscriptionExpirationTimeMinutes=0
```

However, since this setting value applies to all topics, we have to set it to 0 if there is even one topic whose subscriptions should not be deleted.

### Modifications

Enable users to set a subscription expiration time for each namespace. This value overrides `subscriptionExpirationTimeMinutes` in broker.conf.
```sh
$ ./bin/pulsar-admin namespaces set-subscription-expiration-time --time 60 tenant1/ns1
$ ./bin/pulsar-admin namespaces get-subscription-expiration-time tenant1/ns1

60
```
  • Loading branch information
Masahiro Sakamoto authored May 1, 2020
1 parent d3d2737 commit 549994b
Show file tree
Hide file tree
Showing 12 changed files with 330 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "How long to delete inactive subscriptions from last consuming."
+ " When it is 0, inactive subscriptions are not deleted automatically"
)
private long subscriptionExpirationTimeMinutes = 0;
private int subscriptionExpirationTimeMinutes = 0;
@FieldContext(
category = CATEGORY_POLICIES,
dynamic = true,
Expand All @@ -319,7 +319,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
category = CATEGORY_POLICIES,
doc = "How frequently to proactively check and purge expired subscription"
)
private long subscriptionExpiryCheckIntervalInMinutes = 5;
private int subscriptionExpiryCheckIntervalInMinutes = 5;

@FieldContext(
category = CATEGORY_POLICIES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,45 @@ protected void internalSetNamespaceMessageTTL(int messageTTL) {
}
}

protected void internalSetSubscriptionExpirationTime(int expirationTime) {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();

if (expirationTime < 0) {
throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for subscription expiration time");
}

Entry<Policies, Stat> policiesNode = null;

try {
// Force to read the data s.t. the watch to the cache content is setup.
policiesNode = policiesCache().getWithStat(path(POLICIES, namespaceName.toString())).orElseThrow(
() -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
policiesNode.getKey().subscription_expiration_time_minutes = expirationTime;

// Write back the new policies into zookeeper
globalZk().setData(path(POLICIES, namespaceName.toString()),
jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));

log.info("[{}] Successfully updated the subscription expiration time on namespace {}", clientAppId(),
namespaceName);
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to update the subscription expiration time for namespace {}: does not exist",
clientAppId(), namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn(
"[{}] Failed to update the subscription expiration time on namespace {} expected policy node version={} : concurrent modification",
clientAppId(), namespaceName, policiesNode.getValue().getVersion());
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (Exception e) {
log.error("[{}] Failed to update the subscription expiration time on namespace {}", clientAppId(),
namespaceName, e);
throw new RestException(e);
}
}

protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse, AutoTopicCreationOverride autoTopicCreationOverride) {
final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
validateAdminAccessForTenant(namespaceName.getTenant());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,33 @@ public void setNamespaceMessageTTL(@PathParam("property") String property, @Path
internalSetNamespaceMessageTTL(messageTTL);
}

@GET
@Path("/{property}/{cluster}/{namespace}/subscriptionExpirationTime")
@ApiOperation(hidden = true, value = "Get the subscription expiration time for the namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
public int getSubscriptionExpirationTime(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
validateAdminAccessForTenant(property);
validateNamespaceName(property, cluster, namespace);

Policies policies = getNamespacePolicies(namespaceName);
return policies.subscription_expiration_time_minutes;
}

@POST
@Path("/{property}/{cluster}/{namespace}/subscriptionExpirationTime")
@ApiOperation(hidden = true, value = "Set subscription expiration time in minutes for namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 412, message = "Invalid expiration time") })
public void setSubscriptionExpirationTime(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace, int expirationTime) {
validateNamespaceName(property, cluster, namespace);
internalSetSubscriptionExpirationTime(expirationTime);
}


@POST
@Path("/{property}/{cluster}/{namespace}/antiAffinity")
@ApiOperation(value = "Set anti-affinity group for a namespace")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,32 @@ public void setNamespaceMessageTTL(@PathParam("tenant") String tenant, @PathPara
internalSetNamespaceMessageTTL(messageTTL);
}

@GET
@Path("/{tenant}/{namespace}/subscriptionExpirationTime")
@ApiOperation(value = "Get the subscription expiration time for the namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
public int getSubscriptionExpirationTime(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateAdminAccessForTenant(tenant);
validateNamespaceName(tenant, namespace);

Policies policies = getNamespacePolicies(namespaceName);
return policies.subscription_expiration_time_minutes;
}

@POST
@Path("/{tenant}/{namespace}/subscriptionExpirationTime")
@ApiOperation(value = "Set subscription expiration time in minutes for namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 412, message = "Invalid expiration time") })
public void setSubscriptionExpirationTime(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace, int expirationTime) {
validateNamespaceName(tenant, namespace);
internalSetSubscriptionExpirationTime(expirationTime);
}

@POST
@Path("/{tenant}/{namespace}/deduplication")
@ApiOperation(value = "Enable or disable broker side deduplication for all topics in a namespace")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ protected void startInactivityMonitor() {
duplicationCheckerIntervalInSeconds, TimeUnit.SECONDS);

// Inactive subscriber checker
if (pulsar().getConfiguration().getSubscriptionExpirationTimeMinutes() > 0) {
if (pulsar().getConfiguration().getSubscriptionExpiryCheckIntervalInMinutes() > 0) {
long subscriptionExpiryCheckIntervalInSeconds =
TimeUnit.MINUTES.toSeconds(pulsar().getConfiguration().getSubscriptionExpiryCheckIntervalInMinutes());
inactivityMonitor.scheduleAtFixedRate(safeRun(this::checkInactiveSubscriptions),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1694,15 +1694,33 @@ public void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode delete

@Override
public void checkInactiveSubscriptions() {
final long expirationTime = TimeUnit.MINUTES.toMillis(brokerService.pulsar().getConfiguration().getSubscriptionExpirationTimeMinutes());
if (expirationTime <= 0) return;
subscriptions.forEach((subName, sub) -> {
if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected()) return;
if (System.currentTimeMillis() - sub.cursor.getLastActive() > expirationTime) {
sub.delete().thenAccept(
v -> log.info("[{}][{}] The subscription was deleted due to expiration", topic, subName));
TopicName name = TopicName.get(topic);
try {
Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, name.getNamespace()))
.orElseThrow(() -> new KeeperException.NoNodeException());
final int defaultExpirationTime = brokerService.pulsar().getConfiguration()
.getSubscriptionExpirationTimeMinutes();
final long expirationTimeMillis = TimeUnit.MINUTES
.toMillis((policies.subscription_expiration_time_minutes <= 0 && defaultExpirationTime > 0)
? defaultExpirationTime
: policies.subscription_expiration_time_minutes);
if (expirationTimeMillis > 0) {
subscriptions.forEach((subName, sub) -> {
if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected()) {
return;
}
if (System.currentTimeMillis() - sub.cursor.getLastActive() > expirationTimeMillis) {
sub.delete().thenAccept(v -> log.info("[{}][{}] The subscription was deleted due to expiration",
topic, subName));
}
});
}
});
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("[{}] Error getting policies", topic);
}
}
}

@Override
Expand Down Expand Up @@ -2115,4 +2133,4 @@ Optional<ReplicatedSubscriptionsController> getReplicatedSubscriptionController(
public CompactedTopic getCompactedTopic() {
return compactedTopic;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ public void setup() throws Exception {
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
conf.setMessageExpiryCheckIntervalInMinutes(1);
conf.setSubscriptionExpiryCheckIntervalInMinutes(1);
conf.setBrokerDeleteInactiveTopicsEnabled(false);

super.internalSetup();

Expand Down Expand Up @@ -2425,6 +2427,39 @@ public void testTopicStatsLastExpireTimestampForSubscription() throws PulsarAdmi
Assert.assertTrue(admin.topics().getStats(topic).subscriptions.values().iterator().next().lastExpireTimestamp > 0L);
}

@Test(timeOut = 150000)
public void testSubscriptionExpiry() throws Exception {
final String namespace1 = "prop-xyz/sub-gc1";
final String namespace2 = "prop-xyz/sub-gc2";
final String topic1 = "persistent://" + namespace1 + "/testSubscriptionExpiry";
final String topic2 = "persistent://" + namespace2 + "/testSubscriptionExpiry";
final String sub = "sub1";

admin.namespaces().createNamespace(namespace1, Sets.newHashSet("test"));
admin.namespaces().createNamespace(namespace2, Sets.newHashSet("test"));
admin.topics().createSubscription(topic1, sub, MessageId.latest);
admin.topics().createSubscription(topic2, sub, MessageId.latest);
admin.namespaces().setSubscriptionExpirationTime(namespace1, 0);
admin.namespaces().setSubscriptionExpirationTime(namespace2, 1);

Assert.assertEquals(admin.namespaces().getSubscriptionExpirationTime(namespace1), 0);
Assert.assertEquals(admin.namespaces().getSubscriptionExpirationTime(namespace2), 1);
Thread.sleep(60000);
for (int i = 0; i < 60; i++) {
if (admin.topics().getSubscriptions(topic2).size() == 0) {
break;
}
Thread.sleep(1000);
}
Assert.assertEquals(admin.topics().getSubscriptions(topic1).size(), 1);
Assert.assertEquals(admin.topics().getSubscriptions(topic2).size(), 0);

admin.topics().delete(topic1);
admin.topics().delete(topic2);
admin.namespaces().deleteNamespace(namespace1);
admin.namespaces().deleteNamespace(namespace2);
}

@Test
public void testCreateAndDeleteNamespaceWithBundles() throws Exception {
admin.clusters().createCluster("usw", new ClusterData());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,80 @@ CompletableFuture<Void> grantPermissionOnSubscriptionAsync(
*/
CompletableFuture<Void> setNamespaceMessageTTLAsync(String namespace, int ttlInSeconds);

/**
* Get the subscription expiration time for a namespace.
* <p/>
* Response example:
*
* <pre>
* <code>1440</code>
* </pre>
*
* @param namespace
* Namespace name
*
* @throws NotAuthorizedException
* Don't have admin permission
* @throws NotFoundException
* Namespace does not exist
* @throws PulsarAdminException
* Unexpected error
*/
int getSubscriptionExpirationTime(String namespace) throws PulsarAdminException;

/**
* Get the subscription expiration time for a namespace asynchronously.
* <p/>
* Response example:
*
* <pre>
* <code>1440</code>
* </pre>
*
* @param namespace
* Namespace name
*/
CompletableFuture<Integer> getSubscriptionExpirationTimeAsync(String namespace);

/**
* Set the subscription expiration time in minutes for all the topics within a namespace.
* <p/>
* Request example:
*
* <pre>
* <code>1440</code>
* </pre>
*
* @param namespace
* Namespace name
* @param expirationTime
* Expiration time values for all subscriptions for all topics in this namespace
*
* @throws NotAuthorizedException
* Don't have admin permission
* @throws NotFoundException
* Namespace does not exist
* @throws PulsarAdminException
* Unexpected error
*/
void setSubscriptionExpirationTime(String namespace, int expirationTime) throws PulsarAdminException;

/**
* Set the subscription expiration time in minutes for all the topics within a namespace asynchronously.
* <p/>
* Request example:
*
* <pre>
* <code>1440</code>
* </pre>
*
* @param namespace
* Namespace name
* @param expirationTime
* Expiration time values for all subscriptions for all topics in this namespace
*/
CompletableFuture<Void> setSubscriptionExpirationTimeAsync(String namespace, int expirationTime);

/**
* Set anti-affinity group name for a namespace.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,62 @@ public CompletableFuture<Void> setNamespaceMessageTTLAsync(String namespace, int
return asyncPostRequest(path, Entity.entity(ttlInSeconds, MediaType.APPLICATION_JSON));
}

@Override
public int getSubscriptionExpirationTime(String namespace) throws PulsarAdminException {
try {
return getSubscriptionExpirationTimeAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<Integer> getSubscriptionExpirationTimeAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "subscriptionExpirationTime");
final CompletableFuture<Integer> future = new CompletableFuture<>();
asyncGetRequest(path, new InvocationCallback<Integer>() {
@Override
public void completed(Integer expirationTime) {
future.complete(expirationTime);
}

@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}

@Override
public void setSubscriptionExpirationTime(String namespace, int expirationTime)
throws PulsarAdminException {
try {
setSubscriptionExpirationTimeAsync(namespace, expirationTime).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<Void> setSubscriptionExpirationTimeAsync(String namespace, int expirationTime) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "subscriptionExpirationTime");
return asyncPostRequest(path, Entity.entity(expirationTime, MediaType.APPLICATION_JSON));
}

@Override
public void setNamespaceAntiAffinityGroup(String namespace, String namespaceAntiAffinityGroup)
throws PulsarAdminException {
Expand Down
Loading

0 comments on commit 549994b

Please sign in to comment.