Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1397,10 +1397,14 @@ func (js *jetStream) monitorCluster() {
}
// Look up what the threshold is for compaction. Re-reading from config here as it is reloadable.
js.srv.optsMu.RLock()
thresh := js.srv.opts.JetStreamMetaCompact
ethresh := js.srv.opts.JetStreamMetaCompact
szthresh := js.srv.opts.JetStreamMetaCompactSize
js.srv.optsMu.RUnlock()
// Work out our criteria for snapshotting.
byEntries, bySize := ethresh > 0, szthresh > 0
byNeither := !byEntries && !bySize
// For the meta layer we want to snapshot when over the above threshold (which could be 0 by default).
if ne, _ := n.Size(); force || ne > thresh || n.NeedSnapshot() {
if ne, nsz := n.Size(); force || byNeither || (byEntries && ne > ethresh) || (bySize && nsz > szthresh) || n.NeedSnapshot() {
snap, err := js.metaSnapshot()
if err != nil {
s.Warnf("Error generating JetStream cluster snapshot: %v", err)
Expand Down
74 changes: 69 additions & 5 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"math/big"
"math/rand"
"os"
Expand Down Expand Up @@ -7162,13 +7163,13 @@ func TestJetStreamClusterAccountMaxConnectionsReconnect(t *testing.T) {
}

func TestJetStreamClusterMetaCompactThreshold(t *testing.T) {
for _, thres := range []uint64{0, 5, 10} {
t.Run(fmt.Sprintf("%d", thres), func(t *testing.T) {
for _, thresh := range []uint64{0, 5, 10} {
t.Run(fmt.Sprintf("%d", thresh), func(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R1TEST", 3)
defer c.shutdown()
for _, s := range c.servers {
s.optsMu.Lock()
s.opts.JetStreamMetaCompact = thres
s.opts.JetStreamMetaCompact = thresh
s.optsMu.Unlock()
}

Expand All @@ -7179,7 +7180,9 @@ func TestJetStreamClusterMetaCompactThreshold(t *testing.T) {
_, cc := leader.getJetStreamCluster()
rg := cc.meta.(*raft)

for i := range uint64(15) {
// We will get nowhere near math.MaxInt, as we will hit the
// compaction threshold and return early, but keeps "i" moving up.
for i := range math.MaxInt {
rg.RLock()
papplied := rg.papplied
rg.RUnlock()
Expand All @@ -7196,7 +7199,7 @@ func TestJetStreamClusterMetaCompactThreshold(t *testing.T) {
cc.meta.(*raft).leadc <- true

// Should we have compacted on this iteration?
if entries > thres {
if entries > thresh {
checkFor(t, time.Second, 5*time.Millisecond, func() error {
rg.RLock()
npapplied := rg.papplied
Expand All @@ -7208,6 +7211,67 @@ func TestJetStreamClusterMetaCompactThreshold(t *testing.T) {
})
entries, _ = cc.meta.Size()
require_Equal(t, entries, 0)
return
}
}
})
}
}

func TestJetStreamClusterMetaCompactSizeThreshold(t *testing.T) {
for _, othresh := range []any{int64(1), "4K", "32K", "1M"} {
t.Run(fmt.Sprintf("%v", othresh), func(t *testing.T) {
it, err := getStorageSize(othresh)
require_NoError(t, err)
thresh := uint64(it)

c := createJetStreamClusterExplicit(t, "R1TEST", 3)
defer c.shutdown()
for _, s := range c.servers {
s.optsMu.Lock()
s.opts.JetStreamMetaCompactSize = thresh
s.optsMu.Unlock()
}

nc, _ := jsClientConnect(t, c.servers[0])
defer nc.Close()

leader := c.leader()
_, cc := leader.getJetStreamCluster()
rg := cc.meta.(*raft)

// We will get nowhere near math.MaxInt, as we will hit the
// compaction threshold and return early, but keeps "i" moving up.
for i := range math.MaxInt {
rg.RLock()
papplied := rg.papplied
rg.RUnlock()

jsStreamCreate(t, nc, &StreamConfig{
Name: fmt.Sprintf("test_%d", i),
Subjects: []string{fmt.Sprintf("test.%d", i)},
Storage: MemoryStorage,
})

// Kicking the leader change channel is the easiest way to
// trick monitorCluster() into calling doSnapshot().
_, size := cc.meta.Size()
cc.meta.(*raft).leadc <- true

// Should we have compacted on this iteration?
if size > thresh {
checkFor(t, time.Second, 5*time.Millisecond, func() error {
rg.RLock()
npapplied := rg.papplied
rg.RUnlock()
if npapplied <= papplied {
return fmt.Errorf("haven't snapshotted yet (%d <= %d)", npapplied, papplied)
}
return nil
})
_, size = cc.meta.Size()
require_Equal(t, size, 0)
return
}
}
})
Expand Down
10 changes: 10 additions & 0 deletions server/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ type Options struct {
JetStreamMaxCatchup int64
JetStreamRequestQueueLimit int64
JetStreamMetaCompact uint64
JetStreamMetaCompactSize uint64
StreamMaxBufferedMsgs int `json:"-"`
StreamMaxBufferedSize int64 `json:"-"`
StoreDir string `json:"-"`
Expand Down Expand Up @@ -2635,6 +2636,15 @@ func parseJetStream(v any, opts *Options, errors *[]error, warnings *[]error) er
return &configErr{tk, fmt.Sprintf("Expected an absolute size for %q, got %v", mk, mv)}
}
opts.JetStreamMetaCompact = uint64(thres)
case "meta_compact_size":
s, err := getStorageSize(mv)
if err != nil {
return &configErr{tk, fmt.Sprintf("%s %s", strings.ToLower(mk), err)}
}
if s < 0 {
return &configErr{tk, fmt.Sprintf("Expected an absolute size for %q, got %v", mk, mv)}
}
opts.JetStreamMetaCompactSize = uint64(s)
default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{
Expand Down
2 changes: 1 addition & 1 deletion server/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -1659,7 +1659,7 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
return nil, fmt.Errorf("config reload not supported for jetstream max memory and store")
}
}
case "jetstreammetacompact":
case "jetstreammetacompact", "jetstreammetacompactsize":
// Allowed at runtime but monitorCluster looks at s.opts directly, so no further work needed here.
case "websocket":
// Similar to gateways
Expand Down