diff --git a/server/filestore.go b/server/filestore.go index e487c454523..0a71b1a3cd8 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -4756,6 +4756,17 @@ func (fs *fileStore) enforceMsgLimit() { return } for nmsgs := fs.state.Msgs; nmsgs > uint64(fs.cfg.MaxMsgs); nmsgs = fs.state.Msgs { + // If the first block can be removed fully, purge it entirely without needing to walk sequences. + if len(fs.blks) > 0 { + fmb := fs.blks[0] + fmb.mu.RLock() + msgs := fmb.msgs + fmb.mu.RUnlock() + if nmsgs-msgs > uint64(fs.cfg.MaxMsgs) { + fs.purgeMsgBlock(fmb) + continue + } + } if removed, err := fs.deleteFirstMsg(); err != nil || !removed { fs.rebuildFirst() return @@ -4773,6 +4784,17 @@ func (fs *fileStore) enforceBytesLimit() { return } for bs := fs.state.Bytes; bs > uint64(fs.cfg.MaxBytes); bs = fs.state.Bytes { + // If the first block can be removed fully, purge it entirely without needing to walk sequences. + if len(fs.blks) > 0 { + fmb := fs.blks[0] + fmb.mu.RLock() + bytes := fmb.bytes + fmb.mu.RUnlock() + if bs-bytes > uint64(fs.cfg.MaxBytes) { + fs.purgeMsgBlock(fmb) + continue + } + } if removed, err := fs.deleteFirstMsg(); err != nil || !removed { fs.rebuildFirst() return @@ -9347,6 +9369,25 @@ func (fs *fileStore) forceRemoveMsgBlock(mb *msgBlock) { fs.removeMsgBlockFromList(mb) } +// Purges and removes the msgBlock from the store. +// Lock should be held. +func (fs *fileStore) purgeMsgBlock(mb *msgBlock) { + mb.mu.Lock() + // Update top level accounting. + msgs, bytes := mb.msgs, mb.bytes + if msgs > fs.state.Msgs { + msgs = fs.state.Msgs + } + if bytes > fs.state.Bytes { + bytes = fs.state.Bytes + } + fs.state.Msgs -= msgs + fs.state.Bytes -= bytes + fs.removeMsgBlock(mb) + mb.mu.Unlock() + fs.selectNextFirst() +} + // Called by purge to simply get rid of the cache and close our fds. // Lock should not be held. func (mb *msgBlock) dirtyClose() { diff --git a/server/filestore_test.go b/server/filestore_test.go index 2d9c14e087f..bf6675174cc 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -10963,3 +10963,44 @@ func TestFileStoreEraseMsgErr(t *testing.T) { fs.EraseMsg(2) }) } + +func TestFileStorePurgeMsgBlock(t *testing.T) { + testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { + fcfg.BlockSize = 10 * 33 + cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage} + created := time.Now() + fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + for range 20 { + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + } + + fs.mu.RLock() + blks := len(fs.blks) + fs.mu.RUnlock() + require_Equal(t, blks, 2) + + state := fs.State() + require_Equal(t, state.FirstSeq, 1) + require_Equal(t, state.LastSeq, 20) + require_Equal(t, state.Msgs, 20) + require_Equal(t, state.Bytes, 20*33) + + // Purging the block should both remove the block and do the accounting. + fmb := fs.getFirstBlock() + fs.mu.Lock() + fs.purgeMsgBlock(fmb) + blks = len(fs.blks) + fs.mu.Unlock() + + require_Equal(t, blks, 1) + state = fs.State() + require_Equal(t, state.FirstSeq, 11) + require_Equal(t, state.LastSeq, 20) + require_Equal(t, state.Msgs, 10) + require_Equal(t, state.Bytes, 10*33) + }) +}