Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] The topic might reference a closed ledger #22860

Merged
merged 4 commits into from
Jun 13, 2024

Conversation

shibd
Copy link
Member

@shibd shibd commented Jun 6, 2024

Motivation

We observe that a normal topic might reference a closed ledger and it never auto recover. will cause the producer and customer stuck.

The root cause is related to topic create timeout. Assuming we have two concurrent requests to create a topic: firstCreate(Thread-1), secondCreate(Thread-2)

Thread-1 Thread-2
firstCreate topic timeout
remove this old topic and recreate a new topic
Open ledger from cache, and get an old ledger that create by firstCreate topic
call topic.close
old ledger close and remove it from cache
but this old ledger being referenced to new topic and that stats is close
  • When the firstCreate topic timeout, will call topic.close. it will close the ledger, and remove it from the ledger cache.

    executor().submit(() -> {
    persistentTopic.close().whenComplete((ignore, ex) -> {
    if (ex != null) {
    log.warn("[{}] Get an error when closing topic.",
    topic, ex);
    }
    });
    });

  • If the secondCreate request successfully creates a topic before the old ledger closes, the reference will be made to the old ledger.

Modifications

Refactor BrokerService#getTopic() method:

  • If any exceptions occur during the creation topics process, they should be proactively and asynchronously removed from the topics: pulsar.getExecutor().execute(() -> topics.remove(topicName.toString()));
  • The new get topic operation should not remove the topicFuture that created by previous get topic operation. If the previous topicFuture is not removed from the map yet, the broker should always use the existing topicFuture (waiting for completion or return an error to the client side directly)
  • Add documentation to clearly define the method boundaries
  • During the process of creating a topic, all the remove methods should not include topicFuture because the actual future object placed in the map might not be the same as the topicFuture. You can check this code to validation it.

Verifying this change

  • Add testCloseLedgerThatTopicAfterCreateTimeout to cover this case.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: shibd#37

@shibd shibd added type/bug The PR fixed a bug or issue reported a bug ready-to-test release/3.3.1 release/3.0.6 labels Jun 6, 2024
@shibd shibd added this to the 3.4.0 milestone Jun 6, 2024
@shibd shibd self-assigned this Jun 6, 2024
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jun 6, 2024
@liangyepianzhou liangyepianzhou self-requested a review June 9, 2024 08:06
@shibd shibd requested a review from codelipenghui June 12, 2024 01:28
@codecov-commenter
Copy link

Codecov Report

Attention: Patch coverage is 87.80488% with 5 lines in your changes missing coverage. Please review.

Project coverage is 73.27%. Comparing base (bbc6224) to head (b3d598b).
Report is 374 commits behind head on master.

Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #22860      +/-   ##
============================================
- Coverage     73.57%   73.27%   -0.30%     
- Complexity    32624    32708      +84     
============================================
  Files          1877     1891      +14     
  Lines        139502   141982    +2480     
  Branches      15299    15571     +272     
============================================
+ Hits         102638   104040    +1402     
- Misses        28908    29931    +1023     
- Partials       7956     8011      +55     
Flag Coverage Δ
inttests 27.27% <68.29%> (+2.69%) ⬆️
systests 24.66% <21.95%> (+0.34%) ⬆️
unittests 72.31% <87.80%> (-0.54%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files Coverage Δ
...n/java/org/apache/pulsar/broker/PulsarService.java 83.91% <100.00%> (+1.54%) ⬆️
...rg/apache/pulsar/broker/service/BrokerService.java 81.65% <86.84%> (+0.87%) ⬆️

... and 404 files with indirect coverage changes

@codelipenghui codelipenghui requested a review from lhotari June 12, 2024 13:01
@shibd shibd force-pushed the optimize_ctopic_timeout branch from b3d598b to 40ce065 Compare June 13, 2024 06:36
@shibd shibd requested a review from codelipenghui June 13, 2024 07:38
@shibd shibd requested a review from poorbarcode June 13, 2024 07:38
@shibd shibd merged commit a91a172 into apache:master Jun 13, 2024
48 of 51 checks passed
shibd added a commit to shibd/pulsar that referenced this pull request Jun 13, 2024
shibd added a commit that referenced this pull request Jun 13, 2024
nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Jun 17, 2024
nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Jun 25, 2024
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 1, 2024
@TakaHiR07
Copy link
Contributor

@shibd I have a concern about this pr.

After this pr, the process is "topicFuture timeout -> close topic -> remove topicFuture". During the "close topic", client can acquire the timeout response immediately and retry. That means, if topicFuture timeout, client may generate a large amount of retry requests to server. Is seems not very good.

I have test and see client retry frequently in log

17:53:05.357 [pulsar-io-32-30] INFO org.apache.pulsar.broker.service.ServerCnx - [[id: 0xce950df8, L:/ip1:6651 - R:/ip2:50384]] Subscribing on topic persistent://test/test/test-partition-0 / test_304_pulsar. consumerId: 43
17:53:05.357 [pulsar-io-32-30] WARN org.apache.pulsar.broker.service.ServerCnx - [/ip2:50384][persistent://test/test/test-partition-0][test_304_pulsar] Failed to create consumer: consumerId=43, Failed to load topic within timeout

17:53:05.472 [pulsar-io-32-30] INFO org.apache.pulsar.broker.service.ServerCnx - [[id: 0xce950df8, L:/ip1:6651 - R:/ip2:50384]] Subscribing on topic persistent://test/test/test-partition-0 / test_304_pulsar. consumerId: 43
17:53:05.472 [pulsar-io-32-30] WARN org.apache.pulsar.broker.service.ServerCnx - [/ip2:50384][persistent://test/test/test-partition-0][test_304_pulsar] Failed to create consumer: consumerId=43, Failed to load topic within timeout

17:53:05.592 [pulsar-io-32-30] INFO org.apache.pulsar.broker.service.ServerCnx - [[id: 0xce950df8, L:/ip1:6651 - R:/ip2:50384]] Subscribing on topic persistent://test/test/test-partition-0 / test_304_pulsar. consumerId: 43
17:53:05.592 [pulsar-io-32-30] WARN org.apache.pulsar.broker.service.ServerCnx - [/ip2:50384][persistent://test/test/test-partition-0][test_304_pulsar] Failed to create consumer: consumerId=43, Failed to load topic within timeout

@shibd
Copy link
Member Author

shibd commented Jul 5, 2024

client can acquire the timeout response immediately and retry.

Yes, you are right. Maybe we need improve client, when get a timoutexception, don't be so aggressive in retrying.

or, we should completely refactor the broker's behavior in topic load:

  1. When the broker loads a topic and times it out, it should not complete the future to the client before closing all resources.

Anyway, the current solution will not leave the topic in an unusable state. (It is link with a closed ledger.)

WHDYT?

@TakaHiR07
Copy link
Contributor

  1. When the broker loads a topic and times it out, it should not complete the future to the client before closing all resources.

This way seems better.

@poorbarcode
Copy link
Contributor

@shibd

Could you cherry-pick this PR into branch-2.11?

@shibd
Copy link
Member Author

shibd commented Jul 19, 2024

hi, @poorbarcode

#22739

For branch-2.11, I think we can use this fix, It is a simple solution. Do you agree?

@TakaHiR07
Copy link
Contributor

TakaHiR07 commented Jul 26, 2024

@shibd I have another question. Is there exist the situation: broker's memory usage is high, persistentTopic object is be GC between the persistentTopic object create and add object to reference map ? And then result in topicFuture permanently not finish ?

// Once we have the configuration, we can proceed with the async open operation
managedLedgerFactory.asyncOpen(topicName.getPersistenceNamingEncoding(), managedLedgerConfig,
new OpenLedgerCallback() {
@Override
public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
try {
PersistentTopic persistentTopic = isSystemTopic(topic)
? new SystemTopic(topic, ledger, BrokerService.this)
: newTopic(topic, ledger, BrokerService.this, PersistentTopic.class);
persistentTopic
.initialize()
.thenCompose(__ -> persistentTopic.preCreateSubscriptionForCompactionIfNeeded())
.thenCompose(__ -> persistentTopic.checkReplication())
.thenCompose(v -> {
// Also check dedup status
return persistentTopic.checkDeduplicationStatus();
})
.thenRun(() -> {
log.info("Created topic {} - dedup is {}", topic,
persistentTopic.isDeduplicationEnabled() ? "enabled" : "disabled");
long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
- topicCreateTimeMs;
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
if (topicFuture.isCompletedExceptionally()) {
// Check create persistent topic timeout.
log.warn("{} future is already completed with failure {}, closing the"
+ " topic", topic, FutureUtil.getException(topicFuture));
executor().submit(() -> {
persistentTopic.close().whenComplete((ignore, ex) -> {
topics.remove(topic, topicFuture);
if (ex != null) {
log.warn("[{}] Get an error when closing topic.",
topic, ex);
}
});
});
} else {
addTopicToStatsMaps(topicName, persistentTopic);
topicFuture.complete(Optional.of(persistentTopic));
}
})

lhotari pushed a commit that referenced this pull request Jul 29, 2024
nodece pushed a commit to ascentstream/pulsar that referenced this pull request Sep 4, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants