Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,13 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error {
return err
}

// Create or delete the THW if needed.
if cfg.AllowMsgTTL && fs.ttls == nil {
fs.ttls = thw.NewHashWheel()
} else if !cfg.AllowMsgTTL && fs.ttls != nil {
fs.ttls = nil
}

// Limits checks and enforcement.
fs.enforceMsgLimit()
fs.enforceBytesLimit()
Expand All @@ -666,7 +673,7 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error {
}
fs.mu.Unlock()

if cfg.MaxAge != 0 {
if cfg.MaxAge != 0 || cfg.AllowMsgTTL {
fs.expireMsgs()
}
return nil
Expand Down
20 changes: 20 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9607,3 +9607,23 @@ func TestFileStoreAccessTimeSpinUp(t *testing.T) {
ngra := runtime.NumGoroutine()
require_Equal(t, ngr, ngra)
}

func TestFileStoreUpdateConfigTTLState(t *testing.T) {
cfg := StreamConfig{
Name: "zzz",
Subjects: []string{">"},
Storage: FileStorage,
}
fs, err := newFileStore(FileStoreConfig{StoreDir: t.TempDir()}, cfg)
require_NoError(t, err)
defer fs.Stop()
require_Equal(t, fs.ttls, nil)

cfg.AllowMsgTTL = true
require_NoError(t, fs.UpdateConfig(&cfg))
require_NotEqual(t, fs.ttls, nil)

cfg.AllowMsgTTL = false
require_NoError(t, fs.UpdateConfig(&cfg))
require_Equal(t, fs.ttls, nil)
}
8 changes: 7 additions & 1 deletion server/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error {

ms.mu.Lock()
ms.cfg = *cfg
// Create or delete the THW if needed.
if cfg.AllowMsgTTL && ms.ttls == nil {
ms.ttls = thw.NewHashWheel()
} else if !cfg.AllowMsgTTL && ms.ttls != nil {
ms.ttls = nil
}
// Limits checks and enforcement.
ms.enforceMsgLimit()
ms.enforceBytesLimit()
Expand Down Expand Up @@ -112,7 +118,7 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error {
}
ms.mu.Unlock()

if cfg.MaxAge != 0 {
if cfg.MaxAge != 0 || cfg.AllowMsgTTL {
ms.expireMsgs()
}
return nil
Expand Down
20 changes: 20 additions & 0 deletions server/memstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1292,6 +1292,26 @@ func TestMemStoreAllLastSeqs(t *testing.T) {
require_True(t, reflect.DeepEqual(seqs, expected))
}

func TestMemStoreUpdateConfigTTLState(t *testing.T) {
cfg := &StreamConfig{
Name: "zzz",
Subjects: []string{">"},
Storage: MemoryStorage,
}
ms, err := newMemStore(cfg)
require_NoError(t, err)
defer ms.Stop()
require_Equal(t, ms.ttls, nil)

cfg.AllowMsgTTL = true
require_NoError(t, ms.UpdateConfig(cfg))
require_NotEqual(t, ms.ttls, nil)

cfg.AllowMsgTTL = false
require_NoError(t, ms.UpdateConfig(cfg))
require_Equal(t, ms.ttls, nil)
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down
48 changes: 48 additions & 0 deletions server/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package server
import (
"fmt"
"testing"
"time"
)

func testAllStoreAllPermutations(t *testing.T, compressionAndEncryption bool, cfg StreamConfig, fn func(t *testing.T, fs StreamStore)) {
Expand Down Expand Up @@ -535,3 +536,50 @@ func TestStorePurgeExZero(t *testing.T) {
},
)
}

func TestStoreUpdateConfigTTLState(t *testing.T) {
config := func() StreamConfig {
return StreamConfig{Name: "TEST", Subjects: []string{"foo"}}
}
testAllStoreAllPermutations(
t, false, config(),
func(t *testing.T, fs StreamStore) {
cfg := config()
switch fs.(type) {
case *fileStore:
cfg.Storage = FileStorage
case *memStore:
cfg.Storage = MemoryStorage
}

// TTLs disabled at this point so this message should survive.
seq, _, err := fs.StoreMsg("foo", nil, nil, 1)
require_NoError(t, err)
time.Sleep(2 * time.Second)
_, err = fs.LoadMsg(seq, nil)
require_NoError(t, err)

// Now enable TTLs.
cfg.AllowMsgTTL = true
require_NoError(t, fs.UpdateConfig(&cfg))

// TTLs enabled at this point so this message should be cleaned up.
seq, _, err = fs.StoreMsg("foo", nil, nil, 1)
require_NoError(t, err)
time.Sleep(2 * time.Second)
_, err = fs.LoadMsg(seq, nil)
require_Error(t, err)

// Now disable TTLs again.
cfg.AllowMsgTTL = false
require_NoError(t, fs.UpdateConfig(&cfg))

// TTLs disabled again so this message should survive.
seq, _, err = fs.StoreMsg("foo", nil, nil, 1)
require_NoError(t, err)
time.Sleep(2 * time.Second)
_, err = fs.LoadMsg(seq, nil)
require_NoError(t, err)
},
)
}
Loading