Skip to content

Commit

Permalink
[fix][broker] Handle BucketDelayedDeliveryTracker recover failed (#22735
Browse files Browse the repository at this point in the history
)
  • Loading branch information
dao-jun authored Jul 24, 2024
1 parent c9c5bb4 commit 1c53841
Show file tree
Hide file tree
Showing 9 changed files with 420 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.delayed;

import com.google.common.annotations.VisibleForTesting;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
Expand All @@ -33,10 +34,15 @@
import org.apache.pulsar.broker.delayed.bucket.BookkeeperBucketSnapshotStorage;
import org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage;
import org.apache.pulsar.broker.delayed.bucket.RecoverDelayedDeliveryTrackerException;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BucketDelayedDeliveryTrackerFactory implements DelayedDeliveryTrackerFactory {
private static final Logger log = LoggerFactory.getLogger(BucketDelayedDeliveryTrackerFactory.class);

BucketSnapshotStorage bucketSnapshotStorage;

Expand Down Expand Up @@ -73,8 +79,28 @@ public void initialize(PulsarService pulsarService) throws Exception {

@Override
public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
return new BucketDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict,
bucketSnapshotStorage, delayedDeliveryMinIndexCountPerBucket,
String topicName = dispatcher.getTopic().getName();
String subscriptionName = dispatcher.getSubscription().getName();
BrokerService brokerService = dispatcher.getTopic().getBrokerService();
DelayedDeliveryTracker tracker;

try {
tracker = newTracker0(dispatcher);
} catch (RecoverDelayedDeliveryTrackerException ex) {
log.warn("Failed to recover BucketDelayedDeliveryTracker, fallback to InMemoryDelayedDeliveryTracker."
+ " topic {}, subscription {}", topicName, subscriptionName, ex);
// If failed to create BucketDelayedDeliveryTracker, fallback to InMemoryDelayedDeliveryTracker
brokerService.initializeFallbackDelayedDeliveryTrackerFactory();
tracker = brokerService.getFallbackDelayedDeliveryTrackerFactory().newTracker(dispatcher);
}
return tracker;
}

@VisibleForTesting
BucketDelayedDeliveryTracker newTracker0(PersistentDispatcherMultipleConsumers dispatcher)
throws RecoverDelayedDeliveryTrackerException {
return new BucketDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis,
isDelayedDeliveryDeliverAtTimeStrict, bucketSnapshotStorage, delayedDeliveryMinIndexCountPerBucket,
TimeUnit.SECONDS.toMillis(delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds),
delayedDeliveryMaxIndexesPerBucketSnapshotSegment, delayedDeliveryMaxNumBuckets);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,51 @@ public interface DelayedDeliveryTracker extends AutoCloseable {
* Close the subscription tracker and release all resources.
*/
void close();

DelayedDeliveryTracker DISABLE = new DelayedDeliveryTracker() {
@Override
public boolean addMessage(long ledgerId, long entryId, long deliveryAt) {
return false;
}

@Override
public boolean hasMessageAvailable() {
return false;
}

@Override
public long getNumberOfDelayedMessages() {
return 0;
}

@Override
public long getBufferMemoryUsage() {
return 0;
}

@Override
public NavigableSet<PositionImpl> getScheduledMessages(int maxMessages) {
return null;
}

@Override
public boolean shouldPauseAllDeliveries() {
return false;
}

@Override
public void resetTickTime(long tickTime) {

}

@Override
public CompletableFuture<Void> clear() {
return null;
}

@Override
public void close() {

}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@
*/
package org.apache.pulsar.broker.delayed;

import com.google.common.annotations.VisibleForTesting;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InMemoryDelayedDeliveryTrackerFactory implements DelayedDeliveryTrackerFactory {
private static final Logger log = LoggerFactory.getLogger(InMemoryDelayedDeliveryTrackerFactory.class);

private Timer timer;

Expand All @@ -48,6 +52,21 @@ public void initialize(PulsarService pulsarService) {

@Override
public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
String topicName = dispatcher.getTopic().getName();
String subscriptionName = dispatcher.getSubscription().getName();
DelayedDeliveryTracker tracker = DelayedDeliveryTracker.DISABLE;
try {
tracker = newTracker0(dispatcher);
} catch (Exception e) {
// it should never go here
log.warn("Failed to create InMemoryDelayedDeliveryTracker, topic {}, subscription {}",
topicName, subscriptionName, e);
}
return tracker;
}

@VisibleForTesting
InMemoryDelayedDeliveryTracker newTracker0(PersistentDispatcherMultipleConsumers dispatcher) {
return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis,
isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,22 +106,24 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
private CompletableFuture<Void> pendingLoad = null;

public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher,
Timer timer, long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict,
BucketSnapshotStorage bucketSnapshotStorage,
long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis,
int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets) {
Timer timer, long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict,
BucketSnapshotStorage bucketSnapshotStorage,
long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis,
int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets)
throws RecoverDelayedDeliveryTrackerException {
this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict,
bucketSnapshotStorage, minIndexCountPerBucket, timeStepPerBucketSnapshotSegmentInMillis,
maxIndexesPerBucketSnapshotSegment, maxNumBuckets);
}

public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher,
Timer timer, long tickTimeMillis, Clock clock,
boolean isDelayedDeliveryDeliverAtTimeStrict,
BucketSnapshotStorage bucketSnapshotStorage,
long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis,
int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets) {
Timer timer, long tickTimeMillis, Clock clock,
boolean isDelayedDeliveryDeliverAtTimeStrict,
BucketSnapshotStorage bucketSnapshotStorage,
long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis,
int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets)
throws RecoverDelayedDeliveryTrackerException {
super(dispatcher, timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict);
this.minIndexCountPerBucket = minIndexCountPerBucket;
this.timeStepPerBucketSnapshotSegmentInMillis = timeStepPerBucketSnapshotSegmentInMillis;
Expand All @@ -134,10 +136,17 @@ public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispat
new MutableBucket(dispatcher.getName(), dispatcher.getCursor(), FutureUtil.Sequencer.create(),
bucketSnapshotStorage);
this.stats = new BucketDelayedMessageIndexStats();
this.numberDelayedMessages = recoverBucketSnapshot();

// Close the tracker if failed to recover.
try {
this.numberDelayedMessages = recoverBucketSnapshot();
} catch (RecoverDelayedDeliveryTrackerException e) {
close();
throw e;
}
}

private synchronized long recoverBucketSnapshot() throws RuntimeException {
private synchronized long recoverBucketSnapshot() throws RecoverDelayedDeliveryTrackerException {
ManagedCursor cursor = this.lastMutableBucket.getCursor();
Map<String, String> cursorProperties = cursor.getCursorProperties();
if (MapUtils.isEmpty(cursorProperties)) {
Expand Down Expand Up @@ -182,7 +191,7 @@ private synchronized long recoverBucketSnapshot() throws RuntimeException {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new RuntimeException(e);
throw new RecoverDelayedDeliveryTrackerException(e);
}

for (Map.Entry<Range<Long>, CompletableFuture<List<DelayedIndex>>> entry : futures.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.delayed.bucket;

public class RecoverDelayedDeliveryTrackerException extends Exception {
public RecoverDelayedDeliveryTrackerException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import org.apache.pulsar.broker.cache.BundlesQuotas;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerLoader;
import org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTrackerFactory;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
import org.apache.pulsar.broker.loadbalance.LoadManager;
Expand Down Expand Up @@ -296,10 +297,11 @@ public class BrokerService implements Closeable {
private final AtomicBoolean blockedDispatcherOnHighUnackedMsgs = new AtomicBoolean(false);
private final ConcurrentOpenHashSet<PersistentDispatcherMultipleConsumers> blockedDispatchers;
private final ReadWriteLock lock = new ReentrantReadWriteLock();

@Getter
@VisibleForTesting
private final DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory;
// InMemoryDelayedDeliveryTrackerFactory is for the purpose of
// fallback if recover BucketDelayedDeliveryTracker failed.
private volatile DelayedDeliveryTrackerFactory fallbackDelayedDeliveryTrackerFactory;
private final ServerBootstrap defaultServerBootstrap;
private final List<EventLoopGroup> protocolHandlersWorkerGroups = new ArrayList<>();

Expand Down Expand Up @@ -865,6 +867,9 @@ public CompletableFuture<Void> closeAsync() {
pendingLookupOperationsCounter.close();
try {
delayedDeliveryTrackerFactory.close();
if (fallbackDelayedDeliveryTrackerFactory != null) {
fallbackDelayedDeliveryTrackerFactory.close();
}
} catch (Exception e) {
log.warn("Error in closing delayedDeliveryTrackerFactory", e);
}
Expand Down Expand Up @@ -3418,6 +3423,25 @@ public void unblockDispatchersOnUnAckMessages(List<PersistentDispatcherMultipleC
}
}

/**
* Initializes the in-memory delayed delivery tracker factory when
* BucketDelayedDeliveryTrackerFactory.newTracker failed.
*/
public synchronized void initializeFallbackDelayedDeliveryTrackerFactory() {
if (fallbackDelayedDeliveryTrackerFactory != null) {
return;
}

DelayedDeliveryTrackerFactory factory = new InMemoryDelayedDeliveryTrackerFactory();
try {
factory.initialize(pulsar);
this.fallbackDelayedDeliveryTrackerFactory = factory;
} catch (Exception e) {
// it should never go here
log.error("Failed to initialize InMemoryDelayedDeliveryTrackerFactory", e);
}
}

private static class ConfigField {
// field holds the pulsar dynamic configuration.
final Field field;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1165,15 +1165,15 @@ public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata
}

synchronized (this) {
if (!delayedDeliveryTracker.isPresent()) {
if (delayedDeliveryTracker.isEmpty()) {
if (!msgMetadata.hasDeliverAtTime()) {
// No need to initialize the tracker here
return false;
}

// Initialize the tracker the first time we need to use it
delayedDeliveryTracker = Optional
.of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
delayedDeliveryTracker = Optional.of(
topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
}

delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
Expand Down Expand Up @@ -1327,5 +1327,10 @@ protected int getStickyKeyHash(Entry entry) {
return StickyKeyConsumerSelector.makeStickyKeyHash(peekStickyKey(entry.getDataBuffer()));
}


public Subscription getSubscription() {
return subscription;
}

private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class);
}
Loading

0 comments on commit 1c53841

Please sign in to comment.