diff --git a/server/memstore.go b/server/memstore.go index de57cf10b1d..723ac4ee0d2 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -1182,6 +1182,7 @@ func (ms *memStore) purge(fseq uint64, _ /* noMarkers */ bool) (uint64, error) { }) } ms.fss = stree.NewSubjectTree[SimpleState]() + ms.dmap.Empty() sdmcb := ms.subjectDeleteMarkersAfterOperation(JSMarkerReasonPurge) ms.mu.Unlock() diff --git a/server/memstore_test.go b/server/memstore_test.go index a0389ff5aaa..c372817fce3 100644 --- a/server/memstore_test.go +++ b/server/memstore_test.go @@ -1160,6 +1160,40 @@ func TestMemStoreNumPendingBug(t *testing.T) { require_Equal(t, total, checkTotal) } +func TestMemStorePurgeLeaksDmap(t *testing.T) { + cfg := &StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Storage: MemoryStorage, + } + ms, err := newMemStore(cfg) + require_NoError(t, err) + defer ms.Stop() + + for i := 0; i < 10; i++ { + _, _, err = ms.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + } + + for i := uint64(2); i <= 9; i++ { + _, err = ms.RemoveMsg(i) + require_NoError(t, err) + } + ms.mu.Lock() + dmaps := ms.dmap.Size() + ms.mu.Unlock() + require_Equal(t, dmaps, 8) + + purged, err := ms.Purge() + require_NoError(t, err) + require_Equal(t, purged, 2) + + ms.mu.Lock() + dmaps = ms.dmap.Size() + ms.mu.Unlock() + require_Equal(t, dmaps, 0) +} + func TestMemStoreMessageTTL(t *testing.T) { fs, err := newMemStore( &StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: MemoryStorage, AllowMsgTTL: true},