Skip to content

Commit

Permalink
[fix][broker] The topic might reference a closed ledger (apache#22860)
Browse files Browse the repository at this point in the history
(cherry picked from commit a91a172)
Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
liangyepianzhou authored and nodece committed Sep 4, 2024
1 parent 69d5fae commit 9ed3e28
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1757,4 +1757,9 @@ public static boolean isTransactionInternalName(TopicName topicName) {
protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
return new BrokerService(pulsar, ioEventLoopGroup);
}

@VisibleForTesting
public void setTransactionExecutorProvider(TransactionBufferProvider transactionBufferProvider) {
this.transactionBufferProvider = transactionBufferProvider;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -971,65 +971,97 @@ public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean c

public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean createIfMissing,
Map<String, String> properties) {
return getTopic(TopicName.get(topic), createIfMissing, properties);
}

/**
* Retrieves or creates a topic based on the specified parameters.
* 0. If disable PersistentTopics or NonPersistentTopics, it will return a failed future with NotAllowedException.
* 1. If topic future exists in the cache returned directly regardless of whether it fails or timeout.
* 2. If the topic metadata exists, the topic is created regardless of {@code createIfMissing}.
* 3. If the topic metadata not exists, and {@code createIfMissing} is false,
* returns an empty Optional in a CompletableFuture. And this empty future not be added to the map.
* 4. Otherwise, use computeIfAbsent. It returns the existing topic or creates and adds a new topicFuture.
* Any exceptions will remove the topicFuture from the map.
*
* @param topicName The name of the topic, potentially including partition information.
* @param createIfMissing If true, creates the topic if it does not exist.
* @param properties Topic configuration properties used during creation.
* @return CompletableFuture with an Optional of the topic if found or created, otherwise empty.
*/
public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, boolean createIfMissing,
Map<String, String> properties) {
try {
CompletableFuture<Optional<Topic>> topicFuture = topics.get(topic);
if (topicFuture != null) {
if (topicFuture.isCompletedExceptionally()
|| (topicFuture.isDone() && !topicFuture.getNow(Optional.empty()).isPresent())) {
// Exceptional topics should be recreated.
topics.remove(topic, topicFuture);
} else {
// a non-existing topic in the cache shouldn't prevent creating a topic
if (createIfMissing) {
if (topicFuture.isDone() && topicFuture.getNow(Optional.empty()).isPresent()) {
return topicFuture;
} else {
return topicFuture.thenCompose(value -> {
if (!value.isPresent()) {
// retry and create topic
return getTopic(topic, createIfMissing, properties);
} else {
// in-progress future completed successfully
return CompletableFuture.completedFuture(value);
}
});
}
} else {
return topicFuture;
}
}
// If topic future exists in the cache returned directly regardless of whether it fails or timeout.
CompletableFuture<Optional<Topic>> tp = topics.get(topicName.toString());
if (tp != null) {
return tp;
}
final TopicName topicName = TopicName.get(topic);
final boolean isPersistentTopic = topicName.getDomain().equals(TopicDomain.persistent);
if (isPersistentTopic) {
final CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture =
getTopicPoliciesBypassSystemTopic(topicName);
return topicPoliciesFuture.exceptionally(ex -> {
final Throwable rc = FutureUtil.unwrapCompletionException(ex);
final String errorInfo = String.format("Topic creation encountered an exception by initialize"
+ " topic policies service. topic_name=%s error_message=%s", topicName, rc.getMessage());
log.error(errorInfo, rc);
throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo));
}).thenCompose(optionalTopicPolicies -> {
final TopicPolicies topicPolicies = optionalTopicPolicies.orElse(null);
return topics.computeIfAbsent(topicName.toString(), (tpName) -> {
return loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies);
}).thenCompose(optionalTopic -> {
if (!optionalTopic.isPresent() && createIfMissing) {
log.warn("[{}] Try to recreate the topic with createIfMissing=true "
+ "but the returned topic is empty", topicName);
return getTopic(topic, createIfMissing, properties);
if (!pulsar.getConfiguration().isEnablePersistentTopics()) {
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load persistent topic {}", topicName);
}
return FutureUtil.failedFuture(new NotAllowedException(
"Broker is unable to load persistent topic"));
}
return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topicName)
.thenCompose(exists -> {
if (!exists && !createIfMissing) {
return CompletableFuture.completedFuture(Optional.empty());
}
return getTopicPoliciesBypassSystemTopic(topicName).exceptionally(ex -> {
final Throwable rc = FutureUtil.unwrapCompletionException(ex);
final String errorInfo = String.format("Topic creation encountered an exception by initialize"
+ " topic policies service. topic_name=%s error_message=%s", topicName,
rc.getMessage());
log.error(errorInfo, rc);
throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo));
}).thenCompose(optionalTopicPolicies -> {
final TopicPolicies topicPolicies = optionalTopicPolicies.orElse(null);
if (topicName.isPartitioned()) {
final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName());
return fetchPartitionedTopicMetadataAsync(topicNameEntity)
.thenCompose((metadata) -> {
// Allow crate non-partitioned persistent topic that name includes
// `partition`
if (metadata.partitions == 0
|| topicName.getPartitionIndex() < metadata.partitions) {
return topics.computeIfAbsent(topicName.toString(), (tpName) ->
loadOrCreatePersistentTopic(tpName,
createIfMissing, properties, topicPolicies));
} else {
final String errorMsg =
String.format("Illegal topic partition name %s with max allowed "
+ "%d partitions", topicName, metadata.partitions);
log.warn(errorMsg);
return FutureUtil.failedFuture(
new BrokerServiceException.NotAllowedException(errorMsg));
}
});
} else {
return topics.computeIfAbsent(topicName.toString(), (tpName) ->
loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies));
}
return CompletableFuture.completedFuture(optionalTopic);
});
});
} else {
return topics.computeIfAbsent(topic, (name) -> {
if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load non-persistent topic {}", topicName);
}
return FutureUtil.failedFuture(new NotAllowedException(
"Broker is unable to load persistent topic"));
}
if (!topics.containsKey(topicName.toString())) {
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.BEFORE);
if (topicName.isPartitioned()) {
final TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName());
return this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata) -> {
if (topicName.getPartitionIndex() < metadata.partitions) {
}
if (topicName.isPartitioned()) {
final TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName());
return this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata) -> {
if (topicName.getPartitionIndex() < metadata.partitions) {
return topics.computeIfAbsent(topicName.toString(), (name) -> {
topicEventsDispatcher
.notify(topicName.toString(), TopicEvent.CREATE, EventStage.BEFORE);

Expand All @@ -1040,11 +1072,13 @@ public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean c
topicEventsDispatcher
.notifyOnCompletion(eventFuture, topicName.toString(), TopicEvent.LOAD);
return res;
}
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE);
return CompletableFuture.completedFuture(Optional.empty());
});
} else if (createIfMissing) {
});
}
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE);
return CompletableFuture.completedFuture(Optional.empty());
});
} else if (createIfMissing) {
return topics.computeIfAbsent(topicName.toString(), (name) -> {
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.CREATE, EventStage.BEFORE);

CompletableFuture<Optional<Topic>> res = createNonPersistentTopic(name);
Expand All @@ -1054,21 +1088,25 @@ public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean c
topicEventsDispatcher
.notifyOnCompletion(eventFuture, topicName.toString(), TopicEvent.LOAD);
return res;
} else {
});
} else {
CompletableFuture<Optional<Topic>> topicFuture = topics.get(topicName.toString());
if (topicFuture == null) {
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE);
return CompletableFuture.completedFuture(Optional.empty());
topicFuture = CompletableFuture.completedFuture(Optional.empty());
}
});
return topicFuture;
}
}
} catch (IllegalArgumentException e) {
log.warn("[{}] Illegalargument exception when loading topic", topic, e);
log.warn("[{}] Illegalargument exception when loading topic", topicName, e);
return FutureUtil.failedFuture(e);
} catch (RuntimeException e) {
Throwable cause = e.getCause();
if (cause instanceof ServiceUnitNotReadyException) {
log.warn("[{}] Service unit is not ready when loading the topic", topic);
log.warn("[{}] Service unit is not ready when loading the topic", topicName);
} else {
log.warn("[{}] Unexpected exception when loading topic: {}", topic, e.getMessage(), e);
log.warn("[{}] Unexpected exception when loading topic: {}", topicName, e.getMessage(), e);
}

return FutureUtil.failedFuture(cause);
Expand Down Expand Up @@ -1226,16 +1264,10 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
topicFuture.exceptionally(t -> {
pulsarStats.recordTopicLoadFailed();
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
return null;
});
if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load non-persistent topic {}", topic);
}
topicFuture.completeExceptionally(
new NotAllowedException("Broker is not unable to load non-persistent topic"));
return topicFuture;
}

final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, this);
CompletableFuture<Void> isOwner = checkTopicNsOwnership(topic);
Expand Down Expand Up @@ -1500,6 +1532,7 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
}
}
}).exceptionally(ex -> {
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
topicFuture.completeExceptionally(ex.getCause());
return null;
});
Expand Down Expand Up @@ -1623,6 +1656,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+ " topic", topic, FutureUtil.getException(topicFuture));
executor().submit(() -> {
persistentTopic.close().whenComplete((ignore, ex) -> {
topics.remove(topic, topicFuture);
if (ex != null) {
log.warn("[{}] Get an error when closing topic.",
topic, ex);
Expand All @@ -1639,6 +1673,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+ " Removing topic from topics list {}, {}", topic, ex);
executor().submit(() -> {
persistentTopic.close().whenComplete((ignore, closeEx) -> {
topics.remove(topic, topicFuture);
if (closeEx != null) {
log.warn("[{}] Get an error when closing topic.",
topic, closeEx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
this.transactionBuffer = brokerService.getPulsar()
.getTransactionBufferProvider().newTransactionBuffer(this);
} else {
this.transactionBuffer = new TransactionBufferDisable();
this.transactionBuffer = new TransactionBufferDisable(this);
}
transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry());
}
Expand Down Expand Up @@ -376,7 +376,7 @@ public CompletableFuture<Void> initialize() {
this.transactionBuffer = brokerService.getPulsar()
.getTransactionBufferProvider().newTransactionBuffer(this);
} else {
this.transactionBuffer = new TransactionBufferDisable();
this.transactionBuffer = new TransactionBufferDisable(this);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
Expand All @@ -39,6 +40,13 @@
@Slf4j
public class TransactionBufferDisable implements TransactionBuffer {

private final Topic topic;

// Introduced by #21466
public TransactionBufferDisable(Topic topic) {
this.topic = topic;
}

@Override
public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
return CompletableFuture.completedFuture(null);
Expand Down
Loading

0 comments on commit 9ed3e28

Please sign in to comment.