Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [broker] Update topic policies as much as possible when some ex was thrown #21810

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
protected volatile boolean transferring = false;
private volatile List<PublishRateLimiter> activeRateLimiters;

protected Policies lastUpdatedNamespacePolicies;
codelipenghui marked this conversation as resolved.
Show resolved Hide resolved

public AbstractTopic(String topic, BrokerService brokerService) {
this.topic = topic;
this.brokerService = brokerService;
Expand Down Expand Up @@ -1227,7 +1229,7 @@ protected boolean isExceedMaximumMessageSize(int size, PublishContext publishCon
/**
* update topic publish dispatcher for this topic.
*/
public void updatePublishDispatcher() {
public void updatePublishRateLimiter() {
PublishRate publishRate = topicPolicies.getPublishRate().get();
if (publishRate.publishThrottlingRateInByte > 0 || publishRate.publishThrottlingRateInMsg > 0) {
log.info("Enabling publish rate limiting {} on topic {}", publishRate, getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2706,7 +2706,7 @@ private void updateMaxPublishRatePerTopicInMessages() {
forEachTopic(topic -> {
if (topic instanceof AbstractTopic) {
((AbstractTopic) topic).updateBrokerPublishRate();
((AbstractTopic) topic).updatePublishDispatcher();
((AbstractTopic) topic).updatePublishRateLimiter();
}
}));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,12 @@ public CompletableFuture<Void> initialize() {
policies = new Policies();
} else {
policies = optPolicies.get();
lastUpdatedNamespacePolicies = policies;
updateTopicPolicyByNamespacePolicy(policies);
isEncryptionRequired = policies.encryption_required;
isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
}
updatePublishDispatcher();
updatePublishRateLimiter();
updateResourceGroupLimiter(policies);
return updateClusterMigrated();
});
Expand Down Expand Up @@ -1127,6 +1128,7 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {

isEncryptionRequired = data.encryption_required;
isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
lastUpdatedNamespacePolicies = data;

List<CompletableFuture<Void>> producerCheckFutures = new ArrayList<>(producers.size());
producers.values().forEach(producer -> producerCheckFutures.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.carrotsearch.hppc.ObjectObjectHashMap;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;
import java.time.Clock;
Expand Down Expand Up @@ -356,7 +357,7 @@ public CompletableFuture<Void> initialize() {
.thenAcceptAsync(optPolicies -> {
if (!optPolicies.isPresent()) {
isEncryptionRequired = false;
updatePublishDispatcher();
updatePublishRateLimiter();
updateResourceGroupLimiter(new Policies());
initializeDispatchRateLimiterIfNeeded();
updateSubscribeRateLimiter();
Expand All @@ -365,13 +366,15 @@ public CompletableFuture<Void> initialize() {

Policies policies = optPolicies.get();

lastUpdatedNamespacePolicies = policies;

this.updateTopicPolicyByNamespacePolicy(policies);

initializeDispatchRateLimiterIfNeeded();

updateSubscribeRateLimiter();

updatePublishDispatcher();
updatePublishRateLimiter();

updateResourceGroupLimiter(policies);

Expand Down Expand Up @@ -3061,39 +3064,68 @@ public CompletableFuture<Void> onPoliciesUpdate(@Nonnull Policies data) {
return CompletableFuture.completedFuture(null);
}

// Update props.
// The component "EntryFilters" is update in the method "updateTopicPolicyByNamespacePolicy(data)".
// see more detail: https://github.com/apache/pulsar/pull/19364.
updateTopicPolicyByNamespacePolicy(data);
checkReplicatedSubscriptionControllerState();
isEncryptionRequired = data.encryption_required;

isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
lastUpdatedNamespacePolicies = data;

// Update components.
return FutureUtil.waitForAll(applyUpdatedPolicies())
.thenAccept(__ -> log.info("[{}] namespace-level policies updated successfully", topic))
.exceptionally(ex -> {
log.error("[{}] update namespace polices : {} error", this.getName(), data, ex);
throw FutureUtil.wrapToCompletionException(ex);
});
}

updateDispatchRateLimiter();

updateSubscribeRateLimiter();

updatePublishDispatcher();

updateResourceGroupLimiter(data);
private List<CompletableFuture<Void>> applyUpdatedPolicies() {
List<CompletableFuture<Void>> updateComponentsFutureList = new ArrayList<>();
codelipenghui marked this conversation as resolved.
Show resolved Hide resolved

List<CompletableFuture<Void>> producerCheckFutures = new ArrayList<>(producers.size());
producers.values().forEach(producer -> producerCheckFutures.add(
producer.checkPermissionsAsync().thenRun(producer::checkEncryption)));

return FutureUtil.waitForAll(producerCheckFutures).thenCompose((__) -> {
return updateSubscriptionsDispatcherRateLimiter().thenCompose((___) -> {
replicators.forEach((name, replicator) -> replicator.updateRateLimiter());
shadowReplicators.forEach((name, replicator) -> replicator.updateRateLimiter());
checkMessageExpiry();
CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();
CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
CompletableFuture<Void> persistentPoliciesFuture = checkPersistencePolicies();
return CompletableFuture.allOf(replicationFuture, dedupFuture, persistentPoliciesFuture,
preCreateSubscriptionForCompactionIfNeeded());
});
}).exceptionally(ex -> {
log.error("[{}] update namespace polices : {} error", this.getName(), data, ex);
throw FutureUtil.wrapToCompletionException(ex);
// Client permission check.
subscriptions.forEach((subName, sub) -> {
sub.getConsumers().forEach(consumer -> updateComponentsFutureList.add(consumer.checkPermissionsAsync()));
});
producers.values().forEach(producer -> updateComponentsFutureList.add(
producer.checkPermissionsAsync().thenRun(producer::checkEncryption)));
// Check message expiry.
updateComponentsFutureList.add(CompletableFuture.runAsync(
() -> checkMessageExpiry(), MoreExecutors.directExecutor()));

// Update rate limiters.
updateComponentsFutureList.add(CompletableFuture.runAsync(
() -> updateDispatchRateLimiter(), MoreExecutors.directExecutor()));
updateComponentsFutureList.add(CompletableFuture.runAsync(
() -> updateSubscribeRateLimiter(), MoreExecutors.directExecutor()));
updateComponentsFutureList.add(CompletableFuture.runAsync(
() -> updatePublishRateLimiter(), MoreExecutors.directExecutor()));
if (lastUpdatedNamespacePolicies != null) {
updateComponentsFutureList.add(CompletableFuture.runAsync(
() -> updateResourceGroupLimiter(lastUpdatedNamespacePolicies), MoreExecutors.directExecutor()));
}
updateComponentsFutureList.add(CompletableFuture.runAsync(
() -> updateSubscriptionsDispatcherRateLimiter(), MoreExecutors.directExecutor()));
updateComponentsFutureList.add(CompletableFuture.runAsync(
() -> replicators.forEach((name, replicator) -> replicator.updateRateLimiter()),
MoreExecutors.directExecutor()));
updateComponentsFutureList.add(CompletableFuture.runAsync(
() -> shadowReplicators.forEach((name, replicator) -> replicator.updateRateLimiter()),
MoreExecutors.directExecutor()));

// Other components.
updateComponentsFutureList.add(CompletableFuture.runAsync(
() -> checkReplicationAndRetryOnFailure(), MoreExecutors.directExecutor()));
updateComponentsFutureList.add(CompletableFuture.runAsync(
() -> checkDeduplicationStatus(), MoreExecutors.directExecutor()));
updateComponentsFutureList.add(CompletableFuture.runAsync(
() -> checkPersistencePolicies(), MoreExecutors.directExecutor()));
updateComponentsFutureList.add(CompletableFuture.runAsync(
() -> preCreateSubscriptionForCompactionIfNeeded(), MoreExecutors.directExecutor()));

return updateComponentsFutureList;
}

/**
Expand Down Expand Up @@ -3736,40 +3768,30 @@ public void onUpdate(TopicPolicies policies) {
if (policies == null) {
return;
}
// Update props.
// The component "EntryFilters" is update in the method "updateTopicPolicy(data)".
// see more detail: https://github.com/apache/pulsar/pull/19364.
updateTopicPolicy(policies);
shadowTopics = policies.getShadowTopics();
updateDispatchRateLimiter();
checkReplicatedSubscriptionControllerState();
updateSubscriptionsDispatcherRateLimiter().thenRun(() -> {
updatePublishDispatcher();
updateSubscribeRateLimiter();
replicators.forEach((name, replicator) -> replicator.updateRateLimiter());
shadowReplicators.forEach((name, replicator) -> replicator.updateRateLimiter());
checkMessageExpiry();
})
.thenCompose(__ -> checkReplicationAndRetryOnFailure())
.thenCompose(__ -> checkDeduplicationStatus())
.thenCompose(__ -> preCreateSubscriptionForCompactionIfNeeded())
.thenCompose(__ -> checkPersistencePolicies())
.thenAccept(__ -> log.info("[{}] Policies updated successfully", topic))
.exceptionally(e -> {
Throwable t = FutureUtil.unwrapCompletionException(e);
log.error("[{}] update topic policy error: {}", topic, t.getMessage(), t);
return null;
});

// Update components.
FutureUtil.waitForAll(applyUpdatedPolicies())
.thenAccept(__ -> log.info("[{}] topic-level policies updated successfully", topic))
.exceptionally(e -> {
Throwable t = FutureUtil.unwrapCompletionException(e);
log.error("[{}] update topic-level policy error: {}", topic, t.getMessage(), t);
return null;
});
}

private CompletableFuture<Void> updateSubscriptionsDispatcherRateLimiter() {
List<CompletableFuture<Void>> subscriptionCheckFutures = new ArrayList<>((int) subscriptions.size());
subscriptions.forEach((subName, sub) -> {
List<CompletableFuture<Void>> consumerCheckFutures = new ArrayList<>(sub.getConsumers().size());
sub.getConsumers().forEach(consumer -> consumerCheckFutures.add(consumer.checkPermissionsAsync()));
subscriptionCheckFutures.add(FutureUtil.waitForAll(consumerCheckFutures).thenRun(() -> {
Dispatcher dispatcher = sub.getDispatcher();
if (dispatcher != null) {
dispatcher.updateRateLimiter();
}
}));
Dispatcher dispatcher = sub.getDispatcher();
if (dispatcher != null) {
dispatcher.updateRateLimiter();
}
});
return FutureUtil.waitForAll(subscriptionCheckFutures);
Technoboy- marked this conversation as resolved.
Show resolved Hide resolved
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.ConfigHelper;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.namespace.NamespaceService;
Expand All @@ -50,6 +52,7 @@
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand Down Expand Up @@ -83,8 +86,11 @@
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -3157,4 +3163,49 @@ public void testProduceChangesWithEncryptionRequired() throws Exception {
});
}

@Test
public void testUpdateRetentionWithPartialFailure() throws Exception {
String tpName = BrokerTestUtil.newUniqueName("persistent://" + myNamespace + "/tp");
admin.topics().createNonPartitionedTopic(tpName);

// Load topic up.
admin.topics().getInternalStats(tpName);

// Inject an error that makes dispatch rate update fail.
PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopic(tpName, false).join().get();
ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions =
WhiteboxImpl.getInternalState(persistentTopic, "subscriptions");
PersistentSubscription mockedSubscription = Mockito.mock(PersistentSubscription.class);
Mockito.when(mockedSubscription.getDispatcher()).thenThrow(new RuntimeException("Mocked error: getDispatcher"));
subscriptions.put("mockedSubscription", mockedSubscription);

// Update namespace-level retention policies.
RetentionPolicies retentionPolicies1 = new RetentionPolicies(1, 1);
admin.namespaces().setRetentionAsync(myNamespace, retentionPolicies1);

// Verify: update retention will be success even if other component update throws exception.
Awaitility.await().untilAsserted(() -> {
ManagedLedgerImpl ML = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
assertEquals(ML.getConfig().getRetentionSizeInMB(), 1);
assertEquals(ML.getConfig().getRetentionTimeMillis(), 1 * 60 * 1000);
});

// Update topic-level retention policies.
RetentionPolicies retentionPolicies2 = new RetentionPolicies(2, 2);
admin.topics().setRetentionAsync(tpName, retentionPolicies2);

// Verify: update retention will be success even if other component update throws exception.
Awaitility.await().untilAsserted(() -> {
ManagedLedgerImpl ML = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
assertEquals(ML.getConfig().getRetentionSizeInMB(), 2);
assertEquals(ML.getConfig().getRetentionTimeMillis(), 2 * 60 * 1000);
});

// Cleanup.
subscriptions.clear();
admin.namespaces().removeRetention(myNamespace);
admin.topics().delete(tpName, false);
}

}
Loading