From ffbdb107c971f5962175d32d38aeef52225c81cf Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Mon, 13 Oct 2025 17:51:21 -0600 Subject: [PATCH 1/4] [FIXED] Trusted Proxies: Reload log message report added keys as removed When doing a configuration reload and some keys are removed, there is a log message such as: ``` Reloaded: proxies trusted keys ["xxx", "yyy"] were removed ``` However, the list contains the added keys, not the removed ones. The log statement about the added ones is correct. Signed-off-by: Ivan Kozlovic --- server/reload.go | 2 +- test/client_auth_test.go | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/server/reload.go b/server/reload.go index 4a49026a661..89594d1b9e3 100644 --- a/server/reload.go +++ b/server/reload.go @@ -1077,7 +1077,7 @@ func (p *proxiesReload) Apply(s *Server) { c.setAuthError(ErrAuthProxyNotTrusted) c.authViolation() } - s.Noticef("Reloaded: proxies trusted keys %q were removed", p.add) + s.Noticef("Reloaded: proxies trusted keys %q were removed", p.del) } if len(p.add) > 0 { s.Noticef("Reloaded: proxies trusted keys %q were added", p.add) diff --git a/test/client_auth_test.go b/test/client_auth_test.go index 51e8cb02129..7402960e6fd 100644 --- a/test/client_auth_test.go +++ b/test/client_auth_test.go @@ -18,6 +18,7 @@ import ( "fmt" "net" "os" + "strings" "testing" "time" @@ -160,6 +161,18 @@ func TestClientConnectInfo(t *testing.T) { } } +type captureProxiesReloadLogger struct { + dummyLogger + ch chan string +} + +func (l *captureProxiesReloadLogger) Noticef(format string, args ...any) { + msg := fmt.Sprintf(format, args...) + if strings.Contains(msg, "proxies trusted keys") { + l.ch <- msg + } +} + func TestProxyKeyVerification(t *testing.T) { u1, _ := nkeys.CreateUser() u1Pub, _ := u1.PublicKey() @@ -343,10 +356,32 @@ func TestProxyKeyVerification(t *testing.T) { cid2 := currentCID checkLeafNodeConnected(t, s) + logger := &captureProxiesReloadLogger{ch: make(chan string, 10)} + s.SetLogger(logger, false, false) + os.WriteFile(conf, fmt.Appendf(nil, tmpl, u3Pub, u2Pub), 0660) if err := s.Reload(); err != nil { t.Fatalf("Reload failed: %v", err) } + for range 2 { + select { + case str := <-logger.ch: + if strings.Contains(str, "removed") { + if !strings.Contains(str, u1Pub) { + t.Fatalf("Expected removed trace to include %q, it did not: %s", u1Pub, str) + } + } else if strings.Contains(str, "added") { + if !strings.Contains(str, u3Pub) { + t.Fatalf("Expected added trace to include %q, it did not: %s", u3Pub, str) + } + } else { + t.Fatalf("Unexpected log: %q", str) + } + default: + t.Fatal("Expected a log, did not get one") + } + } + // Connections should get disconnected. // We need to consume what is sent by the server, but for leaf we may // get some LS+, etc... so just consumer until we get the io.EOF From bc7407a8643aa06c14fb9eea777555bffc9559d5 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Tue, 14 Oct 2025 09:49:45 +0100 Subject: [PATCH 2/4] Update to Go 1.25.3/1.24.9 Signed-off-by: Neil Twigg --- .goreleaser.yml | 2 +- go.mod | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.goreleaser.yml b/.goreleaser.yml index a7df5691236..a20f523dd82 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -21,7 +21,7 @@ builds: env: # This is the toolchain version we use for releases. To override, set the env var, e.g.: # GORELEASER_TOOLCHAIN="go1.22.8" TARGET='linux_amd64' goreleaser build --snapshot --clean --single-target - - GOTOOLCHAIN={{ envOrDefault "GORELEASER_TOOLCHAIN" "go1.25.2" }} + - GOTOOLCHAIN={{ envOrDefault "GORELEASER_TOOLCHAIN" "go1.25.3" }} - GO111MODULE=on - CGO_ENABLED=0 goos: diff --git a/go.mod b/go.mod index 59a01685d37..7f127aca4c2 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,7 @@ module github.com/nats-io/nats-server/v2 go 1.24.0 -toolchain go1.24.8 +toolchain go1.24.9 require ( github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op From 18007b87e5b5cd336c8cb8f7e22bcf70a600483f Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 14 Oct 2025 11:38:48 +0200 Subject: [PATCH 3/4] [IMPROVED] JSZ Raft leader stats Signed-off-by: Maurice van Veen --- server/jetstream_cluster_1_test.go | 52 ++++++++++++++++++++++++++++++ server/monitor.go | 36 ++++++++++++++------- 2 files changed, 76 insertions(+), 12 deletions(-) diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 6d1ca28e4b3..e480db7080c 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -10395,6 +10395,58 @@ func TestJetStreamClusterCatchupSkipMsgDesync(t *testing.T) { } } +func TestJetStreamClusterJszRaftLeaderReporting(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "DURABLE", + Replicas: 3, + }) + require_NoError(t, err) + + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + for _, s := range c.servers { + _, _, jsa := s.globalAccount().getJetStreamFromAccount() + if !jsa.streamAssigned("TEST") { + return fmt.Errorf("stream not assigned on %s", s.Name()) + } + if !jsa.consumerAssigned("TEST", "DURABLE") { + return fmt.Errorf("consumer not assigned on %s", s.Name()) + } + } + return nil + }) + + sl := c.streamLeader(globalAccountName, "TEST") + cl := c.consumerLeader(globalAccountName, "TEST", "DURABLE") + + for _, s := range c.servers { + jsi, err := s.Jsz(&JSzOptions{RaftGroups: true}) + require_NoError(t, err) + if s == sl { + require_Equal(t, jsi.StreamsLeader, 1) + } else { + require_Equal(t, jsi.StreamsLeader, 0) + } + if s == cl { + require_Equal(t, jsi.ConsumersLeader, 1) + } else { + require_Equal(t, jsi.ConsumersLeader, 0) + } + } +} + // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster__test.go, with being the highest value. diff --git a/server/monitor.go b/server/monitor.go index c30e51ecdad..83a239d5300 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -2966,18 +2966,20 @@ type MetaClusterInfo struct { // JSInfo has detailed information on JetStream. type JSInfo struct { JetStreamStats - ID string `json:"server_id"` - Now time.Time `json:"now"` - Disabled bool `json:"disabled,omitempty"` - Config JetStreamConfig `json:"config,omitempty"` - Limits *JSLimitOpts `json:"limits,omitempty"` - Streams int `json:"streams"` - Consumers int `json:"consumers"` - Messages uint64 `json:"messages"` - Bytes uint64 `json:"bytes"` - Meta *MetaClusterInfo `json:"meta_cluster,omitempty"` - AccountDetails []*AccountDetail `json:"account_details,omitempty"` - Total int `json:"total"` + ID string `json:"server_id"` + Now time.Time `json:"now"` + Disabled bool `json:"disabled,omitempty"` + Config JetStreamConfig `json:"config,omitempty"` + Limits *JSLimitOpts `json:"limits,omitempty"` + Streams int `json:"streams"` + StreamsLeader int `json:"streams_leader,omitempty"` + Consumers int `json:"consumers"` + ConsumersLeader int `json:"consumers_leader,omitempty"` + Messages uint64 `json:"messages"` + Bytes uint64 `json:"bytes"` + Meta *MetaClusterInfo `json:"meta_cluster,omitempty"` + AccountDetails []*AccountDetail `json:"account_details,omitempty"` + Total int `json:"total"` } func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg, optRaft, optStreamLeader bool) *AccountDetail { @@ -3197,6 +3199,16 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) { jsi.Messages += streamState.Msgs jsi.Bytes += streamState.Bytes jsi.Consumers += streamState.Consumers + if opts.RaftGroups { + if node := stream.raftNode(); node == nil || node.Leader() { + jsi.StreamsLeader++ + } + for _, consumer := range stream.getPublicConsumers() { + if node := consumer.raftNode(); node == nil || node.Leader() { + jsi.ConsumersLeader++ + } + } + } } } From 350d9d7ebb3dfbb54fa769398fa3dc13c44ff4c7 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 14 Oct 2025 12:53:14 +0200 Subject: [PATCH 4/4] [FIXED] Filestore unlock when message erase fails Signed-off-by: Maurice van Veen --- server/filestore.go | 5 +++++ server/filestore_test.go | 26 ++++++++++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/server/filestore.go b/server/filestore.go index c0c7361c00c..2ac494f853a 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -5086,10 +5086,15 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( // Grab record info, but use the pre-computed record length. ri, _, _, err := mb.slotInfo(int(seq - mb.cache.fseq)) if err != nil { + mb.finishedWithCache() + mb.mu.Unlock() + fsUnlock() return false, err } if err := mb.eraseMsg(seq, int(ri), int(msz), isLastBlock); err != nil { mb.finishedWithCache() + mb.mu.Unlock() + fsUnlock() return false, err } } diff --git a/server/filestore_test.go b/server/filestore_test.go index 80fafdec9c7..2d9c14e087f 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -10937,3 +10937,29 @@ func TestFileStoreDetectDeleteGapWithOnlySkipMsg(t *testing.T) { require_Len(t, mb.dmap.Size(), 0) }) } + +func TestFileStoreEraseMsgErr(t *testing.T) { + testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { + cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage} + created := time.Now() + fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + + mb := fs.getFirstBlock() + mb.mu.Lock() + if mb.cache == nil { + mb.mu.Unlock() + t.Fatal("Expected cache to be initialized") + } + // Set to a bogus value such that the file rename fails while performing the message erase. + mb.mfn = _EMPTY_ + mb.mu.Unlock() + fs.EraseMsg(2) + }) +}