[fix][broker] Fix shadow topics cannot be consumed when the entry is not cached #23147
…not cached

### Motivation

For shadow topics, a `ReadOnlyLedgerHandle` is created to read messages from the source topic when the entry is not cached. However, it leverages the `readAsync` API that validates the `lastAddConfirmed` field (LAC). In `ReadOnlyLedgerHandle`, this field could never be updated, so `readAsync` could fail immediately. See `LedgerHandle#readAsync`:

```java
if (lastEntry > lastAddConfirmed) {
    LOG.error("ReadAsync exception on ledgerId:{} firstEntry:{} lastEntry:{} lastAddConfirmed:{}",
            ledgerId, firstEntry, lastEntry, lastAddConfirmed);
    return FutureUtils.exception(new BKReadException());
}
```

This bug is not exposed because:

1. `PulsarMockReadHandle` does not maintain a LAC field.
2. The case for cache miss is never tested.

### Modifications

Replace `readAsync` with `readUnconfirmedAsync`. The managed ledger already maintains a `lastConfirmedEntry` to limit the last entry. See `ManagedLedgerImpl#internalReadFromLedger`:

```java
Position lastPosition = lastConfirmedEntry;
if (ledger.getId() == lastPosition.getLedgerId()) {
    lastEntryInLedger = lastPosition.getEntryId();
```

Add `ShadowTopicRealBkTest` to cover two code changes `RangeEntryCacheImpl#readFromStorage` and `EntryCache#asyncReadEntry`.
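To make the failure mode concrete, here is a minimal, self-contained sketch. It does not use the BookKeeper API: `ToyReadOnlyHandle`, `ShadowReadDemo`, and `boundedRead` are hypothetical names, and the handle is a toy model that assumes a LAC stuck at `-1`. It only illustrates why a read guarded by the handle's LAC is rejected while an unguarded read, bounded by a caller-side last confirmed entry, can succeed.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Toy model only: these are NOT the BookKeeper classes. The handle mimics a
// read-only handle whose lastAddConfirmed (LAC) is never advanced.
class ToyReadOnlyHandle {
    private final long lastAddConfirmed = -1L; // assumption: never updated
    private final long entriesStored;          // entries that exist in storage

    ToyReadOnlyHandle(long entriesStored) {
        this.entriesStored = entriesStored;
    }

    // Mirrors the LAC guard quoted above: rejects any range beyond the LAC.
    CompletableFuture<List<String>> readAsync(long firstEntry, long lastEntry) {
        if (lastEntry > lastAddConfirmed) {
            return CompletableFuture.failedFuture(new IllegalStateException(
                    "lastEntry " + lastEntry + " > LAC " + lastAddConfirmed));
        }
        return readUnconfirmedAsync(firstEntry, lastEntry);
    }

    // Skips the LAC guard; the caller is responsible for bounding the range.
    CompletableFuture<List<String>> readUnconfirmedAsync(long firstEntry, long lastEntry) {
        if (firstEntry < 0 || firstEntry > lastEntry || lastEntry >= entriesStored) {
            return CompletableFuture.failedFuture(
                    new IllegalArgumentException("no such entries"));
        }
        return CompletableFuture.completedFuture(
                LongStream.rangeClosed(firstEntry, lastEntry)
                        .mapToObj(i -> "entry-" + i)
                        .collect(Collectors.toList()));
    }
}

class ShadowReadDemo {
    // lastConfirmedEntryId stands in for the bound kept by the managed ledger.
    static CompletableFuture<List<String>> boundedRead(
            ToyReadOnlyHandle handle, long firstEntry, long lastEntry,
            long lastConfirmedEntryId) {
        if (lastEntry > lastConfirmedEntryId) {
            return CompletableFuture.failedFuture(
                    new IllegalArgumentException("range exceeds last confirmed entry"));
        }
        return handle.readUnconfirmedAsync(firstEntry, lastEntry);
    }

    public static void main(String[] args) {
        ToyReadOnlyHandle handle = new ToyReadOnlyHandle(5);
        // The LAC-guarded read fails even though entries 0..2 exist.
        System.out.println(handle.readAsync(0, 2).isCompletedExceptionally());
        // The unguarded read, bounded by the caller, returns the entries.
        System.out.println(boundedRead(handle, 0, 2, 4).join());
    }
}
```

In this toy model the guard moves from the handle to the caller, which is the same shift the modification describes: the handle no longer decides what is readable, the managed ledger's last confirmed entry does.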
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
LGTM
Mark it as drafted to fix
Force-pushed from d89fbf2 to 0d143a9.
Codecov Report

Additional details and impacted files: coverage diff

| | master | #23147 | +/- |
|---|---|---|---|
| Coverage | 73.57% | 74.55% | +0.98% |
| Complexity | 32624 | 33613 | +989 |
| Files | 1877 | 1920 | +43 |
| Lines | 139502 | 144283 | +4781 |
| Branches | 15299 | 15781 | +482 |
| Hits | 102638 | 107575 | +4937 |
| Misses | 28908 | 28478 | -430 |
| Partials | 7956 | 8230 | +274 |
Now all tests passed. PTAL again. @hangc0276 /cc @lhotari @RobertIndie since the implementation changed a bit after your reviews.
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java
…not cached (#23147) (cherry picked from commit 15b88d2)
…not cached (apache#23147) (cherry picked from commit 15b88d2) (cherry picked from commit 14b3672)
…not cached (apache#23147)

### Motivation

For shadow topics, a `ReadOnlyLedgerHandle` is created to read messages from the source topic when the entry is not cached. However, it uses the `readAsync` API, which validates the requested range against the `lastAddConfirmed` field (LAC). In `ReadOnlyLedgerHandle`, this field is never updated, so `readAsync` can fail immediately. See `LedgerHandle#readAsync`:

```java
if (lastEntry > lastAddConfirmed) {
    LOG.error("ReadAsync exception on ledgerId:{} firstEntry:{} lastEntry:{} lastAddConfirmed:{}",
            ledgerId, firstEntry, lastEntry, lastAddConfirmed);
    return FutureUtils.exception(new BKReadException());
}
```

This bug was not exposed because:

1. `PulsarMockReadHandle` does not maintain a LAC field.
2. The cache-miss case was never tested.

### Modifications

Replace `readAsync` with `readUnconfirmedAsync` and compare the entry range with `ManagedLedger#getLastConfirmedEntry`. The managed ledger already maintains a `lastConfirmedEntry` to limit the last entry. See `ManagedLedgerImpl#internalReadFromLedger`:

```java
Position lastPosition = lastConfirmedEntry;
if (ledger.getId() == lastPosition.getLedgerId()) {
    lastEntryInLedger = lastPosition.getEntryId();
```

Add `ShadowTopicRealBkTest` to cover the two changed code paths, `RangeEntryCacheImpl#readFromStorage` and `EntryCache#asyncReadEntry`.

As an exception, when the ledger does not exist in the managed ledger, compare the entry range with the LAC of the ledger handle instead, because `ReadOnlyManagedLedgerImpl` could read a ledger that belongs to another managed ledger.

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`

### Matching PR in forked repository

PR in forked repository: BewareMyPower#33
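The range check described under Modifications, including the exceptional case, can be sketched as a small decision function. This is a hedged illustration rather than the code merged in this PR: `RangeCheckSketch`, `Decision`, and `decide` are hypothetical names, positions are reduced to plain `long` pairs, and the sketch assumes that ledger ids within one managed ledger only increase.

```java
import java.util.Set;

// Hypothetical sketch of the range check; not the PR's actual code.
class RangeCheckSketch {
    enum Decision { READ_UNCONFIRMED, READ_WITH_HANDLE_LAC, REJECT }

    /**
     * @param ledgersInManagedLedger ledger ids known to this managed ledger
     * @param lastConfirmedLedgerId  ledger id of the managed ledger's last confirmed entry
     * @param lastConfirmedEntryId   entry id of the managed ledger's last confirmed entry
     * @param handleLedgerId         ledger id of the handle being read
     * @param handleLac              LAC reported by that handle
     * @param lastEntry              last entry id of the requested range
     */
    static Decision decide(Set<Long> ledgersInManagedLedger,
                           long lastConfirmedLedgerId, long lastConfirmedEntryId,
                           long handleLedgerId, long handleLac, long lastEntry) {
        if (!ledgersInManagedLedger.contains(handleLedgerId)) {
            // Exceptional case: the ledger belongs to another managed ledger,
            // so only the handle's own LAC is available as a bound.
            return lastEntry <= handleLac ? Decision.READ_WITH_HANDLE_LAC : Decision.REJECT;
        }
        if (handleLedgerId > lastConfirmedLedgerId) {
            // Assumption: ledger ids grow over time, so nothing is confirmed here yet.
            return Decision.REJECT;
        }
        if (handleLedgerId == lastConfirmedLedgerId && lastEntry > lastConfirmedEntryId) {
            return Decision.REJECT;
        }
        // Earlier ledgers, or a range within the confirmed part of the current
        // ledger: the handle's stale LAC is bypassed.
        return Decision.READ_UNCONFIRMED;
    }
}
```

For example, with known ledgers `{3, 7}` and a last confirmed entry of `(7, 10)`, a read up to entry 10 of ledger 7 yields `READ_UNCONFIRMED` even when the handle's LAC is `-1`, while a read up to entry 11 is rejected.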