Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][txn]Handle exceptions in the transaction pending ack init #21274

Merged
merged 9 commits into from
Apr 15, 2024

Conversation

liangyepianzhou
Copy link
Contributor

@liangyepianzhou liangyepianzhou commented Sep 28, 2023

Reopen #20927

Motivation

Transaction Pending ack is a component of persistent subscription. When creating or loading a subscription, If the subscription has a transaction pending ack, the transaction pending ack of this subscription needs to read data from the bookkeeper to recover it.
If the pending ack recovery failed, the subscription could not provide service anymore. And there is no processing logic for recovery failure.

pendingAckStoreProvider.checkInitializedBefore(persistentSubscription)
.thenAcceptAsync(init -> {
if (init) {
initPendingAckStore();
} else {
completeHandleFuture();
}
}, internalPinnedExecutor)
.exceptionallyAsync(e -> {
Throwable t = FutureUtil.unwrapCompletionException(e);
changeToErrorState();
exceptionHandleFuture(t);
this.pendingAckStoreFuture.completeExceptionally(t);
return null;
}, internalPinnedExecutor);
}

2023-08-02T02:21:46,026+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] WARN  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.6:54641][persistent://sn/system/__xxx-partition-0][multiTopicsReader-3ec782c191] Failed to create consumer: consumerId=0, org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /managed-ledgers/sn/system/persistent/__xxx-partition-0-multiTopicsReader-3ec782c191__transaction_pending_ack
java.util.concurrent.CompletionException: org.apache.pulsar.metadata.api.MetadataStoreException: org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /managed-ledgers/sn/system/persistent/__xxx-partition-0-multiTopicsReader-3ec782c191__transaction_pending_ack
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) ~[?:?]
	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1177) ~[?:?]
	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309) ~[?:?]
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:215) ~[io.streamnative-pulsar-broker-2.11.2.2.jar:2.11.2.2]
	at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:536) ~[io.streamnative-pulsar-broker-2.11.2.2.jar:2.11.2.2]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$16(PersistentTopic.java:785) ~[io.streamnative-pulsar-broker-2.11.2.2.jar:2.11.2.2]
	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187) ~[?:?]
	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309) ~[?:?]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$18(PersistentTopic.java:780) ~[io.streamnative-pulsar-broker-2.11.2.2.jar:2.11.2.2]
	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187) ~[?:?]
	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309) ~[?:?]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:701) ~[io.streamnative-pulsar-broker-2.11.2.2.jar:2.11.2.2]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:677) ~[io.streamnative-pulsar-broker-2.11.2.2.jar:2.11.2.2]
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$11(ServerCnx.java:1135) ~[io.streamnative-pulsar-broker-2.11.2.2.jar:2.11.2.2]
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?]
	at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage$Functions.lambda$getLedgerEntry$0(BookkeeperSchemaStorage.java:655) ~[io.streamnative-pulsar-broker-2.11.2.2.jar:2.11.2.2]
	at org.apache.bookkeeper.client.LedgerHandle$6.onSuccess(LedgerHandle.java:809) ~[org.apache.bookkeeper-bookkeeper-server-4.15.4.jar:4.15.4]
	at org.apache.bookkeeper.client.LedgerHandle$6.onSuccess(LedgerHandle.java:806) ~[org.apache.bookkeeper-bookkeeper-server-4.15.4.jar:4.15.4]
	at org.apache.bookkeeper.common.concurrent.FutureEventListener.accept(FutureEventListener.java:42) ~[org.apache.bookkeeper-bookkeeper-common-4.15.4.jar:4.15.4]
	at org.apache.bookkeeper.common.concurrent.FutureEventListener.accept(FutureEventListener.java:26) ~[org.apache.bookkeeper-bookkeeper-common-4.15.4.jar:4.15.4]
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.93.Final.jar:4.1.93.Final]
	at java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: org.apache.pulsar.metadata.api.MetadataStoreException: org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /managed-ledgers/sn/system/persistent/__xxx-partition-0-multiTopicsReader-3ec782c191__transaction_pending_ack
	at org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:452) ~[io.streamnative-pulsar-metadata-2.11.2.2.jar:2.11.2.2]
	at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$existsFromStore$11(ZKMetadataStore.java:331) ~[io.streamnative-pulsar-metadata-2.11.2.2.jar:2.11.2.2]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
	... 4 more
Caused by: org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /managed-ledgers/sn/system/persistent/__xxx-partition-0-multiTopicsReader-3ec782c191__transaction_pending_ack
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:102) ~[org.apache.zookeeper-zookeeper-3.8.1.jar:3.8.1]
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:54) ~[org.apache.zookeeper-zookeeper-3.8.1.jar:3.8.1]
	at org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:442) ~[io.streamnative-pulsar-metadata-2.11.2.2.jar:2.11.2.2]
	at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$existsFromStore$11(ZKMetadataStore.java:331) ~[io.streamnative-pulsar-metadata-2.11.2.2.jar:2.11.2.2]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
	... 4 more

Modifications

If a retryable exception is encountered, recover again. Otherwise, a TransactionComponentLoadFailedException exception is returned.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@liangyepianzhou liangyepianzhou self-assigned this Sep 28, 2023
@liangyepianzhou liangyepianzhou marked this pull request as draft September 28, 2023 08:14
@liangyepianzhou liangyepianzhou added doc-not-needed Your PR changes do not impact docs and removed doc-label-missing labels Oct 17, 2023
@liangyepianzhou liangyepianzhou marked this pull request as ready for review October 20, 2023 03:25
@apache apache deleted a comment from github-actions bot Oct 20, 2023
@liangyepianzhou liangyepianzhou changed the title [fix][txn]Remove subscription after transaction pending ack init failed. [fix][txn]Handle exceptions in the transaction pending ack init Oct 23, 2023
@codelipenghui codelipenghui added this to the 3.2.0 milestone Oct 27, 2023
@codelipenghui codelipenghui added type/bug The PR fixed a bug or issue reported a bug area/transaction release/2.10.6 category/reliability The function does not work properly in certain specific environments or failures. e.g. data lost release/3.1.2 release/3.0.3 release/2.11.4 labels Oct 27, 2023
@shibd
Copy link
Member

shibd commented Apr 11, 2024

Seems testPendingAckReplayChangeStateError always fails.

liangyepianzhou and others added 9 commits April 14, 2024 20:55
1. Add a serverError `TransactionComponentLoadFailed`
2. Add a new PulsarClientException `TransactionComponentLoadFailedException`.
2. Add a new BrokerServiceException `TransactionComponentLoadFailedException`.

(cherry picked from commit 5d5b51f)
1. Add a timer and backoff to retry for retryable exception.
2. Return `TransactionComponentLoadFailed` error to client when encounter no-retryable exception.
Test build consumer with retryable exception and no-retryable exception.
@shibd shibd force-pushed the remove_failed_sub branch from c5ffbf9 to 7f589de Compare April 14, 2024 13:00
@shibd shibd merged commit 5d18ff7 into apache:master Apr 15, 2024
50 checks passed
shibd pushed a commit that referenced this pull request Apr 15, 2024
shibd pushed a commit that referenced this pull request Apr 15, 2024
shibd pushed a commit that referenced this pull request Apr 15, 2024
shibd pushed a commit that referenced this pull request Apr 15, 2024
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 16, 2024
…he#21274)

Co-authored-by: Baodi Shi <[email protected]>
(cherry picked from commit 5d18ff7)
(cherry picked from commit 000ee66)
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 17, 2024
…he#21274)

Co-authored-by: Baodi Shi <[email protected]>
(cherry picked from commit 5d18ff7)
(cherry picked from commit 000ee66)
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 17, 2024
…he#21274)

Co-authored-by: Baodi Shi <[email protected]>
(cherry picked from commit 5d18ff7)
(cherry picked from commit 000ee66)
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 19, 2024
…he#21274)

Co-authored-by: Baodi Shi <[email protected]>
(cherry picked from commit 5d18ff7)
(cherry picked from commit 000ee66)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 23, 2024
…he#21274)

Co-authored-by: Baodi Shi <[email protected]>
(cherry picked from commit 5d18ff7)
(cherry picked from commit 000ee66)
pgier pushed a commit to pgier/pulsar that referenced this pull request Aug 23, 2024
…he#21274)

Co-authored-by: Baodi Shi <[email protected]>
(cherry picked from commit 5d18ff7)
(cherry picked from commit 000ee66)
hanmz pushed a commit to hanmz/pulsar that referenced this pull request Feb 12, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

10 participants