From b6cace2b2b6825367d3e8a1054e994ec9d7ab6e0 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Wed, 7 Jan 2026 18:17:20 +0100 Subject: [PATCH 01/10] [FIXED] Filestore can't open msg block after single truncated block Signed-off-by: Maurice van Veen --- server/filestore.go | 2 +- server/jetstream_test.go | 49 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/server/filestore.go b/server/filestore.go index d8e8fd55c7b..fd3233335b5 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -2182,7 +2182,7 @@ func (fs *fileStore) recoverMsgs() error { if fs.ld != nil { var emptyBlks []*msgBlock for _, mb := range fs.blks { - if mb.msgs == 0 && mb.rbytes == 0 { + if mb.msgs == 0 && mb.rbytes == 0 && mb != fs.lmb { emptyBlks = append(emptyBlks, mb) } } diff --git a/server/jetstream_test.go b/server/jetstream_test.go index b893a566016..14f8e3175c3 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -21006,3 +21006,52 @@ func TestJetStreamServerEncryptionRecoveryWithoutStreamStateFile(t *testing.T) { }) } } + +func TestJetStreamFileStoreErrorOpeningBlockAfterTruncate(t *testing.T) { + storeDir := t.TempDir() + conf := createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:-1 + jetstream: {store_dir: %q} + `, storeDir))) + + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + require_NoError(t, err) + + pubAck, err := js.Publish("foo", nil) + require_NoError(t, err) + require_Equal(t, pubAck.Sequence, 1) + + // Shut down the server and manually truncate the message blocks to be entirely empty, simulating data loss. + mset, err := s.globalAccount().lookupStream("TEST") + require_NoError(t, err) + fs := mset.store.(*fileStore) + blk := filepath.Join(fs.fcfg.StoreDir, msgDir, "1.blk") + index := filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile) + nc.Close() + s.Shutdown() + + // Truncate the block such that it isn't fully empty, but doesn't contain any messages. + require_NoError(t, os.Truncate(blk, 1)) + require_NoError(t, os.Remove(index)) + + // Restart the server and reconnect. + s, _ = RunServerWithConfig(conf) + defer s.Shutdown() + nc, js = jsClientConnect(t, s) + defer nc.Close() + + // Publish another message. Due to the simulated data loss, the stream sequence should continue + // counting after truncating the corrupted data. + pubAck, err = js.Publish("foo", nil) + require_NoError(t, err) + require_Equal(t, pubAck.Sequence, 1) +} From ba22cd629bbdbc8d03ac9bd23636826e2ec97762 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 8 Jan 2026 08:05:05 +0100 Subject: [PATCH 02/10] [FIXED] Concurrent map iter/write for WQ unique consumer partition Signed-off-by: Maurice van Veen --- server/consumer.go | 4 +++- server/stream.go | 4 ++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/server/consumer.go b/server/consumer.go index 4527d5460e1..215112a71ef 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -941,11 +941,11 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri } if cName != _EMPTY_ { if eo, ok := mset.consumers[cName]; ok { - mset.mu.Unlock() if action == ActionCreate { ocfg := eo.config() copyConsumerMetadata(config, &ocfg) if !reflect.DeepEqual(config, &ocfg) { + mset.mu.Unlock() return nil, NewJSConsumerAlreadyExistsError() } } @@ -953,9 +953,11 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri if cfg.Retention == WorkQueuePolicy { subjects := gatherSubjectFilters(config.FilterSubject, config.FilterSubjects) if !mset.partitionUnique(cName, subjects) { + mset.mu.Unlock() return nil, NewJSConsumerWQConsumerNotUniqueError() } } + mset.mu.Unlock() err := eo.updateConfig(config) if err == nil { return eo, nil diff --git a/server/stream.go b/server/stream.go index b90e88eaf9f..b2bc2c677a5 100644 --- a/server/stream.go +++ b/server/stream.go @@ -6311,14 +6311,18 @@ func (mset *stream) partitionUnique(name string, partitions []string) bool { if n == name { continue } + o.mu.RLock() if o.subjf == nil { + o.mu.RUnlock() return false } for _, filter := range o.subjf { if SubjectsCollide(partition, filter.subject) { + o.mu.RUnlock() return false } } + o.mu.RUnlock() } } return true From f4d47471609b10f4739bd6ef80078a911ea8803d Mon Sep 17 00:00:00 2001 From: Daniele Sciascia Date: Fri, 9 Jan 2026 11:08:46 +0100 Subject: [PATCH 03/10] Fix a jetStream / jsAccount lock ordering violation Method `addStreamWithAssignment` could cause a deadlock when acquiring the jetStream lock while holding the jsAccount lock, a lock ordering violation. This would cause a deadlock when attempting to shutdown and a a stream was created concurrrently. Easiliy reproduced when running `BenchmarkJetStreamCreate` benchmark. The jetstream lock was acquired to protect the following assignment: ``` mset.create = sa.Created ``` It appears unnecessary to lock jetstream for that, so the fix removes the locking. Signed-off-by: Daniele Sciascia --- server/stream.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/server/stream.go b/server/stream.go index b2bc2c677a5..e4153dff542 100644 --- a/server/stream.go +++ b/server/stream.go @@ -701,11 +701,19 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt ipqLimitByLen[*inMsg](mlen), ipqLimitBySize[*inMsg](msz), ), - gets: newIPQueue[*directGetReq](s, qpfx+"direct gets"), - qch: make(chan struct{}), - mqch: make(chan struct{}), - uch: make(chan struct{}, 4), - sch: make(chan struct{}, 1), + gets: newIPQueue[*directGetReq](s, qpfx+"direct gets"), + qch: make(chan struct{}), + mqch: make(chan struct{}), + uch: make(chan struct{}, 4), + sch: make(chan struct{}, 1), + created: time.Now().UTC(), + } + + // Add created timestamp used for the store, must match that of the stream assignment if it exists. + if sa != nil { + // The following assignment does not require mutex + // protection: sa.Created is immutable. + mset.created = sa.Created } // Start our signaling routine to process consumers. From 8804edb5c6e16962732d2669b3b5fca8af3fbe42 Mon Sep 17 00:00:00 2001 From: Daniele Sciascia Date: Thu, 8 Jan 2026 16:18:37 +0100 Subject: [PATCH 04/10] Add `tls_cert_not_after` to varz Expose the expiration dates of all TLS certificates in the varz monitor endpoint. Fixes #7684 Signed-off-by: Daniele Sciascia --- server/monitor.go | 77 +++++++++++++++++++++++++++++------------- server/monitor_test.go | 61 +++++++++++++++++++++++++++++---- 2 files changed, 108 insertions(+), 30 deletions(-) diff --git a/server/monitor.go b/server/monitor.go index 675de972ff9..ceaa96fad12 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1250,6 +1250,7 @@ type Varz struct { PinnedAccountFail uint64 `json:"pinned_account_fails,omitempty"` // PinnedAccountFail is how often user logon fails due to the issuer account not being pinned. OCSPResponseCache *OCSPResponseCacheVarz `json:"ocsp_peer_cache,omitempty"` // OCSPResponseCache is the state of the OCSP cache // OCSPResponseCache holds information about SlowConsumersStats *SlowConsumersStats `json:"slow_consumer_stats"` // SlowConsumersStats is statistics about all detected Slow Consumer + TLSCertNotAfter time.Time `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificate of this server } // JetStreamVarz contains basic runtime information about jetstream @@ -1262,34 +1263,36 @@ type JetStreamVarz struct { // ClusterOptsVarz contains monitoring cluster information type ClusterOptsVarz struct { - Name string `json:"name,omitempty"` // Name is the configured cluster name - Host string `json:"addr,omitempty"` // Host is the host the cluster listens on for connections - Port int `json:"cluster_port,omitempty"` // Port is the port the cluster listens on for connections - AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication - URLs []string `json:"urls,omitempty"` // URLs is the list of cluster URLs - TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete - TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections - TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed - PoolSize int `json:"pool_size,omitempty"` // PoolSize is the configured route connection pool size - WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete - WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors + Name string `json:"name,omitempty"` // Name is the configured cluster name + Host string `json:"addr,omitempty"` // Host is the host the cluster listens on for connections + Port int `json:"cluster_port,omitempty"` // Port is the port the cluster listens on for connections + AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication + URLs []string `json:"urls,omitempty"` // URLs is the list of cluster URLs + TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete + TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections + TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed + PoolSize int `json:"pool_size,omitempty"` // PoolSize is the configured route connection pool size + WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete + WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors + TLSCertNotAfter time.Time `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificate } // GatewayOptsVarz contains monitoring gateway information type GatewayOptsVarz struct { - Name string `json:"name,omitempty"` // Name is the configured cluster name - Host string `json:"host,omitempty"` // Host is the host the gateway listens on for connections - Port int `json:"port,omitempty"` // Port is the post gateway connections listens on - AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication - TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete - TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections - TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed - Advertise string `json:"advertise,omitempty"` // Advertise is the URL advertised to remote gateway clients - ConnectRetries int `json:"connect_retries,omitempty"` // ConnectRetries is how many connection attempts the route will make - Gateways []RemoteGatewayOptsVarz `json:"gateways,omitempty"` // Gateways is state of configured gateway remotes - RejectUnknown bool `json:"reject_unknown,omitempty"` // RejectUnknown indicates if unknown cluster connections will be rejected - WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete - WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors + Name string `json:"name,omitempty"` // Name is the configured cluster name + Host string `json:"host,omitempty"` // Host is the host the gateway listens on for connections + Port int `json:"port,omitempty"` // Port is the post gateway connections listens on + AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication + TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete + TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections + TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed + Advertise string `json:"advertise,omitempty"` // Advertise is the URL advertised to remote gateway clients + ConnectRetries int `json:"connect_retries,omitempty"` // ConnectRetries is how many connection attempts the route will make + Gateways []RemoteGatewayOptsVarz `json:"gateways,omitempty"` // Gateways is state of configured gateway remotes + RejectUnknown bool `json:"reject_unknown,omitempty"` // RejectUnknown indicates if unknown cluster connections will be rejected + WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete + WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors + TLSCertNotAfter time.Time `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificaet } // RemoteGatewayOptsVarz contains monitoring remote gateway information @@ -1311,6 +1314,7 @@ type LeafNodeOptsVarz struct { TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be performed WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors + TLSCertNotAfter time.Time `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificate } // DenyRules Contains lists of subjects not allowed to be imported/exported @@ -1341,6 +1345,7 @@ type MQTTOptsVarz struct { AckWait time.Duration `json:"ack_wait,omitempty"` // AckWait is how long the internal JetStream state store will allow acks to complete MaxAckPending uint16 `json:"max_ack_pending,omitempty"` // MaxAckPending is how many outstanding acks the internal JetStream state store will allow TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be done + TLSCertNotAfter time.Time `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificate } // WebsocketOptsVarz contains monitoring websocket information @@ -1359,6 +1364,7 @@ type WebsocketOptsVarz struct { AllowedOrigins []string `json:"allowed_origins,omitempty"` // AllowedOrigins list of configured trusted origins Compression bool `json:"compression,omitempty"` // Compression indicates if compression is supported TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be done + TLSCertNotAfter time.Time `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificate } // OCSPResponseCacheVarz contains OCSP response cache information @@ -1407,6 +1413,22 @@ func myUptime(d time.Duration) string { return fmt.Sprintf("%ds", tsecs) } +func tlsCertNotAfter(config *tls.Config) time.Time { + if config == nil || len(config.Certificates) == 0 { + return time.Time{} + } + cert := config.Certificates[0] + leaf := cert.Leaf + if leaf == nil { + var err error + leaf, err = x509.ParseCertificate(cert.Certificate[0]) + if err != nil { + return time.Time{} + } + } + return leaf.NotAfter +} + // HandleRoot will show basic info and links to others handlers. func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) { // This feels dumb to me, but is required: https://code.google.com/p/go/issues/detail?id=4799 @@ -1730,6 +1752,13 @@ func (s *Server) updateVarzConfigReloadableFields(v *Varz) { v.Websocket.TLSPinnedCerts = getPinnedCertsAsSlice(opts.Websocket.TLSPinnedCerts) v.TLSOCSPPeerVerify = s.ocspPeerVerify && v.TLSRequired && s.opts.tlsConfigOpts != nil && s.opts.tlsConfigOpts.OCSPPeerConfig != nil && s.opts.tlsConfigOpts.OCSPPeerConfig.Verify + + v.TLSCertNotAfter = tlsCertNotAfter(opts.TLSConfig) + v.Cluster.TLSCertNotAfter = tlsCertNotAfter(opts.Cluster.TLSConfig) + v.Gateway.TLSCertNotAfter = tlsCertNotAfter(opts.Gateway.TLSConfig) + v.LeafNode.TLSCertNotAfter = tlsCertNotAfter(opts.LeafNode.TLSConfig) + v.MQTT.TLSCertNotAfter = tlsCertNotAfter(opts.MQTT.TLSConfig) + v.Websocket.TLSCertNotAfter = tlsCertNotAfter(opts.Websocket.TLSConfig) } func getPinnedCertsAsSlice(certs PinnedCertSet) []string { diff --git a/server/monitor_test.go b/server/monitor_test.go index 8fcd8b54711..4981907f1a1 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -2743,7 +2743,9 @@ func TestMonitorCluster(t *testing.T) { opts.Cluster.TLSConfig != nil, opts.Cluster.TLSConfig != nil, DEFAULT_ROUTE_POOL_SIZE, - 0, _EMPTY_, + 0, + _EMPTY_, + time.Time{}, } varzURL := fmt.Sprintf("http://127.0.0.1:%d/varz", s.MonitorAddr().Port) @@ -2759,7 +2761,7 @@ func TestMonitorCluster(t *testing.T) { // Having this here to make sure that if fields are added in ClusterOptsVarz, // we make sure to update this test (compiler will report an error if we don't) - _ = ClusterOptsVarz{"", "", 0, 0, nil, 2, false, false, 0, 0, _EMPTY_} + _ = ClusterOptsVarz{"", "", 0, 0, nil, 2, false, false, 0, 0, _EMPTY_, time.Time{}} // Alter the fields to make sure that we have a proper deep copy // of what may be stored in the server. Anything we change here @@ -2914,7 +2916,9 @@ func TestMonitorGateway(t *testing.T) { opts.Gateway.ConnectRetries, []RemoteGatewayOptsVarz{{"B", 1, nil}}, opts.Gateway.RejectUnknown, - 0, _EMPTY_, + 0, + _EMPTY_, + time.Time{}, } // Since URLs array is not guaranteed to be always the same order, // we don't add it in the expected GatewayOptsVarz, instead we @@ -2952,7 +2956,7 @@ func TestMonitorGateway(t *testing.T) { // Having this here to make sure that if fields are added in GatewayOptsVarz, // we make sure to update this test (compiler will report an error if we don't) - _ = GatewayOptsVarz{"", "", 0, 0, 0, false, false, "", 0, []RemoteGatewayOptsVarz{{"", 0, nil}}, false, 0, "default"} + _ = GatewayOptsVarz{"", "", 0, 0, 0, false, false, "", 0, []RemoteGatewayOptsVarz{{"", 0, nil}}, false, 0, "default", time.Time{}} // Alter the fields to make sure that we have a proper deep copy // of what may be stored in the server. Anything we change here @@ -3138,7 +3142,9 @@ func TestMonitorLeafNode(t *testing.T) { }, }, false, - 0, _EMPTY_, + 0, + _EMPTY_, + time.Time{}, } varzURL := fmt.Sprintf("http://127.0.0.1:%d/varz", s.MonitorAddr().Port) @@ -3163,7 +3169,7 @@ func TestMonitorLeafNode(t *testing.T) { // Having this here to make sure that if fields are added in ClusterOptsVarz, // we make sure to update this test (compiler will report an error if we don't) - _ = LeafNodeOptsVarz{"", 0, 0, 0, false, false, []RemoteLeafOptsVarz{{"", 0, nil, nil, false}}, false, 0, _EMPTY_} + _ = LeafNodeOptsVarz{"", 0, 0, 0, false, false, []RemoteLeafOptsVarz{{"", 0, nil, nil, false}}, false, 0, _EMPTY_, time.Time{}} // Alter the fields to make sure that we have a proper deep copy // of what may be stored in the server. Anything we change here @@ -6388,3 +6394,46 @@ func TestMonitorVarzJSApiLevel(t *testing.T) { apiLevel := varz.JetStream.Stats.API.Level require_Equal(t, apiLevel, JSApiLevel) } + +func TestMonitorVarzTLSCertEndDate(t *testing.T) { + resetPreviousHTTPConnections() + opts := DefaultMonitorOptions() + tlsConfig, err := GenTLSConfig( + &TLSConfigOpts{ + CertFile: "../test/configs/certs/server-cert.pem", + KeyFile: "../test/configs/certs/server-key.pem", + CaFile: "../test/configs/certs/ca.pem", + }) + if err != nil { + t.Fatalf("Error generating TLS config: %v", err) + } + + opts.TLSConfig = tlsConfig + opts.Cluster.TLSConfig = tlsConfig + opts.Gateway.TLSConfig = tlsConfig + opts.LeafNode.TLSConfig = tlsConfig + opts.MQTT.TLSConfig = tlsConfig + opts.Websocket.TLSConfig = tlsConfig + + s := RunServer(opts) + defer s.Shutdown() + + url := fmt.Sprintf("http://127.0.0.1:%d/varz", s.MonitorAddr().Port) + v := pollVarz(t, s, 0, url, nil) + + expected := time.Date(2032, 8, 24, 20, 23, 02, 0, time.UTC) + + check := func(t *testing.T, notAfter time.Time) { + t.Helper() + if notAfter != expected { + t.Fatalf("Expected expiration date '%v', got '%v'", expected, notAfter) + } + } + + check(t, v.TLSCertNotAfter) + check(t, v.Cluster.TLSCertNotAfter) + check(t, v.Gateway.TLSCertNotAfter) + check(t, v.LeafNode.TLSCertNotAfter) + check(t, v.MQTT.TLSCertNotAfter) + check(t, v.Websocket.TLSCertNotAfter) +} From 7e5165d7ef827462b66bd9d9a47137fde6df4ce8 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Fri, 9 Jan 2026 17:45:13 +0000 Subject: [PATCH 05/10] Update copyright notices [skip ci] Signed-off-by: Neil Twigg --- server/auth_test.go | 2 +- server/certidp/ocsp_responder.go | 2 +- server/consumer.go | 2 +- server/filestore.go | 2 +- server/jetstream.go | 2 +- server/jetstream_api.go | 2 +- server/jetstream_consumer_test.go | 2 +- server/jetstream_test.go | 2 +- server/jetstream_versioning_test.go | 2 +- server/monitor.go | 2 +- server/mqtt_test.go | 2 +- server/raft.go | 2 +- server/raft_test.go | 2 +- server/stream.go | 2 +- server/util.go | 2 +- test/gateway_test.go | 2 +- 16 files changed, 16 insertions(+), 16 deletions(-) diff --git a/server/auth_test.go b/server/auth_test.go index 05c3402f7d6..1cbaf84fcae 100644 --- a/server/auth_test.go +++ b/server/auth_test.go @@ -1,4 +1,4 @@ -// Copyright 2012-2024 The NATS Authors +// Copyright 2012-2025 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/server/certidp/ocsp_responder.go b/server/certidp/ocsp_responder.go index ea2b614ed6c..e560d6e6a5b 100644 --- a/server/certidp/ocsp_responder.go +++ b/server/certidp/ocsp_responder.go @@ -1,4 +1,4 @@ -// Copyright 2023-2024 The NATS Authors +// Copyright 2023-2025 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/server/consumer.go b/server/consumer.go index 215112a71ef..966f2a55204 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1,4 +1,4 @@ -// Copyright 2019-2025 The NATS Authors +// Copyright 2019-2026 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/server/filestore.go b/server/filestore.go index fd3233335b5..3f32e6c59e4 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1,4 +1,4 @@ -// Copyright 2019-2025 The NATS Authors +// Copyright 2019-2026 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/server/jetstream.go b/server/jetstream.go index 8a554d9ef2b..8cf4c6f4ee0 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1,4 +1,4 @@ -// Copyright 2019-2025 The NATS Authors +// Copyright 2019-2026 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 484bd46d4aa..1ae1db0ae80 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1,4 +1,4 @@ -// Copyright 2020-2025 The NATS Authors +// Copyright 2020-2026 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index c023b99005d..62dcc9d9c16 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -1,4 +1,4 @@ -// Copyright 2022-2025 The NATS Authors +// Copyright 2022-2026 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 14f8e3175c3..e74703cb6b3 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -1,4 +1,4 @@ -// Copyright 2019-2025 The NATS Authors +// Copyright 2019-2026 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/server/jetstream_versioning_test.go b/server/jetstream_versioning_test.go index 996ede4a27c..dbcd36a5c6e 100644 --- a/server/jetstream_versioning_test.go +++ b/server/jetstream_versioning_test.go @@ -1,4 +1,4 @@ -// Copyright 2024-2025 The NATS Authors +// Copyright 2024-2026 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/server/monitor.go b/server/monitor.go index ceaa96fad12..618af37ca56 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1,4 +1,4 @@ -// Copyright 2013-2025 The NATS Authors +// Copyright 2013-2026 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/server/mqtt_test.go b/server/mqtt_test.go index 066ee270d75..1e8730a1329 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -1,4 +1,4 @@ -// Copyright 2020-2025 The NATS Authors +// Copyright 2020-2026 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/server/raft.go b/server/raft.go index e18881baf8e..65c0963a3dc 100644 --- a/server/raft.go +++ b/server/raft.go @@ -1,4 +1,4 @@ -// Copyright 2020-2025 The NATS Authors +// Copyright 2020-2026 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/server/raft_test.go b/server/raft_test.go index 2f31f11c2a5..f4e4db7b908 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -1,4 +1,4 @@ -// Copyright 2021-2025 The NATS Authors +// Copyright 2021-2026 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/server/stream.go b/server/stream.go index e4153dff542..df0126f7b03 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1,4 +1,4 @@ -// Copyright 2019-2025 The NATS Authors +// Copyright 2019-2026 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/server/util.go b/server/util.go index cf4bf67490f..98cc102fb30 100644 --- a/server/util.go +++ b/server/util.go @@ -1,4 +1,4 @@ -// Copyright 2012-2024 The NATS Authors +// Copyright 2012-2025 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/test/gateway_test.go b/test/gateway_test.go index de9cd48c0fe..a56ca26fa1a 100644 --- a/test/gateway_test.go +++ b/test/gateway_test.go @@ -1,4 +1,4 @@ -// Copyright 2018-2024 The NATS Authors +// Copyright 2018-2025 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at From e39f782ff5078f4638cb905c4130b896705e656c Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Mon, 12 Jan 2026 10:46:14 +0100 Subject: [PATCH 06/10] [FIXED] Mirror consumer data race Signed-off-by: Maurice van Veen --- server/stream.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/server/stream.go b/server/stream.go index df0126f7b03..1aabf820d65 100644 --- a/server/stream.go +++ b/server/stream.go @@ -2878,7 +2878,6 @@ func (mset *stream) setupMirrorConsumer() error { } mirror := mset.mirror - mirrorWg := &mirror.wg // We want to throttle here in terms of how fast we request new consumers, // or if the previous is still in progress. @@ -3037,7 +3036,16 @@ func (mset *stream) setupMirrorConsumer() error { // Wait for previous processMirrorMsgs go routine to be completely done. // If none is running, this will not block. - mirrorWg.Wait() + mset.mu.Lock() + if mset.mirror == nil { + // Mirror config has been removed. + mset.mu.Unlock() + return + } else { + wg := &mset.mirror.wg + mset.mu.Unlock() + wg.Wait() + } select { case ccr := <-respCh: From 8d57a576baaaf0834b0f44aed4f10300828ab685 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 13 Jan 2026 14:26:31 +0100 Subject: [PATCH 07/10] [FIXED] JetStream leaks subs with overlapping stream import Signed-off-by: Maurice van Veen --- server/client.go | 2 +- server/jetstream_cluster_3_test.go | 108 +++++++++++++++++++++++++++++ 2 files changed, 109 insertions(+), 1 deletion(-) diff --git a/server/client.go b/server/client.go index 3682be50262..289873e96be 100644 --- a/server/client.go +++ b/server/client.go @@ -3265,7 +3265,7 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool sub.shadow = nil if len(shadowSubs) > 0 { isSpokeLeaf = c.isSpokeLeafNode() - updateRoute = !isSpokeLeaf && (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF) && c.srv != nil + updateRoute = !isSpokeLeaf && (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF || c.kind == JETSTREAM) && c.srv != nil } sub.close() c.mu.Unlock() diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 571f9474260..b01e1a0c89e 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -6604,3 +6604,111 @@ func TestJetStreamClusterDeletedNodeDoesNotReviveStreamAfterCatchup(t *testing.T return nil }) } + +// https://github.com/nats-io/nats-server/issues/7718 +func TestJetStreamClusterLeakedSubsWithStreamImportOverlappingJetStreamSubs(t *testing.T) { + tmpl := ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: {max_mem_store: 2GB, max_file_store: 2GB, store_dir: '%s'} + + leaf { + listen: 127.0.0.1:-1 + } + + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + + accounts { + ACC: { + jetstream: enabled + users: [{user: acc, password: acc}] + imports: [{stream: {account: zone, subject: ">"}}] + } + zone: { + jetstream: enabled + users: [{user: zone, password: zone}] + exports: [{stream: ">"}] + } + } + no_auth_user: acc +` + c := createJetStreamClusterWithTemplate(t, tmpl, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + checkExpectedSubs := func(expected uint32) (actual uint32) { + t.Helper() + checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + e := expected + for _, s := range c.servers { + subs := s.NumSubscriptions() + if e == 0 { + e = subs + } else if e != subs { + return fmt.Errorf("expected %d subs, got %d", e, subs) + } + } + actual = e + return nil + }) + return actual + } + + // Track subscription counts between stream/consumer create/deletes. + var baseline, sc, cc uint32 + + // Perform a couple iterations to check we get to predictable subscription counts. + for range 3 { + // Zero means we don't know the expected count, but still ALL servers must equal. + initial := checkExpectedSubs(0) + + // If we've iterated once, we'll know the baseline. Each next iteration must be equal to this. + if baseline != 0 { + require_Equal(t, baseline, initial) + } + + // Add the stream. + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 1, + Storage: nats.FileStorage, + }) + require_NoError(t, err) + sl := c.streamLeader("ACC", "TEST") + require_NotNil(t, sl) + afterStreamCreate := checkExpectedSubs(sl.NumSubscriptions()) + if sc == 0 { + sc = afterStreamCreate + } + require_Equal(t, sc, afterStreamCreate) + + // Add the consumer. + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"}) + require_NoError(t, err) + afterConsumerCreate := checkExpectedSubs(sl.NumSubscriptions()) + if cc == 0 { + cc = afterConsumerCreate + } + require_Equal(t, cc, afterConsumerCreate) + + // Delete the consumer, the subscriptions should drop down to what they were after the stream was created. + require_NoError(t, js.DeleteConsumer("TEST", "CONSUMER")) + afterConsumerDelete := checkExpectedSubs(sl.NumSubscriptions()) + require_Equal(t, afterStreamCreate, afterConsumerDelete) + + // Deleting the stream should drop the subscriptions back to the baseline. + require_NoError(t, js.DeleteStream("TEST")) + afterStreamDelete := checkExpectedSubs(sl.NumSubscriptions()) + if baseline == 0 { + baseline = afterStreamDelete + } + require_Equal(t, baseline, afterStreamDelete) + } +} From 7771e5b205e8f4be0096b28c354839ba098eb08c Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Tue, 13 Jan 2026 18:12:43 +0000 Subject: [PATCH 08/10] Don't rebuild subject state in filestore on `noTrack` stores after truncate Signed-off-by: Neil Twigg --- server/filestore.go | 3 +++ server/filestore_test.go | 19 +++++++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/server/filestore.go b/server/filestore.go index 3f32e6c59e4..c7aa26edf54 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -9457,6 +9457,9 @@ func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) { func (fs *fileStore) resetGlobalPerSubjectInfo() { // Clear any global subject state. fs.psim, fs.tsl = fs.psim.Empty(), 0 + if fs.noTrackSubjects() { + return + } for _, mb := range fs.blks { fs.populateGlobalPerSubjectInfo(mb) } diff --git a/server/filestore_test.go b/server/filestore_test.go index 3436c960bbe..37e2f7e9e5a 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -11464,3 +11464,22 @@ func TestFileStoreCompactFullyResetsFirstAndLastSeq(t *testing.T) { checkMbState(1, 0, 0) }) } + +func TestFileStoreDoesntRebuildSubjectStateWithNoTrack(t *testing.T) { + testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { + fs, err := newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + + // This implicitly was calling resetGlobalPerSubjectInfo and + // populating "foo" back into the psim. + require_NoError(t, fs.Truncate(1)) + + fs.mu.Lock() + defer fs.mu.Unlock() + require_Equal(t, fs.psim.Size(), 0) + }) +} From 9733463c77682e54eb6c686b6bd17501dc9bd98e Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Wed, 14 Jan 2026 13:10:17 +0000 Subject: [PATCH 09/10] Fix configuring `cluster_traffic` in config mode The configuration field was not being copied in `(*Account).shallowCopy()` and was therefore lost between the config code and the Raft code. Signed-off-by: Neil Twigg --- server/accounts.go | 1 + server/jetstream_cluster_4_test.go | 5 +++++ server/opts.go | 2 +- 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/server/accounts.go b/server/accounts.go index 56b969e6d5c..e3ab9dbdc6f 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -299,6 +299,7 @@ func (a *Account) shallowCopy(na *Account) { na.Nkey = a.Nkey na.Issuer = a.Issuer na.traceDest, na.traceDestSampling = a.traceDest, a.traceDestSampling + na.nrgAccount = a.nrgAccount if a.imports.streams != nil { na.imports.streams = make([]*streamImport, 0, len(a.imports.streams)) diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index a57ebfc587d..7e2d519fd3a 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -2140,6 +2140,7 @@ func TestJetStreamClusterAccountNRGConfigNoPanic(t *testing.T) { accounts { ONE { jetstream: { cluster_traffic: system } } + TWO { jetstream: { cluster_traffic: owner } } } ` @@ -2150,6 +2151,10 @@ func TestJetStreamClusterAccountNRGConfigNoPanic(t *testing.T) { acc, err := s.lookupAccount("ONE") require_NoError(t, err) require_Equal(t, acc.nrgAccount, _EMPTY_) // Empty for the system account + + acc, err = s.lookupAccount("TWO") + require_NoError(t, err) + require_Equal(t, acc.nrgAccount, "TWO") } } diff --git a/server/opts.go b/server/opts.go index f6123a327d9..a7fcb2b183d 100644 --- a/server/opts.go +++ b/server/opts.go @@ -2236,7 +2236,7 @@ func parseJetStreamForAccount(v any, acc *Account, errors *[]error) error { case "cluster_traffic": vv, ok := mv.(string) if !ok { - return &configErr{tk, fmt.Sprintf("Expected either 'system' or 'account' string value for %q, got %v", mk, mv)} + return &configErr{tk, fmt.Sprintf("Expected either 'system' or 'owner' string value for %q, got %v", mk, mv)} } switch vv { case "system", _EMPTY_: From 90b7195ec96c6e0256cf750a8f6b3b1dafa3f440 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 15 Jan 2026 11:56:24 +0000 Subject: [PATCH 10/10] Fix `HasInterest` early exit on partial wildcard Signed-off-by: Neil Twigg --- server/gsl/gsl.go | 4 +++- server/gsl/gsl_test.go | 8 ++++++++ server/sublist.go | 4 +++- server/sublist_test.go | 8 ++++++++ 4 files changed, 22 insertions(+), 2 deletions(-) diff --git a/server/gsl/gsl.go b/server/gsl/gsl.go index 88274dd234c..da5addcb683 100644 --- a/server/gsl/gsl.go +++ b/server/gsl/gsl.go @@ -251,7 +251,9 @@ func matchLevelForAny[T comparable](l *level[T], toks []string, np *int) bool { if np != nil { *np += len(n.subs) } - return len(n.subs) > 0 + if len(n.subs) > 0 { + return true + } } if pwc != nil { if np != nil { diff --git a/server/gsl/gsl_test.go b/server/gsl/gsl_test.go index f8b149ed2a8..1360b5bfaae 100644 --- a/server/gsl/gsl_test.go +++ b/server/gsl/gsl_test.go @@ -235,6 +235,14 @@ func TestGenericSublistHasInterest(t *testing.T) { require_NoError(t, s.Remove("*", 66)) } +func TestGenericSublistHasInterestOverlapping(t *testing.T) { + s := NewSublist[int]() + require_NoError(t, s.Insert("stream.A.child", 11)) + require_NoError(t, s.Insert("stream.*", 11)) + require_True(t, s.HasInterest("stream.A.child")) + require_True(t, s.HasInterest("stream.A")) +} + func TestGenericSublistNumInterest(t *testing.T) { s := NewSublist[int]() require_NoError(t, s.Insert("foo", 11)) diff --git a/server/sublist.go b/server/sublist.go index f9cf20f0fd2..0c391728758 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -818,7 +818,9 @@ func matchLevelForAny(l *level, toks []string, np, nq *int) bool { *nq += len(qsub) } } - return len(n.plist) > 0 || len(n.psubs) > 0 || len(n.qsubs) > 0 + if len(n.plist) > 0 || len(n.psubs) > 0 || len(n.qsubs) > 0 { + return true + } } if pwc != nil { if np != nil && nq != nil { diff --git a/server/sublist_test.go b/server/sublist_test.go index e1eaf23d432..eec34bc7efe 100644 --- a/server/sublist_test.go +++ b/server/sublist_test.go @@ -1773,6 +1773,14 @@ func TestSublistHasInterest(t *testing.T) { sl.Remove(qsub) } +func TestSublistHasInterestOverlapping(t *testing.T) { + sl := NewSublistWithCache() + require_NoError(t, sl.Insert(newSub("stream.A.child"))) + require_NoError(t, sl.Insert(newSub("stream.*"))) + require_True(t, sl.HasInterest("stream.A.child")) + require_True(t, sl.HasInterest("stream.A")) +} + func TestSublistNumInterest(t *testing.T) { sl := NewSublistWithCache() fooSub := newSub("foo")