Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] The topic might reference a closed ledger #22860

Merged
merged 4 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1920,6 +1920,11 @@ protected BrokerService newBrokerService(PulsarService pulsar) throws Exception
return new BrokerService(pulsar, ioEventLoopGroup);
}

@VisibleForTesting
public void setTransactionExecutorProvider(TransactionBufferProvider transactionBufferProvider) {
this.transactionBufferProvider = transactionBufferProvider;
}

private CompactionServiceFactory loadCompactionServiceFactory() {
String compactionServiceFactoryClassName = config.getCompactionServiceFactoryClassName();
var compactionServiceFactory =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1001,35 +1001,27 @@ public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean c
return getTopic(TopicName.get(topic), createIfMissing, properties);
}

/**
* Retrieves or creates a topic based on the specified parameters.
* 0. If topic future exists in the cache returned directly regardless of whether it fails or timeout.
* 1. If the topic metadata exists, the topic is created regardless of {@code createIfMissing}.
* 2. If the topic metadata not exists, and {@code createIfMissing} is false,
* returns an empty Optional in a CompletableFuture.
* 3. Otherwise, use computeIfAbsent. It returns the existing topic or creates and adds a new topicFuture.
* Any exceptions will remove the topicFuture from the map.
*
* @param topicName The name of the topic, potentially including partition information.
* @param createIfMissing If true, creates the topic if it does not exist.
* @param properties Topic configuration properties used during creation.
* @return CompletableFuture with an Optional of the topic if found or created, otherwise empty.
*/
public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, boolean createIfMissing,
Map<String, String> properties) {
try {
CompletableFuture<Optional<Topic>> topicFuture = topics.get(topicName.toString());
if (topicFuture != null) {
if (topicFuture.isCompletedExceptionally()
|| (topicFuture.isDone() && !topicFuture.getNow(Optional.empty()).isPresent())) {
// Exceptional topics should be recreated.
topics.remove(topicName.toString(), topicFuture);
} else {
// a non-existing topic in the cache shouldn't prevent creating a topic
if (createIfMissing) {
if (topicFuture.isDone() && topicFuture.getNow(Optional.empty()).isPresent()) {
return topicFuture;
} else {
return topicFuture.thenCompose(value -> {
if (!value.isPresent()) {
// retry and create topic
return getTopic(topicName, createIfMissing, properties);
} else {
// in-progress future completed successfully
return CompletableFuture.completedFuture(value);
}
});
}
} else {
return topicFuture;
}
}
// If topic future exists in the cache returned directly regardless of whether it fails or timeout.
CompletableFuture<Optional<Topic>> tp = topics.get(topicName.toString());
if (tp != null) {
return tp;
}
final boolean isPersistentTopic = topicName.getDomain().equals(TopicDomain.persistent);
if (isPersistentTopic) {
Expand Down Expand Up @@ -1062,6 +1054,7 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
final String errorMsg =
String.format("Illegal topic partition name %s with max allowed "
+ "%d partitions", topicName, metadata.partitions);
pulsar.getExecutor().execute(() -> topics.remove(topicName.toString()));
log.warn(errorMsg);
return FutureUtil.failedFuture(
new BrokerServiceException.NotAllowedException(errorMsg));
Expand All @@ -1072,19 +1065,23 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
if (!optionalTopic.isPresent() && createIfMissing) {
shibd marked this conversation as resolved.
Show resolved Hide resolved
log.warn("[{}] Try to recreate the topic with createIfMissing=true "
+ "but the returned topic is empty", topicName);
// Before retry create topic, need remove it from topics.
topics.remove(topicName.toString());
return getTopic(topicName, createIfMissing, properties);
}
return CompletableFuture.completedFuture(optionalTopic);
});
});
});
} else {
return topics.computeIfAbsent(topicName.toString(), (name) -> {
if (!topics.containsKey(topicName.toString())) {
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.BEFORE);
if (topicName.isPartitioned()) {
final TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName());
return this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata) -> {
if (topicName.getPartitionIndex() < metadata.partitions) {
}
if (topicName.isPartitioned()) {
final TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName());
return this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata) -> {
if (topicName.getPartitionIndex() < metadata.partitions) {
return topics.computeIfAbsent(topicName.toString(), (name) -> {
topicEventsDispatcher
.notify(topicName.toString(), TopicEvent.CREATE, EventStage.BEFORE);

Expand All @@ -1095,11 +1092,13 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
topicEventsDispatcher
.notifyOnCompletion(eventFuture, topicName.toString(), TopicEvent.LOAD);
return res;
}
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE);
return CompletableFuture.completedFuture(Optional.empty());
});
} else if (createIfMissing) {
});
}
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE);
return CompletableFuture.completedFuture(Optional.empty());
});
} else if (createIfMissing) {
return topics.computeIfAbsent(topicName.toString(), (name) -> {
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.CREATE, EventStage.BEFORE);

CompletableFuture<Optional<Topic>> res = createNonPersistentTopic(name);
Expand All @@ -1109,11 +1108,15 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
topicEventsDispatcher
.notifyOnCompletion(eventFuture, topicName.toString(), TopicEvent.LOAD);
return res;
} else {
});
} else {
CompletableFuture<Optional<Topic>> topicFuture = topics.get(topicName.toString());
if (topicFuture == null) {
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE);
return CompletableFuture.completedFuture(Optional.empty());
topicFuture = CompletableFuture.completedFuture(Optional.empty());
}
});
return topicFuture;
}
}
} catch (IllegalArgumentException e) {
log.warn("[{}] Illegalargument exception when loading topic", topicName, e);
Expand Down Expand Up @@ -1258,6 +1261,7 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load non-persistent topic {}", topic);
}
pulsar.getExecutor().execute(() -> topics.remove(topic));
return FutureUtil.failedFuture(
new NotAllowedException("Broker is not unable to load non-persistent topic"));
}
Expand All @@ -1267,6 +1271,7 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class);
} catch (Throwable e) {
log.warn("Failed to create topic {}", topic, e);
pulsar.getExecutor().execute(() -> topics.remove(topic));
topicFuture.completeExceptionally(e);
return topicFuture;
}
Expand All @@ -1283,7 +1288,7 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
}).exceptionally(ex -> {
log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex.getCause());
nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> {
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
pulsar.getExecutor().execute(() -> topics.remove(topic));
shibd marked this conversation as resolved.
Show resolved Hide resolved
topicFuture.completeExceptionally(ex);
});
return null;
Expand All @@ -1299,7 +1304,7 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
topicFuture.complete(Optional.of(nonPersistentTopic));
// after get metadata return success, we should delete this topic from this broker, because this topic not
// owner by this broker and it don't initialize and checkReplication
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
pulsar.getExecutor().execute(() -> topics.remove(topic));
return null;
});

Expand Down Expand Up @@ -1538,6 +1543,7 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load persistent topic {}", topic);
}
pulsar.getExecutor().execute(() -> topics.remove(topic));
topicFuture.completeExceptionally(new NotAllowedException(
"Broker is unable to load persistent topic"));
return topicFuture;
Expand All @@ -1556,6 +1562,7 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
// do not recreate topic if topic is already migrated and deleted by broker
// so, avoid creating a new topic if migration is already started
if (ex != null && (ex.getCause() instanceof TopicMigratedException)) {
pulsar.getExecutor().execute(() -> topics.remove(topic));
topicFuture.completeExceptionally(ex.getCause());
return null;
}
Expand All @@ -1570,6 +1577,7 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
}
}
}).exceptionally(ex -> {
pulsar.getExecutor().execute(() -> topics.remove(topic));
shibd marked this conversation as resolved.
Show resolved Hide resolved
topicFuture.completeExceptionally(ex.getCause());
return null;
});
Expand Down Expand Up @@ -1623,15 +1631,15 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean
finalProperties, topicPolicies)
).exceptionally(throwable -> {
log.warn("[{}] Read topic property failed", topic, throwable);
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
pulsar.getExecutor().execute(() -> topics.remove(topic));
topicFuture.completeExceptionally(throwable);
return null;
});
} else {
// namespace is being unloaded
String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic);
log.warn(msg);
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
pulsar.getExecutor().execute(() -> topics.remove(topic));
topicFuture.completeExceptionally(new ServiceUnitNotReadyException(msg));
}
}).exceptionally(ex -> {
Expand Down Expand Up @@ -1662,7 +1670,7 @@ private void createPersistentTopic(final String topic, boolean createIfMissing,
if (isTransactionInternalName(topicName)) {
String msg = String.format("Can not create transaction system topic %s", topic);
log.warn(msg);
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
pulsar.getExecutor().execute(() -> topics.remove(topic));
topicFuture.completeExceptionally(new NotAllowedException(msg));
return;
}
Expand Down Expand Up @@ -1744,6 +1752,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+ " topic", topic, FutureUtil.getException(topicFuture));
executor().submit(() -> {
persistentTopic.close().whenComplete((ignore, ex) -> {
topics.remove(topic);
if (ex != null) {
log.warn("[{}] Get an error when closing topic.",
topic, ex);
Expand All @@ -1760,6 +1769,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+ " Removing topic from topics list {}, {}", topic, ex);
executor().submit(() -> {
persistentTopic.close().whenComplete((ignore, closeEx) -> {
topics.remove(topic);
if (closeEx != null) {
log.warn("[{}] Get an error when closing topic.",
topic, closeEx);
Expand All @@ -1771,7 +1781,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
});
} catch (PulsarServerException e) {
log.warn("Failed to create topic {}: {}", topic, e.getMessage());
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
pulsar.getExecutor().execute(() -> topics.remove(topic));
topicFuture.completeExceptionally(e);
}
}
Expand All @@ -1784,7 +1794,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
topicFuture.complete(Optional.empty());
} else {
log.warn("Failed to create topic {}", topic, exception);
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
pulsar.getExecutor().execute(() -> topics.remove(topic));
topicFuture.completeExceptionally(new PersistenceException(exception));
}
}
Expand All @@ -1794,7 +1804,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
log.warn("[{}] Failed to get topic configuration: {}", topic, exception.getMessage(), exception);
// remove topic from topics-map in different thread to avoid possible deadlock if
// createPersistentTopic-thread only tries to handle this future-result
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
pulsar.getExecutor().execute(() -> topics.remove(topic));
topicFuture.completeExceptionally(exception);
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@
package org.apache.pulsar.broker.service;

import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -1434,13 +1432,6 @@ public void testCleanupTopic() throws Exception {
// Ok
}

final CompletableFuture<Optional<Topic>> timedOutTopicFuture = topicFuture;
// timeout topic future should be removed from cache
shibd marked this conversation as resolved.
Show resolved Hide resolved
retryStrategically((test) -> pulsar1.getBrokerService().getTopic(topicName, false) != timedOutTopicFuture, 5,
1000);

assertNotEquals(timedOutTopicFuture, pulsar1.getBrokerService().getTopics().get(topicName));

try {
Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
.subscriptionName("my-subscriber-name").subscribeAsync().get(100, TimeUnit.MILLISECONDS);
Expand All @@ -1452,6 +1443,7 @@ public void testCleanupTopic() throws Exception {
ManagedLedgerImpl ml = (ManagedLedgerImpl) mlFactory.open(topicMlName + "-2");
mlFuture.complete(ml);

// Re-create topic will success.
Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
.subscriptionType(SubscriptionType.Shared).subscribeAsync()
.get(2 * topicLoadTimeoutSeconds, TimeUnit.SECONDS);
Expand Down
Loading
Loading