Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] Retry re-validating ResourceLock with backoff after errors #22617

Merged
merged 4 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.StringInterner;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataEvent;
import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.BackoffBuilder;
import org.apache.pulsar.client.util.RetryUtil;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import org.jetbrains.annotations.NotNull;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.compaction.CompactedTopicUtils;
import org.apache.pulsar.compaction.Compactor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
Expand All @@ -65,6 +64,7 @@
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
import org.apache.pulsar.common.stats.PositionInPendingAckStats;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RecoverTimeRecord;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.resources.LocalPoliciesResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.policies.data.loadbalancer.BundleData;
import org.apache.pulsar.stats.CacheMetricsCollector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.BackoffBuilder;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.events.PulsarEvent;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import lombok.Cleanup;
import org.apache.pulsar.client.util.RetryUtil;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import org.testng.annotations.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.HandlerState.State;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.Backoff;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
import org.apache.pulsar.common.util.ExceptionHandler;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RelativeTimeUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.apache.pulsar.common.api.proto.Subscription;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.util.Backoff;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.ScheduledExecutorProvider;
import org.apache.pulsar.common.util.Backoff;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;
package org.apache.pulsar.common.util;

import com.google.common.annotations.VisibleForTesting;
import java.time.Clock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;
package org.apache.pulsar.common.util;

import java.time.Clock;
import java.util.concurrent.TimeUnit;
Expand All @@ -32,8 +32,11 @@ public class BackoffBuilder {

public BackoffBuilder() {
this.initial = 0;
this.unitInitial = TimeUnit.MILLISECONDS;
this.max = 0;
this.unitMax = TimeUnit.MILLISECONDS;
this.mandatoryStop = 0;
this.unitMandatoryStop = TimeUnit.MILLISECONDS;
this.clock = Clock.systemDefaultZone();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;
package org.apache.pulsar.common.util;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.util.FutureUtil;
Expand All @@ -52,21 +52,21 @@ class LockManagerImpl<T> implements LockManager<T> {
private final MetadataCache<T> cache;
private final MetadataSerde<T> serde;
private final FutureUtil.Sequencer<Void> sequencer;
private final ExecutorService executor;
private final ScheduledExecutorService executor;

private enum State {
Ready, Closed
}

private State state = State.Ready;

LockManagerImpl(MetadataStoreExtended store, Class<T> clazz, ExecutorService executor) {
LockManagerImpl(MetadataStoreExtended store, Class<T> clazz, ScheduledExecutorService executor) {
this(store, new JSONMetadataSerdeSimpleType<>(
TypeFactory.defaultInstance().constructSimpleType(clazz, null)),
executor);
}

LockManagerImpl(MetadataStoreExtended store, MetadataSerde<T> serde, ExecutorService executor) {
LockManagerImpl(MetadataStoreExtended store, MetadataSerde<T> serde, ScheduledExecutorService executor) {
this.store = store;
this.cache = store.getMetadataCache(serde);
this.serde = serde;
Expand All @@ -83,7 +83,7 @@ public CompletableFuture<Optional<T>> readLock(String path) {

@Override
public CompletableFuture<ResourceLock<T>> acquireLock(String path, T value) {
ResourceLockImpl<T> lock = new ResourceLockImpl<>(store, serde, path);
ResourceLockImpl<T> lock = new ResourceLockImpl<>(store, serde, path, executor);

CompletableFuture<ResourceLock<T>> result = new CompletableFuture<>();
lock.acquire(value).thenRun(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@
import java.util.EnumSet;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataSerde;
Expand All @@ -44,7 +49,10 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
private long version;
private final CompletableFuture<Void> expiredFuture;
private boolean revalidateAfterReconnection = false;
private final Backoff backoff;
private final FutureUtil.Sequencer<Void> sequencer;
private final ScheduledExecutorService executor;
private ScheduledFuture<?> revalidateTask;

private enum State {
Init,
Expand All @@ -55,14 +63,20 @@ private enum State {

private State state;

public ResourceLockImpl(MetadataStoreExtended store, MetadataSerde<T> serde, String path) {
ResourceLockImpl(MetadataStoreExtended store, MetadataSerde<T> serde, String path,
ScheduledExecutorService executor) {
this.store = store;
this.serde = serde;
this.path = path;
this.version = -1;
this.expiredFuture = new CompletableFuture<>();
this.sequencer = FutureUtil.Sequencer.create();
this.state = State.Init;
this.executor = executor;
this.backoff = new BackoffBuilder()
.setInitialTime(100, TimeUnit.MILLISECONDS)
.setMax(60, TimeUnit.SECONDS)
.create();
}

@Override
Expand Down Expand Up @@ -93,6 +107,10 @@ public synchronized CompletableFuture<Void> release() {
}

state = State.Releasing;
if (revalidateTask != null) {
revalidateTask.cancel(true);
}

CompletableFuture<Void> result = new CompletableFuture<>();

store.delete(path, Optional.of(version))
Expand Down Expand Up @@ -210,8 +228,15 @@ synchronized CompletableFuture<Void> revalidateIfNeededAfterReconnection() {
* This method is thread-safe and it will perform multiple re-validation operations in turn.
*/
synchronized CompletableFuture<Void> silentRevalidateOnce() {
if (state != State.Valid) {
return CompletableFuture.completedFuture(null);
}

return sequencer.sequential(() -> revalidate(value))
.thenRun(() -> log.info("Successfully revalidated the lock on {}", path))
.thenRun(() -> {
log.info("Successfully revalidated the lock on {}", path);
backoff.reset();
})
.exceptionally(ex -> {
synchronized (ResourceLockImpl.this) {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
Expand All @@ -225,8 +250,12 @@ synchronized CompletableFuture<Void> silentRevalidateOnce() {
// Continue assuming we hold the lock, until we can revalidate it, either
// on Reconnected or SessionReestablished events.
revalidateAfterReconnection = true;
log.warn("Failed to revalidate the lock at {}. Retrying later on reconnection {}", path,
realCause.getMessage());

long delayMillis = backoff.next();
log.warn("Failed to revalidate the lock at {}: {} - Retrying in {} seconds", path,
realCause.getMessage(), delayMillis / 1000.0);
revalidateTask =
executor.schedule(this::silentRevalidateOnce, delayMillis, TimeUnit.MILLISECONDS);
}
}
return null;
Expand Down
Loading
Loading