Skip to content

Commit 58e13af

Browse files
Technoboy-codelipenghui
authored andcommitted
Fix lost message issue due to ledger rollover. (apache#14664)
(cherry picked from commit ad2cc2d)
1 parent 32f8065 commit 58e13af

File tree

5 files changed

+49
-10
lines changed

5 files changed

+49
-10
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -762,8 +762,8 @@ private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
762762
}
763763
} else if (state == State.ClosedLedger) {
764764
// No ledger and no pending operations. Create a new ledger
765-
log.info("[{}] Creating a new ledger", name);
766765
if (STATE_UPDATER.compareAndSet(this, State.ClosedLedger, State.CreatingLedger)) {
766+
log.info("[{}] Creating a new ledger", name);
767767
this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis();
768768
mbean.startDataLedgerCreateOp();
769769
asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap());
@@ -1588,8 +1588,8 @@ synchronized void ledgerClosed(final LedgerHandle lh) {
15881588
}
15891589

15901590
synchronized void createLedgerAfterClosed() {
1591-
if(isNeededCreateNewLedgerAfterCloseLedger()) {
1592-
log.info("[{}] Creating a new ledger", name);
1591+
if (isNeededCreateNewLedgerAfterCloseLedger()) {
1592+
log.info("[{}] Creating a new ledger after closed", name);
15931593
STATE_UPDATER.set(this, State.CreatingLedger);
15941594
this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis();
15951595
mbean.startDataLedgerCreateOp();
@@ -1612,8 +1612,8 @@ boolean isNeededCreateNewLedgerAfterCloseLedger() {
16121612
@Override
16131613
public void rollCurrentLedgerIfFull() {
16141614
log.info("[{}] Start checking if current ledger is full", name);
1615-
if (currentLedgerEntries > 0 && currentLedgerIsFull()) {
1616-
STATE_UPDATER.set(this, State.ClosingLedger);
1615+
if (currentLedgerEntries > 0 && currentLedgerIsFull()
1616+
&& STATE_UPDATER.compareAndSet(this, State.LedgerOpened, State.ClosingLedger)) {
16171617
currentLedger.asyncClose(new AsyncCallback.CloseCallback() {
16181618
@Override
16191619
public void closeComplete(int rc, LedgerHandle lh, Object o) {

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java

+3
Original file line numberDiff line numberDiff line change
@@ -2241,6 +2241,9 @@ void testFindNewestMatchingAfterLedgerRollover() throws Exception {
22412241
// roll a new ledger
22422242
int numLedgersBefore = ledger.getLedgersInfo().size();
22432243
ledger.getConfig().setMaxEntriesPerLedger(1);
2244+
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
2245+
stateUpdater.setAccessible(true);
2246+
stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
22442247
ledger.rollCurrentLedgerIfFull();
22452248
Awaitility.await().atMost(20, TimeUnit.SECONDS)
22462249
.until(() -> ledger.getLedgersInfo().size() > numLedgersBefore);

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java

+33-4
Original file line numberDiff line numberDiff line change
@@ -1881,6 +1881,9 @@ public void testDeletionAfterLedgerClosedAndRetention() throws Exception {
18811881
c1.skipEntries(1, IndividualDeletedEntries.Exclude);
18821882
c2.skipEntries(1, IndividualDeletedEntries.Exclude);
18831883
// let current ledger close
1884+
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
1885+
stateUpdater.setAccessible(true);
1886+
stateUpdater.set(ml, ManagedLedgerImpl.State.LedgerOpened);
18841887
ml.rollCurrentLedgerIfFull();
18851888
// let retention expire
18861889
Thread.sleep(1500);
@@ -2150,6 +2153,9 @@ public void testGetPositionAfterN() throws Exception {
21502153
managedCursor.markDelete(positionMarkDelete);
21512154

21522155
//trigger ledger rollover and wait for the new ledger created
2156+
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
2157+
stateUpdater.setAccessible(true);
2158+
stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
21532159
managedLedger.rollCurrentLedgerIfFull();
21542160
Awaitility.await().untilAsserted(() -> assertEquals(managedLedger.getLedgersInfo().size(), 3));
21552161
assertEquals(5, managedLedger.getLedgersInfoAsList().get(0).getEntries());
@@ -3009,7 +3015,7 @@ public void testManagedLedgerRollOverIfFull() throws Exception {
30093015
ledger.addEntry(new byte[1024 * 1024]);
30103016
}
30113017

3012-
Assert.assertEquals(ledger.getLedgersInfoAsList().size(), msgNum / 2);
3018+
Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), msgNum / 2));
30133019
List<Entry> entries = cursor.readEntries(msgNum);
30143020
Assert.assertEquals(msgNum, entries.size());
30153021

@@ -3020,9 +3026,12 @@ public void testManagedLedgerRollOverIfFull() throws Exception {
30203026

30213027
// all the messages have benn acknowledged
30223028
// and all the ledgers have been removed except the last ledger
3023-
Thread.sleep(1000);
3024-
Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1);
3025-
Assert.assertEquals(ledger.getTotalSize(), 0);
3029+
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
3030+
stateUpdater.setAccessible(true);
3031+
stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
3032+
ledger.rollCurrentLedgerIfFull();
3033+
Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1));
3034+
Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getTotalSize(), 0));
30263035
}
30273036

30283037
@Test
@@ -3042,6 +3051,26 @@ public void testLedgerReachMaximumRolloverTime() throws Exception {
30423051
.until(() -> firstLedgerId != ml.addEntry("test".getBytes()).getLedgerId());
30433052
}
30443053

3054+
@Test
3055+
public void testLedgerNotRolloverWithoutOpenState() throws Exception {
3056+
ManagedLedgerConfig config = new ManagedLedgerConfig();
3057+
config.setMaxEntriesPerLedger(2);
3058+
3059+
ManagedLedgerImpl ml = spy((ManagedLedgerImpl)factory.open("ledger-not-rollover-without-open-state", config));
3060+
ml.addEntry("test1".getBytes()).getLedgerId();
3061+
long ledgerId2 = ml.addEntry("test2".getBytes()).getLedgerId();
3062+
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
3063+
stateUpdater.setAccessible(true);
3064+
// Set state to CreatingLedger to avoid rollover
3065+
stateUpdater.set(ml, ManagedLedgerImpl.State.CreatingLedger);
3066+
ml.rollCurrentLedgerIfFull();
3067+
Field currentLedger = ManagedLedgerImpl.class.getDeclaredField("currentLedger");
3068+
currentLedger.setAccessible(true);
3069+
LedgerHandle lh = (LedgerHandle) currentLedger.get(ml);
3070+
Awaitility.await()
3071+
.until(() -> ledgerId2 == lh.getId());
3072+
}
3073+
30453074
@Test
30463075
public void testExpiredLedgerDeletionAfterManagedLedgerRestart() throws Exception {
30473076
ManagedLedgerConfig config = new ManagedLedgerConfig();

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java

+4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.broker.service;
2020

21+
import java.lang.reflect.Field;
2122
import java.time.Duration;
2223
import java.util.concurrent.TimeUnit;
2324
import lombok.Cleanup;
@@ -98,6 +99,9 @@ public void testCurrentLedgerRolloverIfFull() throws Exception {
9899
});
99100

100101
// trigger a ledger rollover
102+
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
103+
stateUpdater.setAccessible(true);
104+
stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
101105
managedLedger.rollCurrentLedgerIfFull();
102106

103107
// the last ledger will be closed and removed and we have one ledger for empty

pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,11 @@ public void testRecoverSequenceId(boolean isUseManagedLedger) throws Exception {
157157
Position position = managedLedger.getLastConfirmedEntry();
158158

159159
if (isUseManagedLedger) {
160+
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
161+
stateUpdater.setAccessible(true);
162+
stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
163+
managedLedger.rollCurrentLedgerIfFull();
160164
Awaitility.await().until(() -> {
161-
managedLedger.rollCurrentLedgerIfFull();
162165
return !managedLedger.ledgerExists(position.getLedgerId());
163166
});
164167
}

0 commit comments

Comments
 (0)