Skip to content

Commit

Permalink
[improve][broker] Don't rollover empty ledgers based on inactivity (#…
Browse files Browse the repository at this point in the history
…21893)

### Motivation

When `managedLedgerInactiveLedgerRolloverTimeSeconds` is set, let's say to `300` (5 minutes), the ledger will also get rolled in the case when no new entries (messages) were added to the ledger. This doesn't make sense.
Empty ledgers are deleted, but having this extra churn is causing extra load on brokers, bookies, and metadata stores (zookeeper).

### Modifications

Skip rolling the ledger if it is empty.

(cherry picked from commit 49edc3d)
  • Loading branch information
lhotari committed Jan 15, 2024
1 parent cde35ef commit 0519a5b
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -4460,7 +4460,8 @@ private void cancelScheduledTasks() {
@Override
public boolean checkInactiveLedgerAndRollOver() {
long currentTimeMs = System.currentTimeMillis();
if (inactiveLedgerRollOverTimeMs > 0 && currentTimeMs > (lastAddEntryTimeMs + inactiveLedgerRollOverTimeMs)) {
if (currentLedgerEntries > 0 && inactiveLedgerRollOverTimeMs > 0 && currentTimeMs > (lastAddEntryTimeMs
+ inactiveLedgerRollOverTimeMs)) {
log.info("[{}] Closing inactive ledger, last-add entry {}", name, lastAddEntryTimeMs);
if (STATE_UPDATER.compareAndSet(this, State.LedgerOpened, State.ClosingLedger)) {
LedgerHandle currentLedger = this.currentLedger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3898,6 +3898,30 @@ public void testInactiveLedgerRollOver() throws Exception {
factory.shutdown();
}

@Test
public void testDontRollOverEmptyInactiveLedgers() throws Exception {
int inactiveLedgerRollOverTimeMs = 5;
ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
@Cleanup("shutdown")
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setInactiveLedgerRollOverTime(inactiveLedgerRollOverTimeMs, TimeUnit.MILLISECONDS);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("rollover_inactive", config);
ManagedCursor cursor = ledger.openCursor("c1");

long ledgerId = ledger.currentLedger.getId();

Thread.sleep(inactiveLedgerRollOverTimeMs * 5);
ledger.checkInactiveLedgerAndRollOver();

Thread.sleep(inactiveLedgerRollOverTimeMs * 5);
ledger.checkInactiveLedgerAndRollOver();

assertEquals(ledger.currentLedger.getId(), ledgerId);

ledger.close();
}

@Test
public void testOffloadTaskCancelled() throws Exception {
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
Expand Down Expand Up @@ -4093,6 +4117,7 @@ public void testNonDurableCursorCreateForInactiveLedger() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setInactiveLedgerRollOverTime(10, TimeUnit.MILLISECONDS);
ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName, config);
ml.addEntry("entry".getBytes(UTF_8));

MutableBoolean isRolledOver = new MutableBoolean(false);
retryStrategically((test) -> {
Expand Down

0 comments on commit 0519a5b

Please sign in to comment.