From 89441f46b98a89ed744ae7492d35ea015dc5bf82 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 6 Nov 2025 14:59:07 +0000 Subject: [PATCH] Add `meta_compact_size` option to further control JetStream meta group compaction/snapshotting Signed-off-by: Neil Twigg --- server/jetstream_cluster.go | 8 +++- server/jetstream_cluster_4_test.go | 74 ++++++++++++++++++++++++++++-- server/opts.go | 10 ++++ server/reload.go | 2 +- 4 files changed, 86 insertions(+), 8 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 1cabec12461..56fffc0ed5a 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1397,10 +1397,14 @@ func (js *jetStream) monitorCluster() { } // Look up what the threshold is for compaction. Re-reading from config here as it is reloadable. js.srv.optsMu.RLock() - thresh := js.srv.opts.JetStreamMetaCompact + ethresh := js.srv.opts.JetStreamMetaCompact + szthresh := js.srv.opts.JetStreamMetaCompactSize js.srv.optsMu.RUnlock() + // Work out our criteria for snapshotting. + byEntries, bySize := ethresh > 0, szthresh > 0 + byNeither := !byEntries && !bySize // For the meta layer we want to snapshot when over the above threshold (which could be 0 by default). - if ne, _ := n.Size(); force || ne > thresh || n.NeedSnapshot() { + if ne, nsz := n.Size(); force || byNeither || (byEntries && ne > ethresh) || (bySize && nsz > szthresh) || n.NeedSnapshot() { snap, err := js.metaSnapshot() if err != nil { s.Warnf("Error generating JetStream cluster snapshot: %v", err) diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 61015f8b978..ea952989648 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -21,6 +21,7 @@ import ( "encoding/json" "errors" "fmt" + "math" "math/big" "math/rand" "os" @@ -7162,13 +7163,13 @@ func TestJetStreamClusterAccountMaxConnectionsReconnect(t *testing.T) { } func TestJetStreamClusterMetaCompactThreshold(t *testing.T) { - for _, thres := range []uint64{0, 5, 10} { - t.Run(fmt.Sprintf("%d", thres), func(t *testing.T) { + for _, thresh := range []uint64{0, 5, 10} { + t.Run(fmt.Sprintf("%d", thresh), func(t *testing.T) { c := createJetStreamClusterExplicit(t, "R1TEST", 3) defer c.shutdown() for _, s := range c.servers { s.optsMu.Lock() - s.opts.JetStreamMetaCompact = thres + s.opts.JetStreamMetaCompact = thresh s.optsMu.Unlock() } @@ -7179,7 +7180,9 @@ func TestJetStreamClusterMetaCompactThreshold(t *testing.T) { _, cc := leader.getJetStreamCluster() rg := cc.meta.(*raft) - for i := range uint64(15) { + // We will get nowhere near math.MaxInt, as we will hit the + // compaction threshold and return early, but keeps "i" moving up. + for i := range math.MaxInt { rg.RLock() papplied := rg.papplied rg.RUnlock() @@ -7196,7 +7199,7 @@ func TestJetStreamClusterMetaCompactThreshold(t *testing.T) { cc.meta.(*raft).leadc <- true // Should we have compacted on this iteration? - if entries > thres { + if entries > thresh { checkFor(t, time.Second, 5*time.Millisecond, func() error { rg.RLock() npapplied := rg.papplied @@ -7208,6 +7211,67 @@ func TestJetStreamClusterMetaCompactThreshold(t *testing.T) { }) entries, _ = cc.meta.Size() require_Equal(t, entries, 0) + return + } + } + }) + } +} + +func TestJetStreamClusterMetaCompactSizeThreshold(t *testing.T) { + for _, othresh := range []any{int64(1), "4K", "32K", "1M"} { + t.Run(fmt.Sprintf("%v", othresh), func(t *testing.T) { + it, err := getStorageSize(othresh) + require_NoError(t, err) + thresh := uint64(it) + + c := createJetStreamClusterExplicit(t, "R1TEST", 3) + defer c.shutdown() + for _, s := range c.servers { + s.optsMu.Lock() + s.opts.JetStreamMetaCompactSize = thresh + s.optsMu.Unlock() + } + + nc, _ := jsClientConnect(t, c.servers[0]) + defer nc.Close() + + leader := c.leader() + _, cc := leader.getJetStreamCluster() + rg := cc.meta.(*raft) + + // We will get nowhere near math.MaxInt, as we will hit the + // compaction threshold and return early, but keeps "i" moving up. + for i := range math.MaxInt { + rg.RLock() + papplied := rg.papplied + rg.RUnlock() + + jsStreamCreate(t, nc, &StreamConfig{ + Name: fmt.Sprintf("test_%d", i), + Subjects: []string{fmt.Sprintf("test.%d", i)}, + Storage: MemoryStorage, + }) + + // Kicking the leader change channel is the easiest way to + // trick monitorCluster() into calling doSnapshot(). + _, size := cc.meta.Size() + cc.meta.(*raft).leadc <- true + + // Should we have compacted on this iteration? + if size > thresh { + checkFor(t, time.Second, 5*time.Millisecond, func() error { + rg.RLock() + npapplied := rg.papplied + rg.RUnlock() + if npapplied <= papplied { + return fmt.Errorf("haven't snapshotted yet (%d <= %d)", npapplied, papplied) + } + return nil + }) + _, size = cc.meta.Size() + require_Equal(t, size, 0) + return } } }) diff --git a/server/opts.go b/server/opts.go index a3507a4a324..24e85f0ff45 100644 --- a/server/opts.go +++ b/server/opts.go @@ -387,6 +387,7 @@ type Options struct { JetStreamMaxCatchup int64 JetStreamRequestQueueLimit int64 JetStreamMetaCompact uint64 + JetStreamMetaCompactSize uint64 StreamMaxBufferedMsgs int `json:"-"` StreamMaxBufferedSize int64 `json:"-"` StoreDir string `json:"-"` @@ -2635,6 +2636,15 @@ func parseJetStream(v any, opts *Options, errors *[]error, warnings *[]error) er return &configErr{tk, fmt.Sprintf("Expected an absolute size for %q, got %v", mk, mv)} } opts.JetStreamMetaCompact = uint64(thres) + case "meta_compact_size": + s, err := getStorageSize(mv) + if err != nil { + return &configErr{tk, fmt.Sprintf("%s %s", strings.ToLower(mk), err)} + } + if s < 0 { + return &configErr{tk, fmt.Sprintf("Expected an absolute size for %q, got %v", mk, mv)} + } + opts.JetStreamMetaCompactSize = uint64(s) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ diff --git a/server/reload.go b/server/reload.go index f8af878f206..c727af58ba9 100644 --- a/server/reload.go +++ b/server/reload.go @@ -1659,7 +1659,7 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { return nil, fmt.Errorf("config reload not supported for jetstream max memory and store") } } - case "jetstreammetacompact": + case "jetstreammetacompact", "jetstreammetacompactsize": // Allowed at runtime but monitorCluster looks at s.opts directly, so no further work needed here. case "websocket": // Similar to gateways