From 9ed66a60450912dfdc9baad5db57fe429c8fa91b Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Tue, 14 Oct 2025 17:55:35 +0100 Subject: [PATCH 01/24] Tweak cache expiry in `firstMatching` or `firstMatchingMulti` The first load of the last sequence via `firstMatching` or `firstMatchingMulti` will still continue to flag `expireOk`, as though we're optimising for the linear scan case, but if it becomes apparent that we are continuously reloading the same last sequence over and over again in this way, don't set `expireOk` or we'll just keep expiring and reloading the cache each time. Signed-off-by: Neil Twigg --- server/filestore.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 2ac494f853a..e487c454523 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -2705,7 +2705,7 @@ func (mb *msgBlock) firstMatchingMulti(sl *gsl.SimpleSublist, start uint64, sm * if err != nil { continue } - expireOk := seq == lseq && mb.llseq == seq + expireOk := seq == lseq && mb.llseq != llseq && mb.llseq == seq updateLLTS = false // cacheLookup already updated it. if sl.HasInterest(fsm.subj) { return fsm, expireOk, nil @@ -2839,7 +2839,7 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor continue } updateLLTS = false // cacheLookup already updated it. - expireOk := seq == lseq && mb.llseq == seq + expireOk := seq == lseq && mb.llseq != llseq && mb.llseq == seq if isAll { return fsm, expireOk, nil } From 55bb241f6eb5a9304179416715b6dc9f7a88ac87 Mon Sep 17 00:00:00 2001 From: roeschter Date: Tue, 14 Oct 2025 13:25:39 +0200 Subject: [PATCH 02/24] Improved warning for Observer mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michael Röschter --- server/jetstream_cluster.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 1cef7348847..ab4a2b1e798 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -896,6 +896,9 @@ func (js *jetStream) setupMetaGroup() error { } if cfg.Observer { s.Noticef("Turning JetStream metadata controller Observer Mode on") + s.Noticef("In cases where the JetStream domain is not intended to be extended through a SYS account leaf node connection") + s.Noticef("and waiting for leader election until first contact is not acceptable,") + s.Noticef(`manually disable Observer Mode by setting the JetStream Option "extension_hint: %s"`, jsNoExtend) } } else { s.Noticef("JetStream cluster recovering state") @@ -909,7 +912,7 @@ func (js *jetStream) setupMetaGroup() error { cfg.Observer = false case extUndetermined: s.Noticef("Turning JetStream metadata controller Observer Mode on - no previous contact") - s.Noticef("In cases where JetStream will not be extended") + s.Noticef("In cases where the JetStream domain is not intended to be extended through a SYS account leaf node connection") s.Noticef("and waiting for leader election until first contact is not acceptable,") s.Noticef(`manually disable Observer Mode by setting the JetStream Option "extension_hint: %s"`, jsNoExtend) } From 78855d66982170809c9e07f41bf966a730e693fc Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Wed, 15 Oct 2025 15:52:55 +0200 Subject: [PATCH 03/24] (2.12) [FIXED] Atomic batch: check correct header for unsupported error Signed-off-by: Maurice van Veen --- server/stream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/server/stream.go b/server/stream.go index 1329c1642a3..e5b20e9c23c 100644 --- a/server/stream.go +++ b/server/stream.go @@ -6507,7 +6507,7 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr } // Reject unsupported headers. - if getExpectedLastMsgId(hdr) != _EMPTY_ { + if getExpectedLastMsgId(bhdr) != _EMPTY_ { return errorOnUnsupported(seq, JSExpectedLastMsgId) } From 44f40573e38ca16a65d461bc5f0ff6412edd4e2b Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Sat, 18 Oct 2025 11:26:08 -0600 Subject: [PATCH 04/24] [FIXED] Message Tracing: Hop header set properly per gateway Setting the "hop" header for each gateway could cause header corruption. This is now fixed. A test dealing with gateway has been improved to include more than one gateway, which would have demonstrated the issue. The test now passes and ensures that the hop is different per gateway. Related to #7442 Signed-off-by: Ivan Kozlovic --- server/gateway.go | 27 ++++++++-- server/msgtrace_test.go | 110 +++++++++++++++++++++++++++------------- 2 files changed, 97 insertions(+), 40 deletions(-) diff --git a/server/gateway.go b/server/gateway.go index 085f0e18f54..f4982cc2ce6 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -2561,11 +2561,18 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr return false } + // Copy off original pa in case it changes. + pa := c.pa + mt, _ := c.isMsgTraceEnabled() if mt != nil { - pa := c.pa + // We are going to replace "pa" with our copy of c.pa, but to restore + // to the original copy of c.pa, we need to save it again. + cpa := c.pa msg = mt.setOriginAccountHeaderIfNeeded(c, acc, msg) - defer func() { c.pa = pa }() + defer func() { c.pa = cpa }() + // Update pa with our current c.pa state. + pa = c.pa } var ( @@ -2579,6 +2586,7 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr didDeliver bool prodIsMQTT = c.isMqtt() dlvMsgs int64 + dlvExtraSz int64 ) // Get a subscription from the pool @@ -2676,8 +2684,11 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr } } + // Assume original message + dmsg := msg if mt != nil { - msg = mt.setHopHeader(c, msg) + // If trace is enabled, we need to set the hop header per gateway. + dmsg = mt.setHopHeader(c, dmsg) } // Setup the message header. @@ -2727,16 +2738,22 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr sub.nm, sub.max = 0, 0 sub.client = gwc sub.subject = subject - if c.deliverMsg(prodIsMQTT, sub, acc, subject, mreply, mh, msg, false) { + if c.deliverMsg(prodIsMQTT, sub, acc, subject, mreply, mh, dmsg, false) { // We don't count internal deliveries so count only if sub.icb is nil if sub.icb == nil { dlvMsgs++ + dlvExtraSz += int64(len(dmsg) - len(msg)) } didDeliver = true } + + // If we set the header reset the origin pub args. 
+ if mt != nil { + c.pa = pa + } } if dlvMsgs > 0 { - totalBytes := dlvMsgs * int64(len(msg)) + totalBytes := dlvMsgs*int64(len(msg)) + dlvExtraSz // For non MQTT producers, remove the CR_LF * number of messages if !prodIsMQTT { totalBytes -= dlvMsgs * int64(LEN_CR_LF) diff --git a/server/msgtrace_test.go b/server/msgtrace_test.go index 57b72186f94..769a5e2cd4c 100644 --- a/server/msgtrace_test.go +++ b/server/msgtrace_test.go @@ -1496,17 +1496,33 @@ func TestMsgTraceWithGateways(t *testing.T) { s1 := runGatewayServer(o1) defer s1.Shutdown() - waitForOutboundGateways(t, s1, 1, time.Second) - waitForInboundGateways(t, s2, 1, time.Second) - waitForOutboundGateways(t, s2, 1, time.Second) + o3 := testGatewayOptionsFromToWithServers(t, "C", "B", s2) + o3.NoSystemAccount = false + s3 := runGatewayServer(o3) + defer s3.Shutdown() + + waitForOutboundGateways(t, s1, 2, time.Second) + waitForInboundGateways(t, s1, 2, time.Second) + waitForInboundGateways(t, s2, 2, time.Second) + waitForOutboundGateways(t, s2, 2, time.Second) + waitForInboundGateways(t, s3, 2, time.Second) + waitForOutboundGateways(t, s3, 2, time.Second) nc2 := natsConnect(t, s2.ClientURL(), nats.Name("sub2")) defer nc2.Close() - sub2 := natsQueueSubSync(t, nc2, "foo.*", "my_queue") + sub2 := natsQueueSubSync(t, nc2, "foo.*", "my_queue_2") + + nc22 := natsConnect(t, s2.ClientURL(), nats.Name("sub22")) + defer nc22.Close() + sub22 := natsQueueSubSync(t, nc22, "*.*", "my_queue_22") - nc3 := natsConnect(t, s2.ClientURL(), nats.Name("sub3")) + nc3 := natsConnect(t, s3.ClientURL(), nats.Name("sub3")) defer nc3.Close() - sub3 := natsQueueSubSync(t, nc3, "*.*", "my_queue_2") + sub3 := natsQueueSubSync(t, nc3, "foo.*", "my_queue_3") + + nc32 := natsConnect(t, s3.ClientURL(), nats.Name("sub32")) + defer nc32.Close() + sub32 := natsQueueSubSync(t, nc32, "*.*", "my_queue_32") nc1 := natsConnect(t, s1.ClientURL(), nats.Name("sub1")) defer nc1.Close() @@ -1540,17 +1556,18 @@ func TestMsgTraceWithGateways(t *testing.T) { checkAppMsg := func(sub *nats.Subscription, expected bool) { if expected { appMsg := natsNexMsg(t, sub, time.Second) - require_Equal[string](t, string(appMsg.Data), "hello!") + require_Equal(t, string(appMsg.Data), "hello!") } // Check that no (more) messages are received. 
if msg, err := sub.NextMsg(100 * time.Millisecond); err != nats.ErrTimeout { t.Fatalf("Did not expect application message, got %s", msg.Data) } } - for _, sub := range []*nats.Subscription{sub1, sub2, sub3} { + for _, sub := range []*nats.Subscription{sub1, sub2, sub22, sub3, sub32} { checkAppMsg(sub, test.deliverMsg) } + var previousHop string check := func() { traceMsg := natsNexMsg(t, traceSub, time.Second) var e MsgTraceEvent @@ -1560,58 +1577,81 @@ func TestMsgTraceWithGateways(t *testing.T) { require_True(t, ingress != nil) switch ingress.Kind { case CLIENT: - require_Equal[string](t, e.Server.Name, s1.Name()) - require_Equal[string](t, ingress.Account, globalAccountName) - require_Equal[string](t, ingress.Subject, "foo.bar") + require_Equal(t, e.Server.Name, s1.Name()) + require_Equal(t, ingress.Account, globalAccountName) + require_Equal(t, ingress.Subject, "foo.bar") egress := e.Egresses() - require_Equal[int](t, len(egress), 2) + require_Equal(t, len(egress), 3) for _, eg := range egress { switch eg.Kind { case CLIENT: - require_Equal[string](t, eg.Name, "sub1") - require_Equal[string](t, eg.Subscription, "*.bar") - require_Equal[string](t, eg.Queue, _EMPTY_) + require_Equal(t, eg.Name, "sub1") + require_Equal(t, eg.Subscription, "*.bar") + require_Equal(t, eg.Queue, _EMPTY_) case GATEWAY: - require_Equal[string](t, eg.Name, s2.Name()) - require_Equal[string](t, eg.Error, _EMPTY_) - require_Equal[string](t, eg.Subscription, _EMPTY_) - require_Equal[string](t, eg.Queue, _EMPTY_) + if eg.Name != s2.Name() && eg.Name != s3.Name() { + t.Fatalf("Expected name to be %q or %q, got %q", s2.Name(), s3.Name(), eg.Name) + } + require_Equal(t, eg.Error, _EMPTY_) + require_Equal(t, eg.Subscription, _EMPTY_) + require_Equal(t, eg.Queue, _EMPTY_) default: t.Fatalf("Unexpected egress: %+v", eg) } } case GATEWAY: - require_Equal[string](t, e.Server.Name, s2.Name()) - require_Equal[string](t, ingress.Account, globalAccountName) - require_Equal[string](t, ingress.Subject, "foo.bar") + require_True(t, e.Request.Header != nil) + require_Len(t, len(e.Request.Header[MsgTraceHop]), 1) + hop := e.Request.Header[MsgTraceHop][0] + require_True(t, hop == "1" || hop == "2") + if previousHop == _EMPTY_ { + previousHop = hop + } else if hop == previousHop { + t.Fatalf("Expected different hop value, got the same %q", hop) + } + var sub2Name, queue2Name, sub3Name, queue3Name string + switch e.Server.Name { + case s2.Name(): + require_Equal(t, e.Server.Cluster, "B") + sub2Name, sub3Name = "sub2", "sub22" + queue2Name, queue3Name = "my_queue_2", "my_queue_22" + case s3.Name(): + require_Equal(t, e.Server.Cluster, "C") + sub2Name, sub3Name = "sub3", "sub32" + queue2Name, queue3Name = "my_queue_3", "my_queue_32" + default: + t.Fatalf("Unexpected server name %q", e.Server.Name) + } + require_Equal(t, ingress.Account, globalAccountName) + require_Equal(t, ingress.Subject, "foo.bar") egress := e.Egresses() - require_Equal[int](t, len(egress), 2) + require_Equal(t, len(egress), 2) var gotSub2, gotSub3 int for _, eg := range egress { require_True(t, eg.Kind == CLIENT) switch eg.Name { - case "sub2": - require_Equal[string](t, eg.Subscription, "foo.*") - require_Equal[string](t, eg.Queue, "my_queue") + case sub2Name: + require_Equal(t, eg.Subscription, "foo.*") + require_Equal(t, eg.Queue, queue2Name) gotSub2++ - case "sub3": - require_Equal[string](t, eg.Subscription, "*.*") - require_Equal[string](t, eg.Queue, "my_queue_2") + case sub3Name: + require_Equal(t, eg.Subscription, "*.*") + require_Equal(t, eg.Queue, 
queue3Name) gotSub3++ default: t.Fatalf("Unexpected egress name: %+v", eg) } } - require_Equal[int](t, gotSub2, 1) - require_Equal[int](t, gotSub3, 1) - + require_Equal(t, gotSub2, 1) + require_Equal(t, gotSub3, 1) default: t.Fatalf("Unexpected ingress: %+v", ingress) } } - // We should get 2 events - check() - check() + // We should get 3 events + for range 3 { + check() + } // Make sure we are not receiving more traces if tm, err := traceSub.NextMsg(250 * time.Millisecond); err == nil { t.Fatalf("Should not have received trace message: %s", tm.Data) From aaf5af7caeb094ecb31f17be2963af1c2e397ac7 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Fri, 17 Oct 2025 14:31:06 +0200 Subject: [PATCH 05/24] [IMPROVED] Interest desync after consumer create/update Signed-off-by: Maurice van Veen --- server/jetstream_cluster_1_test.go | 116 +++++++++++++++++++++++++++++ server/stream.go | 12 +++ 2 files changed, 128 insertions(+) diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index e480db7080c..8b2e54cb168 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -10447,6 +10447,122 @@ func TestJetStreamClusterJszRaftLeaderReporting(t *testing.T) { } } +func TestJetStreamClusterNoInterestDesyncOnConsumerCreate(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + Retention: nats.InterestPolicy, + }) + require_NoError(t, err) + + // Pick a random server that will not know about the new consumer being created. + // If servers determine "no interest" individually, these servers will desync. + rs := c.randomNonLeader() + sjs := rs.getJetStream() + meta := sjs.getMetaGroup() + require_NoError(t, meta.PauseApply()) + + sub, err := js.PullSubscribe(_EMPTY_, "DURABLE", nats.BindStream("TEST")) + require_NoError(t, err) + defer sub.Drain() + + checkConsumersAssigned := func(expected int) { + t.Helper() + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + var count int + for _, s := range c.servers { + _, _, jsa := s.globalAccount().getJetStreamFromAccount() + if jsa.consumerAssigned("TEST", "DURABLE") { + count++ + } + } + if count != expected { + return fmt.Errorf("expected %d, got %d", expected, count) + } + return nil + }) + } + // Confirm only two servers know about the consumer. + checkConsumersAssigned(2) + c.waitOnConsumerLeader(globalAccountName, "TEST", "DURABLE") + + // Publish a single message. All servers will receive this, but only two will store it. + _, err = js.Publish("foo", nil) + require_NoError(t, err) + checkLastSeq := func(lseq uint64) { + t.Helper() + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + for _, s := range c.servers { + mset, err := s.globalAccount().lookupStream("TEST") + if err != nil { + return err + } + if seq := mset.lastSeq(); seq != lseq { + return fmt.Errorf("expected %d, got %d", lseq, seq) + } + } + return nil + }) + } + checkLastSeq(1) + + // Resume the meta layer such that the consumer gets created on the remaining server. + meta.ResumeApply() + checkConsumersAssigned(3) + + // All servers will now store another published message. + _, err = js.Publish("foo", nil) + require_NoError(t, err) + checkLastSeq(2) + + // Make sure the consumer leader is on the same server that didn't store the first message. 
+ cl := c.consumerLeader(globalAccountName, "TEST", "DURABLE") + if cl != rs { + mset, err := cl.globalAccount().lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("DURABLE") + require_NotNil(t, o) + n := o.raftNode() + require_NoError(t, n.StepDown(rs.NodeName())) + c.waitOnConsumerLeader(globalAccountName, "TEST", "DURABLE") + cl = c.consumerLeader(globalAccountName, "TEST", "DURABLE") + require_Equal(t, cl, rs) + } + + // Since the consumer leader is the same as the server that didn't store the first message, + // it can only receive and ack the second message. + msgs, err := sub.Fetch(1, nats.MaxWait(time.Second)) + require_NoError(t, err) + require_Len(t, len(msgs), 1) + metadata, err := msgs[0].Metadata() + require_NoError(t, err) + require_Equal(t, metadata.Sequence.Stream, 2) + require_Equal(t, metadata.NumPending, 0) + require_NoError(t, msgs[0].AckSync()) + + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + // The servers will eventually be synced up again, but this relies on the interest state being checked. + for _, s := range c.servers { + if s == rs { + continue + } + mset, err := s.globalAccount().lookupStream("TEST") + if err != nil { + return err + } + mset.checkInterestState() + } + return checkState(t, c, globalAccountName, "TEST") + }) +} + // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster__test.go, with being the highest value. diff --git a/server/stream.go b/server/stream.go index e5b20e9c23c..ba923b25a8b 100644 --- a/server/stream.go +++ b/server/stream.go @@ -7593,7 +7593,19 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) bool { // Only propose message deletion to the stream if we're consumer leader, otherwise all followers would also propose. // We must be the consumer leader, since we know for sure we've stored the message and don't register as pre-ack. if o != nil && !o.IsLeader() { + // Currently, interest-based streams can race on "no interest" because consumer creates/updates go over + // the meta layer and published messages go over the stream layer. Some servers could then either store + // or not store some initial set of messages that gained new interest. To get the stream back in sync, + // we allow moving the first sequence up. + // TODO(mvv): later on only the stream leader should determine "no interest" + interestRaiseFirst := mset.cfg.Retention == InterestPolicy && seq == state.FirstSeq mset.mu.Unlock() + if interestRaiseFirst { + if _, err := store.RemoveMsg(seq); err == ErrStoreEOF { + // This should not happen, but being pedantic. + mset.registerPreAckLock(o, seq) + } + } // Must still mark as removal if follower. If we become leader later, we must be able to retry the proposal. return true } From 288652c39bfc450712711747159bce8a91a1f606 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Sat, 18 Oct 2025 12:04:54 -0600 Subject: [PATCH 06/24] [FIXED] LeafNode proxy validation Saw the test TestConfigCheck flap. The reason is that the proxy validation was done when parsing the "proxy" field, but there was no guarantee that this field would be parsed after "urls", which may lead to validation being successful (because there was no URLs added yet). Move the check after all fields have been parsed for a given remote. 
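A minimal sketch of that parse-then-validate pattern, using hypothetical names (the real code builds a RemoteLeafOpts and calls validateLeafNodeProxyOptions with the saved proxy token): collect every field of the remote first, and only validate once the loop is done, so the outcome no longer depends on Go's randomized map iteration order. The validation rule shown is illustrative, not the server's actual proxy check.

package parsesketch

import "errors"

type remoteSketch struct {
	urls  []string
	proxy string
}

// parseRemoteSketch records all fields before validating any of them.
func parseRemoteSketch(fields map[string][]string) (*remoteSketch, error) {
	r := &remoteSketch{}
	for k, v := range fields { // map iteration order is random
		switch k {
		case "urls":
			r.urls = append(r.urls, v...)
		case "proxy":
			if len(v) > 0 {
				r.proxy = v[0]
			}
		}
	}
	// Validation now always sees the fully parsed remote, regardless of
	// whether "proxy" or "urls" happened to be visited first.
	if r.proxy != "" && len(r.urls) == 0 {
		return nil, errors.New("proxy configured but no remote urls")
	}
	return r, nil
}

Validating inside the "proxy" case, as the old code did, made the result depend on whether "urls" had already been visited; hoisting the check below the field loop removes that nondeterminism, which is what the change to parseRemoteLeafNodes does.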
Relates to #7242 Signed-off-by: Ivan Kozlovic --- server/opts.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/server/opts.go b/server/opts.go index f7aed1081e5..9931bf556a4 100644 --- a/server/opts.go +++ b/server/opts.go @@ -2889,6 +2889,7 @@ func parseRemoteLeafNodes(v any, errors *[]error, warnings *[]error) ([]*RemoteL continue } remote := &RemoteLeafOpts{} + var proxyToken token for k, v := range rm { tk, v = unwrapValue(v, <) switch strings.ToLower(k) { @@ -3022,7 +3023,7 @@ func parseRemoteLeafNodes(v any, errors *[]error, warnings *[]error) ([]*RemoteL continue } // Capture the token for the "proxy" field itself, before the map iteration - proxyToken := tk + proxyToken = tk for pk, pv := range proxyMap { tk, pv = unwrapValue(pv, <) switch strings.ToLower(pk) { @@ -3047,16 +3048,6 @@ func parseRemoteLeafNodes(v any, errors *[]error, warnings *[]error) ([]*RemoteL } } } - // Use the saved proxy token for validation errors, not the last field token - if warns, err := validateLeafNodeProxyOptions(remote); err != nil { - *errors = append(*errors, &configErr{proxyToken, err.Error()}) - continue - } else { - // Add any warnings about proxy configuration - for _, warn := range warns { - *warnings = append(*warnings, &configErr{proxyToken, warn}) - } - } default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ @@ -3070,6 +3061,16 @@ func parseRemoteLeafNodes(v any, errors *[]error, warnings *[]error) ([]*RemoteL } } } + // Use the saved proxy token for validation errors, not the last field token + if warns, err := validateLeafNodeProxyOptions(remote); err != nil { + *errors = append(*errors, &configErr{proxyToken, err.Error()}) + continue + } else { + // Add any warnings about proxy configuration + for _, warn := range warns { + *warnings = append(*warnings, &configErr{proxyToken, warn}) + } + } remotes = append(remotes, remote) } return remotes, nil From f30d860a0e1fd2be5376bbf5405b369d98373fb4 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 21 Oct 2025 10:42:09 +0200 Subject: [PATCH 07/24] (2.12) [FIXED] Leaf node token auth Signed-off-by: Maurice van Veen --- server/auth.go | 3 ++- server/leafnode.go | 9 +++++++-- server/leafnode_test.go | 34 ++++++++++++++++++++++++++++++++++ 3 files changed, 43 insertions(+), 3 deletions(-) diff --git a/server/auth.go b/server/auth.go index 2d735c0e2f6..ede297441b4 100644 --- a/server/auth.go +++ b/server/auth.go @@ -1123,7 +1123,8 @@ func (s *Server) processClientOrLeafAuthentication(c *client, opts *Options) (au return ok } - if c.kind == CLIENT { + // Check for the use of simple auth. + if c.kind == CLIENT || c.kind == LEAF { if proxyRequired = opts.ProxyRequired; proxyRequired && !trustedProxy { return setProxyAuthError(ErrAuthProxyRequired) } diff --git a/server/leafnode.go b/server/leafnode.go index 0049823d5cb..f306ebc9c15 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -1049,17 +1049,22 @@ func (c *client) sendLeafConnect(clusterName string, headers bool) error { // In addition, and this is to allow auth callout, set user/password or // token if applicable. if userInfo := c.leaf.remote.curURL.User; userInfo != nil { - // For backward compatibility, if only username is provided, set both - // Token and User, not just Token. cinfo.User = userInfo.Username() var ok bool cinfo.Pass, ok = userInfo.Password() + // For backward compatibility, if only username is provided, set both + // Token and User, not just Token. 
if !ok { cinfo.Token = cinfo.User } } else if c.leaf.remote.username != _EMPTY_ { cinfo.User = c.leaf.remote.username cinfo.Pass = c.leaf.remote.password + // For backward compatibility, if only username is provided, set both + // Token and User, not just Token. + if cinfo.Pass == _EMPTY_ { + cinfo.Token = cinfo.User + } } b, err := json.Marshal(cinfo) if err != nil { diff --git a/server/leafnode_test.go b/server/leafnode_test.go index d0ea7305dd6..9d7f94bd227 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -10830,3 +10830,37 @@ func TestLeafNodeConfigureWriteDeadline(t *testing.T) { require_Equal(t, r.out.wdl, 6*time.Second) }) } + +// https://github.com/nats-io/nats-server/issues/7441 +func TestLeafNodesBasicTokenAuth(t *testing.T) { + hubConf := createConfFile(t, []byte(` + server_name: "HUB" + listen: "127.0.0.1:-1" + authorization { + token: secret + } + leafnodes { + listen: "127.0.0.1:-1" + } + `)) + hub, ohub := RunServerWithConfig(hubConf) + defer hub.Shutdown() + + port := ohub.LeafNode.Port + leafTmpl := ` + server_name: "LEAF" + listen: "127.0.0.1:-1" + leafnodes { + remotes: [ + { url: "nats://secret@localhost:%d" } + ] + } + ` + leafConf := createConfFile(t, fmt.Appendf(nil, leafTmpl, port)) + leaf, _ := RunServerWithConfig(leafConf) + defer leaf.Shutdown() + + // Verify that we have only 1 leaf + checkLeafNodeConnected(t, hub) + checkLeafNodeConnected(t, leaf) +} From c9d06541a7810c964844982b1044661e029b83f7 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 21 Oct 2025 17:34:14 +0200 Subject: [PATCH 08/24] [IMPROVED] Filestore MaxBytes/Msgs update performance Signed-off-by: Maurice van Veen --- server/filestore.go | 41 ++++++++++++++++++++++++++++++++++++++++ server/filestore_test.go | 41 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+) diff --git a/server/filestore.go b/server/filestore.go index e487c454523..0a71b1a3cd8 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -4756,6 +4756,17 @@ func (fs *fileStore) enforceMsgLimit() { return } for nmsgs := fs.state.Msgs; nmsgs > uint64(fs.cfg.MaxMsgs); nmsgs = fs.state.Msgs { + // If the first block can be removed fully, purge it entirely without needing to walk sequences. + if len(fs.blks) > 0 { + fmb := fs.blks[0] + fmb.mu.RLock() + msgs := fmb.msgs + fmb.mu.RUnlock() + if nmsgs-msgs > uint64(fs.cfg.MaxMsgs) { + fs.purgeMsgBlock(fmb) + continue + } + } if removed, err := fs.deleteFirstMsg(); err != nil || !removed { fs.rebuildFirst() return @@ -4773,6 +4784,17 @@ func (fs *fileStore) enforceBytesLimit() { return } for bs := fs.state.Bytes; bs > uint64(fs.cfg.MaxBytes); bs = fs.state.Bytes { + // If the first block can be removed fully, purge it entirely without needing to walk sequences. + if len(fs.blks) > 0 { + fmb := fs.blks[0] + fmb.mu.RLock() + bytes := fmb.bytes + fmb.mu.RUnlock() + if bs-bytes > uint64(fs.cfg.MaxBytes) { + fs.purgeMsgBlock(fmb) + continue + } + } if removed, err := fs.deleteFirstMsg(); err != nil || !removed { fs.rebuildFirst() return @@ -9347,6 +9369,25 @@ func (fs *fileStore) forceRemoveMsgBlock(mb *msgBlock) { fs.removeMsgBlockFromList(mb) } +// Purges and removes the msgBlock from the store. +// Lock should be held. +func (fs *fileStore) purgeMsgBlock(mb *msgBlock) { + mb.mu.Lock() + // Update top level accounting. 
+ msgs, bytes := mb.msgs, mb.bytes + if msgs > fs.state.Msgs { + msgs = fs.state.Msgs + } + if bytes > fs.state.Bytes { + bytes = fs.state.Bytes + } + fs.state.Msgs -= msgs + fs.state.Bytes -= bytes + fs.removeMsgBlock(mb) + mb.mu.Unlock() + fs.selectNextFirst() +} + // Called by purge to simply get rid of the cache and close our fds. // Lock should not be held. func (mb *msgBlock) dirtyClose() { diff --git a/server/filestore_test.go b/server/filestore_test.go index 2d9c14e087f..bf6675174cc 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -10963,3 +10963,44 @@ func TestFileStoreEraseMsgErr(t *testing.T) { fs.EraseMsg(2) }) } + +func TestFileStorePurgeMsgBlock(t *testing.T) { + testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { + fcfg.BlockSize = 10 * 33 + cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage} + created := time.Now() + fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + for range 20 { + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + } + + fs.mu.RLock() + blks := len(fs.blks) + fs.mu.RUnlock() + require_Equal(t, blks, 2) + + state := fs.State() + require_Equal(t, state.FirstSeq, 1) + require_Equal(t, state.LastSeq, 20) + require_Equal(t, state.Msgs, 20) + require_Equal(t, state.Bytes, 20*33) + + // Purging the block should both remove the block and do the accounting. + fmb := fs.getFirstBlock() + fs.mu.Lock() + fs.purgeMsgBlock(fmb) + blks = len(fs.blks) + fs.mu.Unlock() + + require_Equal(t, blks, 1) + state = fs.State() + require_Equal(t, state.FirstSeq, 11) + require_Equal(t, state.LastSeq, 20) + require_Equal(t, state.Msgs, 10) + require_Equal(t, state.Bytes, 10*33) + }) +} From 0a03552f7d978ed27283fcf421dca81de4f0f7dc Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Wed, 22 Oct 2025 11:59:43 +0200 Subject: [PATCH 09/24] (2.12) [FIXED] DirectGet batch deadlock with parallel write Signed-off-by: Maurice van Veen --- server/jetstream_test.go | 62 ++++++++++++++++++++++++++++++++++++++++ server/stream.go | 4 +-- 2 files changed, 64 insertions(+), 2 deletions(-) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 6762f4fcb86..020ad0eb589 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -22254,3 +22254,65 @@ func TestJetStreamScheduledMessageNotDeactivated(t *testing.T) { }) } } + +func TestJetStreamDirectGetBatchParallelWriteDeadlock(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Storage: nats.FileStorage, + AllowDirect: true, + }) + require_NoError(t, err) + + mset, err := s.globalAccount().lookupStream("TEST") + require_NoError(t, err) + for range 2 { + require_NoError(t, mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, 0, 0, nil, false, true)) + } + + // We'll lock the message blocks such that we can't read, but NumPending should still function. 
+ fs := mset.store.(*fileStore) + fs.lockAllMsgBlocks() + total, validThrough := fs.NumPending(1, _EMPTY_, false) + require_Equal(t, total, 2) + require_Equal(t, validThrough, 2) + + // We'll now run things in the following order: + // - do a read through Direct Batch Get, which is blocked by the message blocks being locked + // - do a write in parallel, which is blocked by the read to complete + // - unlock the message blocks while both read and write goroutines are active + // If there's no deadlock the read and write will complete, and 3 messages will end up in the stream. + var wg, read sync.WaitGroup + read.Add(1) + wg.Add(1) + go func() { + read.Done() + mset.getDirectRequest(&JSApiMsgGetRequest{Seq: 1, Batch: 2}, _EMPTY_) + }() + go func() { + // Make sure we enter getDirectRequest first. + read.Wait() + <-time.After(100 * time.Millisecond) + wg.Done() + mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, 0, 0, nil, false, true) + }() + go func() { + // Run some time after we've entered processJetStreamMsg above. + wg.Wait() + <-time.After(100 * time.Millisecond) + fs.unlockAllMsgBlocks() + }() + read.Wait() + checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + if msgs := mset.state().Msgs; msgs != 3 { + return fmt.Errorf("expected 3 msgs, got %d", msgs) + } + return nil + }) +} diff --git a/server/stream.go b/server/stream.go index ba923b25a8b..514d4b970de 100644 --- a/server/stream.go +++ b/server/stream.go @@ -5287,8 +5287,8 @@ func (mset *stream) getDirectRequest(req *JSApiMsgGetRequest, reply string) { // If batch was requested send EOB. if isBatchRequest { - // Update if the stream's lasts sequence has moved past our validThrough. - if mset.lastSeq() > validThrough { + // Update if the stream's last sequence has moved past our validThrough. + if mset.lseq > validThrough { np, _ = store.NumPending(seq, req.NextFor, false) } hdr := fmt.Appendf(nil, eob, np, lseq) From 40e3cb5e0ad1656974f4425f5d9e1b632f73b7d8 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 23 Oct 2025 12:03:55 +0200 Subject: [PATCH 10/24] [FIXED] Header search without alloc & bug fixes Signed-off-by: Maurice van Veen --- server/client.go | 63 +++++++++++++++++++------- server/client_test.go | 101 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 146 insertions(+), 18 deletions(-) diff --git a/server/client.go b/server/client.go index b7ef2ba60b5..af670bd33b1 100644 --- a/server/client.go +++ b/server/client.go @@ -4345,7 +4345,7 @@ func (c *client) setupResponseServiceImport(acc *Account, si *serviceImport, tra // Will remove a header if present. func removeHeaderIfPresent(hdr []byte, key string) []byte { - start := bytes.Index(hdr, []byte(key+":")) + start := getHeaderKeyIndex(key, hdr) // key can't be first and we want to check that it is preceded by a '\n' if start < 1 || hdr[start-1] != '\n' { return hdr @@ -4463,22 +4463,13 @@ func sliceHeader(key string, hdr []byte) []byte { if len(hdr) == 0 { return nil } - index := bytes.Index(hdr, stringToBytes(key+":")) - hdrLen := len(hdr) - // Check that we have enough characters, this will handle the -1 case of the key not - // being found and will also handle not having enough characters for trailing CRLF. - if index < 2 { - return nil - } - // There should be a terminating CRLF. - if index >= hdrLen-1 || hdr[index-1] != '\n' || hdr[index-2] != '\r' { + index := getHeaderKeyIndex(key, hdr) + if index == -1 { return nil } - // The key should be immediately followed by a : separator. + // Skip over the key and the : separator. 
index += len(key) + 1 - if index >= hdrLen || hdr[index-1] != ':' { - return nil - } + hdrLen := len(hdr) // Skip over whitespace before the value. for index < hdrLen && hdr[index] == ' ' { index++ @@ -4494,11 +4485,49 @@ func sliceHeader(key string, hdr []byte) []byte { return hdr[start:index:index] } +// getHeaderKeyIndex returns an index into the header slice for the given key. +// Returns -1 if not found. +func getHeaderKeyIndex(key string, hdr []byte) int { + if len(hdr) == 0 { + return -1 + } + bkey := stringToBytes(key) + keyLen, hdrLen := len(key), len(hdr) + var offset int + for { + index := bytes.Index(hdr[offset:], bkey) + // Check that we have enough characters, this will handle the -1 case of the key not + // being found and will also handle not having enough characters for trailing CRLF. + if index < 2 { + return -1 + } + index += offset + // There should be a terminating CRLF. + if index >= hdrLen-1 || hdr[index-1] != '\n' || hdr[index-2] != '\r' { + offset = index + keyLen + continue + } + // The key should be immediately followed by a : separator. + if index+keyLen >= hdrLen { + return -1 + } + if hdr[index+keyLen] != ':' { + offset = index + keyLen + continue + } + return index + } +} + func setHeader(key, val string, hdr []byte) []byte { - prefix := []byte(key + ": ") - start := bytes.Index(hdr, prefix) + start := getHeaderKeyIndex(key, hdr) if start >= 0 { - valStart := start + len(prefix) + valStart := start + len(key) + 1 + // Preserve single whitespace if used. + hdrLen := len(hdr) + if valStart < hdrLen && hdr[valStart] == ' ' { + valStart++ + } valEnd := bytes.Index(hdr[valStart:], []byte("\r")) if valEnd < 0 { return hdr // malformed headers diff --git a/server/client_test.go b/server/client_test.go index 88b6baf0063..03d0fca2e72 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -3195,7 +3195,7 @@ func TestSliceHeader(t *testing.T) { require_True(t, bytes.Equal(sliced, copied)) } -func TestSliceHeaderOrdering(t *testing.T) { +func TestSliceHeaderOrderingPrefix(t *testing.T) { hdr := []byte("NATS/1.0\r\n\r\n") // These headers share the same prefix, the longer subject @@ -3215,6 +3215,105 @@ func TestSliceHeaderOrdering(t *testing.T) { require_True(t, bytes.Equal(sliced, copied)) } +func TestSliceHeaderOrderingSuffix(t *testing.T) { + hdr := []byte("NATS/1.0\r\n\r\n") + + // These headers share the same suffix, the longer subject + // must not invalidate the existence of the shorter one. + hdr = genHeader(hdr, "Previous-Nats-Msg-Id", "user") + hdr = genHeader(hdr, "Nats-Msg-Id", "control") + + sliced := sliceHeader("Nats-Msg-Id", hdr) + copied := getHeader("Nats-Msg-Id", hdr) + + require_NotNil(t, sliced) + require_NotNil(t, copied) + require_True(t, bytes.Equal(sliced, copied)) + require_Equal(t, string(copied), "control") +} + +func TestRemoveHeaderIfPresentOrderingPrefix(t *testing.T) { + hdr := []byte("NATS/1.0\r\n\r\n") + + // These headers share the same prefix, the longer subject + // must not invalidate the existence of the shorter one. + hdr = genHeader(hdr, JSExpectedLastSubjSeqSubj, "foo") + hdr = genHeader(hdr, JSExpectedLastSubjSeq, "24") + + hdr = removeHeaderIfPresent(hdr, JSExpectedLastSubjSeq) + ehdr := genHeader(nil, JSExpectedLastSubjSeqSubj, "foo") + require_True(t, bytes.Equal(hdr, ehdr)) +} + +func TestRemoveHeaderIfPresentOrderingSuffix(t *testing.T) { + hdr := []byte("NATS/1.0\r\n\r\n") + + // These headers share the same suffix, the longer subject + // must not invalidate the existence of the shorter one. 
+ hdr = genHeader(hdr, "Previous-Nats-Msg-Id", "user") + hdr = genHeader(hdr, "Nats-Msg-Id", "control") + + hdr = removeHeaderIfPresent(hdr, "Nats-Msg-Id") + ehdr := genHeader(nil, "Previous-Nats-Msg-Id", "user") + require_True(t, bytes.Equal(hdr, ehdr)) +} + +func TestSetHeaderOrderingPrefix(t *testing.T) { + for _, space := range []bool{true, false} { + title := "Normal" + if !space { + title = "Trimmed" + } + t.Run(title, func(t *testing.T) { + hdr := []byte("NATS/1.0\r\n\r\n") + + // These headers share the same prefix, the longer subject + // must not invalidate the existence of the shorter one. + hdr = genHeader(hdr, JSExpectedLastSubjSeqSubj, "foo") + hdr = genHeader(hdr, JSExpectedLastSubjSeq, "24") + if !space { + hdr = bytes.ReplaceAll(hdr, []byte(" "), nil) + } + + hdr = setHeader(JSExpectedLastSubjSeq, "12", hdr) + ehdr := genHeader(nil, JSExpectedLastSubjSeqSubj, "foo") + ehdr = genHeader(ehdr, JSExpectedLastSubjSeq, "12") + if !space { + ehdr = bytes.ReplaceAll(ehdr, []byte(" "), nil) + } + require_True(t, bytes.Equal(hdr, ehdr)) + }) + } +} + +func TestSetHeaderOrderingSuffix(t *testing.T) { + for _, space := range []bool{true, false} { + title := "Normal" + if !space { + title = "Trimmed" + } + t.Run(title, func(t *testing.T) { + hdr := []byte("NATS/1.0\r\n\r\n") + + // These headers share the same suffix, the longer subject + // must not invalidate the existence of the shorter one. + hdr = genHeader(hdr, "Previous-Nats-Msg-Id", "user") + hdr = genHeader(hdr, "Nats-Msg-Id", "control") + if !space { + hdr = bytes.ReplaceAll(hdr, []byte(" "), nil) + } + + hdr = setHeader("Nats-Msg-Id", "other", hdr) + ehdr := genHeader(nil, "Previous-Nats-Msg-Id", "user") + ehdr = genHeader(ehdr, "Nats-Msg-Id", "other") + if !space { + ehdr = bytes.ReplaceAll(ehdr, []byte(" "), nil) + } + require_True(t, bytes.Equal(hdr, ehdr)) + }) + } +} + func TestInProcessAllowedConnectionType(t *testing.T) { tmpl := ` listen: "127.0.0.1:-1" From f1364b976e740b7b7d06e842d23341b47432081f Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 23 Oct 2025 14:20:36 +0200 Subject: [PATCH 11/24] [FIXED] Consumer send 404 No Messages on EOS Signed-off-by: Maurice van Veen --- server/consumer.go | 22 +++++++++++++---- server/jetstream_consumer_test.go | 40 +++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 4 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 080ec7a1fdc..b6721885089 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -4519,7 +4519,8 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) { var pre *waitingRequest for wr := wq.head; wr != nil; { // Check expiration. - if (eos && wr.noWait && wr.d > 0) || (!wr.expires.IsZero() && now.After(wr.expires)) { + expires := !wr.expires.IsZero() && now.After(wr.expires) + if (eos && wr.noWait) || expires { rdWait := o.replicateDeliveries() if rdWait { // Check if we need to send the timeout after pending replicated deliveries, or can do so immediately. @@ -4528,13 +4529,26 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) { } else { wd.pn, wd.pb = wr.n, wr.b } + // If we still need to wait for replicated deliveries, remove from waiting list. + if rdWait { + wr = remove(pre, wr) + continue + } } - if !rdWait { + // Normally it's a timeout. 
+ if expires || !wr.noWait || wr.d > 0 { hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b) o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) + wr = remove(pre, wr) + continue + } else if wr.expires.IsZero() { + // But if we're NoWait without expiry, we've reached the end of the stream, and we've not delivered any messages. + // Return no messages instead, which is the same as if we'd rejected the pull request initially. + hdr := fmt.Appendf(nil, "NATS/1.0 404 No Messages\r\n\r\n") + o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) + wr = remove(pre, wr) + continue } - wr = remove(pre, wr) - continue } // Now check interest. interest := wr.acc.sl.HasInterest(wr.interest) diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index cb7883d277d..e6eae9d9753 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -10341,3 +10341,43 @@ func TestJetStreamConsumerMaxDeliverUnderflow(t *testing.T) { o.mu.RUnlock() require_Equal(t, maxdc, 0) } + +// https://github.com/nats-io/nats-server/issues/7457 +func TestJetStreamConsumerNoWaitNoMessagesOnEos(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}}) + require_NoError(t, err) + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"}) + require_NoError(t, err) + + sub, err := nc.SubscribeSync("reply") + require_NoError(t, err) + defer sub.Drain() + require_NoError(t, nc.Flush()) + + mset, err := s.globalAccount().lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("CONSUMER") + require_NotNil(t, o) + + // Fiddle with the pending count such that the NoWait request will go through, + // and the "404 No Messages" will be sent when hitting the end of the stream. + o.mu.Lock() + o.npc++ + o.mu.Unlock() + + req := &JSApiConsumerGetNextRequest{NoWait: true} + jreq, err := json.Marshal(req) + require_NoError(t, err) + o.processNextMsgRequest("reply", jreq) + + msg, err := sub.NextMsg(time.Second) + require_NoError(t, err) + require_Equal(t, msg.Header.Get("Status"), "404") + require_Equal(t, msg.Header.Get("Description"), "No Messages") +} From df48a14e88a40a7bcc679b8d4f50c01acf8312e5 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 23 Oct 2025 17:28:37 +0200 Subject: [PATCH 12/24] (2.14) [FIXED] Consumer send 404 No Messages on EOS after delivering messages Signed-off-by: Maurice van Veen --- server/consumer.go | 4 +-- server/jetstream_consumer_test.go | 43 +++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index b6721885089..226e5a3177e 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -4536,12 +4536,12 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) { } } // Normally it's a timeout. 
- if expires || !wr.noWait || wr.d > 0 { + if expires { hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b) o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) wr = remove(pre, wr) continue - } else if wr.expires.IsZero() { + } else if wr.expires.IsZero() || wr.d > 0 { // But if we're NoWait without expiry, we've reached the end of the stream, and we've not delivered any messages. // Return no messages instead, which is the same as if we'd rejected the pull request initially. hdr := fmt.Appendf(nil, "NATS/1.0 404 No Messages\r\n\r\n") diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index e6eae9d9753..9cc79db264d 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -10381,3 +10381,46 @@ func TestJetStreamConsumerNoWaitNoMessagesOnEos(t *testing.T) { require_Equal(t, msg.Header.Get("Status"), "404") require_Equal(t, msg.Header.Get("Description"), "No Messages") } + +// https://github.com/nats-io/nats-server/issues/5373 +func TestJetStreamConsumerNoWaitNoMessagesOnEosWithDeliveredMsgs(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}}) + require_NoError(t, err) + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"}) + require_NoError(t, err) + + _, err = js.Publish("foo", []byte("msg")) + require_NoError(t, err) + + sub, err := nc.SubscribeSync("reply") + require_NoError(t, err) + defer sub.Drain() + require_NoError(t, nc.Flush()) + + mset, err := s.globalAccount().lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("CONSUMER") + require_NotNil(t, o) + + req := &JSApiConsumerGetNextRequest{NoWait: true, Batch: 2} + jreq, err := json.Marshal(req) + require_NoError(t, err) + o.processNextMsgRequest("reply", jreq) + + msg, err := sub.NextMsg(time.Second) + require_NoError(t, err) + require_Equal(t, msg.Subject, "foo") + require_Equal(t, string(msg.Data), "msg") + + // We requested two messages but the stream only contained 1. 
+ msg, err = sub.NextMsg(time.Second) + require_NoError(t, err) + require_Equal(t, msg.Header.Get("Status"), "404") + require_Equal(t, msg.Header.Get("Description"), "No Messages") +} From 5d8652bac3adde73f13425eb48b7de5ee6c5ba39 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Mon, 27 Oct 2025 09:26:11 +0000 Subject: [PATCH 13/24] Update Go dependencies Signed-off-by: Neil Twigg --- go.mod | 4 ++-- go.sum | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 7f127aca4c2..1dd04c5cf90 100644 --- a/go.mod +++ b/go.mod @@ -7,10 +7,10 @@ toolchain go1.24.9 require ( github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op github.com/google/go-tpm v0.9.6 - github.com/klauspost/compress v1.18.0 + github.com/klauspost/compress v1.18.1 github.com/minio/highwayhash v1.0.3 github.com/nats-io/jwt/v2 v2.8.0 - github.com/nats-io/nats.go v1.46.1 + github.com/nats-io/nats.go v1.47.0 github.com/nats-io/nkeys v0.4.11 github.com/nats-io/nuid v1.0.1 go.uber.org/automaxprocs v1.6.0 diff --git a/go.sum b/go.sum index 49aa2382556..b351d963d39 100644 --- a/go.sum +++ b/go.sum @@ -4,14 +4,14 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/go-tpm v0.9.6 h1:Ku42PT4LmjDu1H5C5ISWLlpI1mj+Zq7sPGKoRw2XROA= github.com/google/go-tpm v0.9.6/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= -github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= -github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= +github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g= github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA= -github.com/nats-io/nats.go v1.46.1 h1:bqQ2ZcxVd2lpYI97xYASeRTY3I5boe/IVmuUDPitHfo= -github.com/nats-io/nats.go v1.46.1/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= +github.com/nats-io/nats.go v1.47.0 h1:YQdADw6J/UfGUd2Oy6tn4Hq6YHxCaJrVKayxxFqYrgM= +github.com/nats-io/nats.go v1.47.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= From d9a249cd811d50cd8552622243e936fd33985c2a Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Mon, 27 Oct 2025 09:27:07 +0000 Subject: [PATCH 14/24] Update GHA dependencies Signed-off-by: Neil Twigg --- .github/workflows/release.yaml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index b1cdc3fa503..c8e8b280358 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -33,13 +33,13 @@ jobs: - name: Install cosign # Use commit hash here to avoid a re-tagging attack, as this is a third-party action - # Commit d7543c93d881b35a8faa02e8e3605f69b7a1ce62 = tag v3.10.0 - uses: sigstore/cosign-installer@d7543c93d881b35a8faa02e8e3605f69b7a1ce62 + # Commit 
faadad0cce49287aee09b3a48701e75088a2c6ad = tag v4.0.0 + uses: sigstore/cosign-installer@faadad0cce49287aee09b3a48701e75088a2c6ad - name: Install syft # Use commit hash here to avoid a re-tagging attack, as this is a third-party action - # Commit f8bdd1d8ac5e901a77a92f111440fdb1b593736b = tag v0.20.6 - uses: anchore/sbom-action/download-syft@f8bdd1d8ac5e901a77a92f111440fdb1b593736b + # Commit 8e94d75ddd33f69f691467e42275782e4bfefe84 = tag v0.20.9 + uses: anchore/sbom-action/download-syft@8e94d75ddd33f69f691467e42275782e4bfefe84 with: syft-version: "v1.27.1" From e01763534eb733acb11d345e968e1bff7f5efc48 Mon Sep 17 00:00:00 2001 From: Alex Bozhenko Date: Fri, 24 Oct 2025 12:31:55 -0700 Subject: [PATCH 15/24] add expvarz Signed-off-by: Alex Bozhenko --- server/monitor.go | 4 +++- server/server.go | 5 +++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/server/monitor.go b/server/monitor.go index 83a239d5300..c59a47b81a6 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1501,7 +1501,8 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) { LeafNodes %s Gateways %s Raft Groups %s - Health Probe %s + Health Probe %s + Expvar %s Help `, @@ -1518,6 +1519,7 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) { s.basePath(GatewayzPath), GatewayzPath, s.basePath(RaftzPath), RaftzPath, s.basePath(HealthzPath), HealthzPath, + s.basePath(ExpvarzPath), ExpvarzPath, ) } diff --git a/server/server.go b/server/server.go index fb3e472b87e..38982bd382e 100644 --- a/server/server.go +++ b/server/server.go @@ -44,6 +44,8 @@ import ( // Allow dynamic profiling. _ "net/http/pprof" + "expvar" + "github.com/klauspost/compress/s2" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats-server/v2/logger" @@ -3017,6 +3019,7 @@ const ( HealthzPath = "/healthz" IPQueuesPath = "/ipqueuesz" RaftzPath = "/raftz" + ExpvarzPath = "/debug/vars" ) func (s *Server) basePath(p string) string { @@ -3135,6 +3138,8 @@ func (s *Server) startMonitoring(secure bool) error { mux.HandleFunc(s.basePath(IPQueuesPath), s.HandleIPQueuesz) // Raftz mux.HandleFunc(s.basePath(RaftzPath), s.HandleRaftz) + // Expvarz + mux.Handle(s.basePath(ExpvarzPath), expvar.Handler()) // Do not set a WriteTimeout because it could cause cURL/browser // to return empty response or unable to display page if the From 9c39cba1ef5e9d193c6e4c5144457f9ad43e7ed6 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Wed, 22 Oct 2025 17:27:31 +0200 Subject: [PATCH 16/24] [FIXED] NRG: Always only report leader after noop-entry Signed-off-by: Maurice van Veen --- server/raft.go | 39 +++++--------------- server/raft_test.go | 89 +++++++++++++++++++++++++-------------------- 2 files changed, 59 insertions(+), 69 deletions(-) diff --git a/server/raft.go b/server/raft.go index 7ea1ee54c85..163d3e6e061 100644 --- a/server/raft.go +++ b/server/raft.go @@ -2652,9 +2652,6 @@ func (n *raft) runAsLeader() { n.unsubscribe(rpsub) n.Unlock() }() - - // To send out our initial peer state. - n.sendPeerState() n.Unlock() hb := time.NewTicker(hbInterval) @@ -4627,37 +4624,19 @@ func (n *raft) switchToLeader() { } n.Lock() + defer n.Unlock() n.debug("Switching to leader") - // Check if we have items pending as we are taking over. - sendHB := n.pindex > n.commit - n.lxfer = false n.updateLeader(n.id) - leadChange := n.switchState(Leader) - - if leadChange { - // Wait for messages to be applied if we've stored more, otherwise signal immediately. 
- // It's important to wait signaling we're leader if we're not up-to-date yet, as that - // would mean we're in a consistent state compared with the previous leader. - if n.pindex > n.applied { - n.aflr = n.pindex - } else { - // We know we have applied all entries in our log and can signal immediately. - // For sanity reset applied floor back down to 0, so we aren't able to signal twice. - n.aflr = 0 - if !n.leaderState.Swap(true) { - // Only update timestamp if leader state actually changed. - nowts := time.Now().UTC() - n.leaderSince.Store(&nowts) - } - n.updateLeadChange(true) - } - } - n.Unlock() + n.switchState(Leader) - if sendHB { - n.sendHeartbeat() - } + // To send out our initial peer state. + // In our implementation this is equivalent to sending a NOOP-entry upon becoming leader. + // Wait for this message (and potentially more) to be applied. + // It's important to wait signaling we're leader if we're not up-to-date yet, as that + // would mean we're in a consistent state compared with the previous leader. + n.sendPeerState() + n.aflr = n.pindex } diff --git a/server/raft_test.go b/server/raft_test.go index e202237c9b3..2ae33c73b6f 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -1570,11 +1570,11 @@ func TestNRGSnapshotAndTruncateToApplied(t *testing.T) { aeMsg2 := encode(t, &appendEntry{leader: nats1, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) // Timeline, we temporarily became leader - aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 2, pterm: 1, pindex: 2, entries: nil}) - aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 2, pterm: 1, pindex: 2, entries: entries}) + aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 3, pterm: 1, pindex: 3, entries: nil}) + aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 3, pterm: 1, pindex: 3, entries: entries}) // Timeline, old leader is back. - aeHeartbeat2 := encode(t, &appendEntry{leader: nats1, term: 3, commit: 2, pterm: 1, pindex: 2, entries: nil}) + aeHeartbeat2 := encode(t, &appendEntry{leader: nats1, term: 3, commit: 3, pterm: 1, pindex: 3, entries: nil}) // Simply receive first message. n.processAppendEntry(aeMsg1, n.aesub) @@ -1604,10 +1604,10 @@ func TestNRGSnapshotAndTruncateToApplied(t *testing.T) { reply: _EMPTY_, success: true, }) - require_Equal(t, n.commit, 2) + require_Equal(t, n.commit, 3) // Simulate upper layer calling down to apply. - n.Applied(2) + n.Applied(3) // Install snapshot and check it exists. err = n.InstallSnapshot(nil) @@ -1621,9 +1621,9 @@ func TestNRGSnapshotAndTruncateToApplied(t *testing.T) { // Store a third message, it stays uncommitted. 
require_NoError(t, n.storeToWAL(aeMsg3)) - require_Equal(t, n.commit, 2) + require_Equal(t, n.commit, 3) require_Equal(t, n.wal.State().Msgs, 1) - entry, err = n.loadEntry(3) + entry, err = n.loadEntry(4) require_NoError(t, err) require_Equal(t, entry.leader, nats0) @@ -1631,8 +1631,8 @@ func TestNRGSnapshotAndTruncateToApplied(t *testing.T) { n.stepdown(noLeader) n.processAppendEntry(aeHeartbeat2, n.aesub) require_Equal(t, n.wal.State().Msgs, 0) - require_Equal(t, n.commit, 2) - require_Equal(t, n.applied, 2) + require_Equal(t, n.commit, 3) + require_Equal(t, n.applied, 3) } func TestNRGIgnoreDoubleSnapshot(t *testing.T) { @@ -2212,7 +2212,7 @@ func TestNRGHealthCheckWaitForPendingCommitsWhenPaused(t *testing.T) { require_True(t, n.Healthy()) } -func TestNRGHeartbeatCanEstablishQuorumAfterLeaderChange(t *testing.T) { +func TestNRGAppendEntryCanEstablishQuorumAfterLeaderChange(t *testing.T) { n, cleanup := initSingleMemRaftNode(t) defer cleanup() @@ -2224,7 +2224,7 @@ func TestNRGHeartbeatCanEstablishQuorumAfterLeaderChange(t *testing.T) { // Timeline aeMsg := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) - aeHeartbeatResponse := &appendEntryResponse{term: 1, index: 1, peer: nats0, success: true} + aeHeartbeatResponse := &appendEntryResponse{term: 1, index: 2, peer: nats0, success: true} // Process first message. n.processAppendEntry(aeMsg, n.aesub) @@ -2234,16 +2234,16 @@ func TestNRGHeartbeatCanEstablishQuorumAfterLeaderChange(t *testing.T) { // Simulate becoming leader, and not knowing if the stored entry has quorum and can be committed. // Switching to leader should send a heartbeat. n.switchToLeader() - require_Equal(t, n.aflr, 1) + require_Equal(t, n.aflr, 2) require_Equal(t, n.commit, 0) // We simulate receiving the successful heartbeat response here. It should move the commit up. n.processAppendEntryResponse(aeHeartbeatResponse) - require_Equal(t, n.commit, 1) - require_Equal(t, n.aflr, 1) + require_Equal(t, n.commit, 2) + require_Equal(t, n.aflr, 2) // Once the entry is applied, it should reset the applied floor. - n.Applied(1) + n.Applied(2) require_Equal(t, n.aflr, 0) } @@ -2251,10 +2251,6 @@ func TestNRGQuorumAccounting(t *testing.T) { n, cleanup := initSingleMemRaftNode(t) defer cleanup() - // Create a sample entry, the content doesn't matter, just that it's stored. - esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) - entries := []*Entry{newEntry(EntryNormal, esm)} - nats1 := "yrzKKRBu" // "nats-1" nats2 := "cnrtt3eg" // "nats-2" @@ -2267,10 +2263,8 @@ func TestNRGQuorumAccounting(t *testing.T) { require_Equal(t, n.csz, 5) require_Equal(t, n.qn, 3) - // Switch this node to leader, and send an entry. + // Switch this node to leader which sends an entry. n.switchToLeader() - require_Equal(t, n.pindex, 0) - n.sendAppendEntry(entries) require_Equal(t, n.pindex, 1) // The first response MUST NOT indicate quorum has been reached. @@ -2286,10 +2280,6 @@ func TestNRGRevalidateQuorumAfterLeaderChange(t *testing.T) { n, cleanup := initSingleMemRaftNode(t) defer cleanup() - // Create a sample entry, the content doesn't matter, just that it's stored. 
-	esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
-	entries := []*Entry{newEntry(EntryNormal, esm)}
-
 	nats1 := "yrzKKRBu" // "nats-1"
 	nats2 := "cnrtt3eg" // "nats-2"
 
@@ -2302,12 +2292,10 @@ func TestNRGRevalidateQuorumAfterLeaderChange(t *testing.T) {
 	require_Equal(t, n.csz, 5)
 	require_Equal(t, n.qn, 3)
 
-	// Switch this node to leader, and send an entry.
+	// Switch this node to leader which sends an entry.
 	n.term++
 	n.switchToLeader()
 	require_Equal(t, n.term, 1)
-	require_Equal(t, n.pindex, 0)
-	n.sendAppendEntry(entries)
 	require_Equal(t, n.pindex, 1)
 
 	// We have one server that signals the message was stored. The leader will add 1 to the acks count.
@@ -2354,10 +2342,10 @@ func TestNRGSignalLeadChangeFalseIfCampaignImmediately(t *testing.T) {
 				n.switchToCandidate()
 				n.switchToLeader()
 				select {
-				case isLeader := <-n.LeadChangeC():
-					require_True(t, isLeader)
+				case <-n.LeadChangeC():
+					t.Error("Expected no leadChange signal")
 				default:
-					t.Error("Expected leadChange signal")
+					// Expecting no signal yet.
 				}
 			},
 		},
@@ -2460,16 +2448,10 @@ func TestNRGIgnoreTrackResponseWhenNotLeader(t *testing.T) {
 	n, cleanup := initSingleMemRaftNode(t)
 	defer cleanup()
 
-	// Create a sample entry, the content doesn't matter, just that it's stored.
-	esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
-	entries := []*Entry{newEntry(EntryNormal, esm)}
-
-	// Switch this node to leader, and send two entries. The first will get quorum, the second will not.
+	// Switch this node to leader which sends an entry.
 	n.term++
 	n.switchToLeader()
 	require_Equal(t, n.term, 1)
-	require_Equal(t, n.pindex, 0)
-	n.sendAppendEntry(entries)
 	require_Equal(t, n.pindex, 1)
 	require_Equal(t, n.pterm, 1)
 	require_Equal(t, n.commit, 0)
@@ -3753,6 +3735,35 @@ func TestNRGParallelCatchupRollback(t *testing.T) {
 	require_Equal(t, n.pindex, 2)
 }
 
+func TestNRGReportLeaderAfterNoopEntry(t *testing.T) {
+	n, cleanup := initSingleMemRaftNode(t)
+	defer cleanup()
+
+	require_Equal(t, n.State(), Follower)
+	require_Equal(t, n.term, 0)
+	require_False(t, n.Leader())
+
+	n.switchToCandidate()
+	require_Equal(t, n.State(), Candidate)
+	require_Equal(t, n.term, 1)
+	require_False(t, n.Leader())
+
+	// Switching to leader will put us into Leader state,
+	// but we're not necessarily an up-to-date leader yet.
+	n.switchToLeader()
+	require_Equal(t, n.State(), Leader)
+	require_Equal(t, n.term, 1)
+	require_Equal(t, n.pindex, 1) // Should've sent a NOOP-entry to establish leadership.
+	require_Equal(t, n.applied, 0)
+	require_False(t, n.Leader())
+
+	// Once we commit and apply the final entry, we should start reporting we're leader.
+	n.commit = 1
+	n.Applied(1)
+	require_Equal(t, n.applied, 1)
+	require_True(t, n.Leader())
+}
+
 // This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before
 // proposing the next one.
// The test may fail if: From 0d53d5b15306dc22795c99e7f081086ab66571f1 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Mon, 20 Oct 2025 14:53:50 +0200 Subject: [PATCH 17/24] [FIXED] Gateway RS+/- blocks on account fetch Signed-off-by: Maurice van Veen --- server/gateway_test.go | 57 ++++++++++++++++++++++++++ server/jetstream_super_cluster_test.go | 2 +- server/leafnode.go | 3 +- server/server.go | 11 ++++- 4 files changed, 69 insertions(+), 4 deletions(-) diff --git a/server/gateway_test.go b/server/gateway_test.go index c457d115026..de23b97c063 100644 --- a/server/gateway_test.go +++ b/server/gateway_test.go @@ -33,6 +33,7 @@ import ( "github.com/nats-io/nats-server/v2/logger" "github.com/nats-io/nats.go" + "github.com/nats-io/nkeys" "golang.org/x/crypto/ocsp" . "github.com/nats-io/nats-server/v2/internal/ocsp" @@ -7557,3 +7558,59 @@ func TestGatewayConfigureWriteDeadline(t *testing.T) { require_Equal(t, r.out.wdl, 5*time.Second) }) } + +func TestGatewayProcessRSubNoBlockingAccountFetch(t *testing.T) { + createAccountPubKey := func() string { + kp, err := nkeys.CreateAccount() + require_NoError(t, err) + pubkey, err := kp.PublicKey() + require_NoError(t, err) + return pubkey + } + sysPub := createAccountPubKey() + accPub := createAccountPubKey() + dir := t.TempDir() + conf := createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:-1 + server_name: srv + operator: %s + system_account: %s + resolver: { + type: cache + dir: '%s' + timeout: "2s" + } + gateway: { + name: "clust-B" + listen: 127.0.0.1:-1 + } + `, ojwt, sysPub, dir))) + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() + + // Set up a mock gateway client. + c := s.createInternalAccountClient() + c.mu.Lock() + c.gw = &gateway{} + c.gw.outsim = &sync.Map{} + c.nc = &net.IPConn{} + c.mu.Unlock() + + // Receiving a R+ should not be blocking, since we're in the gateway's readLoop. + start := time.Now() + require_NoError(t, c.processGatewayRSub(fmt.Appendf(nil, "%s subj queue 0", accPub))) + c.mu.Lock() + subs := len(c.subs) + c.mu.Unlock() + require_Len(t, subs, 1) + require_LessThan(t, time.Since(start), 100*time.Millisecond) + + // Receiving a R- should not be blocking, since we're in the gateway's readLoop. + start = time.Now() + require_NoError(t, c.processGatewayRUnsub(fmt.Appendf(nil, "%s subj queue", accPub))) + c.mu.Lock() + subs = len(c.subs) + c.mu.Unlock() + require_Len(t, subs, 0) + require_LessThan(t, time.Since(start), 100*time.Millisecond) +} diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index bb626ec242b..332db4b5fd6 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -4411,7 +4411,7 @@ func TestJetStreamSuperClusterMixedModeSwitchToInterestOnlyOperatorConfig(t *tes if gw.Name == opts.Gateway.Name { continue } - checkGWInterestOnlyMode(t, s, gw.Name, apub) + checkGWInterestOnlyModeOrNotPresent(t, s, gw.Name, apub, true) } } s = sc.serverByName(si.Cluster.Leader) diff --git a/server/leafnode.go b/server/leafnode.go index f306ebc9c15..5e233f7d99d 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -2426,7 +2426,8 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) { // updateInterestForAccountOnGateway called from gateway code when processing RS+ and RS-. func (s *Server) updateInterestForAccountOnGateway(accName string, sub *subscription, delta int32) { - acc, err := s.LookupAccount(accName) + // Since we're in the gateway's readLoop, and we would otherwise block, don't allow fetching. 
+ acc, err := s.lookupOrFetchAccount(accName, false) if acc == nil || err != nil { s.Debugf("No or bad account for %q, failed to update interest from gateway", accName) return diff --git a/server/server.go b/server/server.go index 38982bd382e..3367c467260 100644 --- a/server/server.go +++ b/server/server.go @@ -2051,6 +2051,13 @@ func (s *Server) setRouteInfo(acc *Account) { // associated with an account name. // Lock MUST NOT be held upon entry. func (s *Server) lookupAccount(name string) (*Account, error) { + return s.lookupOrFetchAccount(name, true) +} + +// lookupOrFetchAccount is a function to return the account structure +// associated with an account name. +// Lock MUST NOT be held upon entry. +func (s *Server) lookupOrFetchAccount(name string, fetch bool) (*Account, error) { var acc *Account if v, ok := s.accounts.Load(name); ok { acc = v.(*Account) @@ -2060,7 +2067,7 @@ func (s *Server) lookupAccount(name string) (*Account, error) { // return the latest information from the resolver. if acc.IsExpired() { s.Debugf("Requested account [%s] has expired", name) - if s.AccountResolver() != nil { + if s.AccountResolver() != nil && fetch { if err := s.updateAccount(acc); err != nil { // This error could mask expired, so just return expired here. return nil, ErrAccountExpired @@ -2072,7 +2079,7 @@ func (s *Server) lookupAccount(name string) (*Account, error) { return acc, nil } // If we have a resolver see if it can fetch the account. - if s.AccountResolver() == nil { + if s.AccountResolver() == nil || !fetch { return nil, ErrMissingAccount } return s.fetchAccount(name) From d74cc2540d07f38da6fd0d5bbeb08d1c417f5874 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Tue, 28 Oct 2025 16:29:57 +0000 Subject: [PATCH 18/24] Add `meta_compact` option to control JetStream meta group compaction/snapshotting Signed-off-by: Neil Twigg --- server/jetstream_cluster.go | 26 +++++++++------ server/jetstream_cluster_4_test.go | 53 ++++++++++++++++++++++++++++++ server/jetstream_test.go | 41 +++++++++++++++++++++++ server/opts.go | 7 ++++ server/reload.go | 4 ++- 5 files changed, 120 insertions(+), 11 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index ab4a2b1e798..5b45b679d83 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1344,13 +1344,17 @@ func (js *jetStream) monitorCluster() { js.setMetaRecovering() // Snapshotting function. - doSnapshot := func() { + doSnapshot := func(force bool) { // Suppress during recovery. if js.isMetaRecovering() { return } - // For the meta layer we want to snapshot when asked if we need one or have any entries that we can compact. - if ne, _ := n.Size(); ne > 0 || n.NeedSnapshot() { + // Look up what the threshold is for compaction. Re-reading from config here as it is reloadable. + js.srv.optsMu.RLock() + thresh := js.srv.opts.JetStreamMetaCompact + js.srv.optsMu.RUnlock() + // For the meta layer we want to snapshot when over the above threshold (which could be 0 by default). + if ne, _ := n.Size(); force || ne > thresh || n.NeedSnapshot() { snap, err := js.metaSnapshot() if err != nil { s.Warnf("Error generating JetStream cluster snapshot: %v", err) @@ -1379,15 +1383,15 @@ func (js *jetStream) monitorCluster() { select { case <-s.quitCh: // Server shutting down, but we might receive this before qch, so try to snapshot. - doSnapshot() + doSnapshot(false) return case <-rqch: // Clean signal from shutdown routine so do best effort attempt to snapshot meta layer. 
- doSnapshot() + doSnapshot(false) return case <-qch: // Clean signal from shutdown routine so do best effort attempt to snapshot meta layer. - doSnapshot() + doSnapshot(false) // Return the signal back since shutdown will be waiting. close(qch) return @@ -1423,6 +1427,8 @@ func (js *jetStream) monitorCluster() { // Clear. ru = nil s.Debugf("Recovered JetStream cluster metadata") + // Snapshot now so we start with freshly compacted log. + doSnapshot(true) oc = time.AfterFunc(30*time.Second, js.checkForOrphans) // Do a health check here as well. go checkHealth() @@ -1435,9 +1441,9 @@ func (js *jetStream) monitorCluster() { _, nb = n.Applied(ce.Index) } if js.hasPeerEntries(ce.Entries) || (didSnap && !isLeader) { - doSnapshot() + doSnapshot(true) } else if nb > compactSizeMin && time.Since(lastSnapTime) > minSnapDelta { - doSnapshot() + doSnapshot(false) } } else { s.Warnf("Error applying JetStream cluster entries: %v", err) @@ -1453,11 +1459,11 @@ func (js *jetStream) monitorCluster() { s.sendInternalMsgLocked(serverStatsPingReqSubj, _EMPTY_, nil, nil) // Install a snapshot as we become leader. js.checkClusterSize() - doSnapshot() + doSnapshot(false) } case <-t.C: - doSnapshot() + doSnapshot(false) // Periodically check the cluster size. if n.Leader() { js.checkClusterSize() diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 516ae2459f5..250bacb7aab 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -7078,3 +7078,56 @@ func TestJetStreamClusterAccountMaxConnectionsReconnect(t *testing.T) { return nil }) } + +func TestJetStreamClusterMetaCompactThreshold(t *testing.T) { + for _, thres := range []uint64{0, 5, 10} { + t.Run(fmt.Sprintf("%d", thres), func(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R1TEST", 3) + defer c.shutdown() + for _, s := range c.servers { + s.optsMu.Lock() + s.opts.JetStreamMetaCompact = thres + s.optsMu.Unlock() + } + + nc, _ := jsClientConnect(t, c.servers[0]) + defer nc.Close() + + leader := c.leader() + _, cc := leader.getJetStreamCluster() + rg := cc.meta.(*raft) + + for i := range uint64(15) { + rg.RLock() + papplied := rg.papplied + rg.RUnlock() + + jsStreamCreate(t, nc, &StreamConfig{ + Name: fmt.Sprintf("test_%d", i), + Subjects: []string{fmt.Sprintf("test.%d", i)}, + Storage: MemoryStorage, + }) + + // Kicking the leader change channel is the easiest way to + // trick monitorCluster() into calling doSnapshot(). + entries, _ := cc.meta.Size() + cc.meta.(*raft).leadc <- true + + // Should we have compacted on this iteration? 
+ if entries > thres { + checkFor(t, time.Second, 5*time.Millisecond, func() error { + rg.RLock() + npapplied := rg.papplied + rg.RUnlock() + if npapplied <= papplied { + return fmt.Errorf("haven't snapshotted yet (%d <= %d)", npapplied, papplied) + } + return nil + }) + entries, _ = cc.meta.Size() + require_Equal(t, entries, 0) + } + } + }) + } +} diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 020ad0eb589..d5e831a87b0 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -22316,3 +22316,44 @@ func TestJetStreamDirectGetBatchParallelWriteDeadlock(t *testing.T) { return nil }) } + +func TestJetStreamReloadMetaCompact(t *testing.T) { + storeDir := t.TempDir() + + conf := createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:-1 + jetstream: { + max_mem_store: 2MB + max_file_store: 8MB + store_dir: '%s' + } + `, storeDir))) + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() + + require_Equal(t, s.getOpts().JetStreamMetaCompact, 0) + + reloadUpdateConfig(t, s, conf, fmt.Sprintf(` + listen: 127.0.0.1:-1 + jetstream: { + max_mem_store: 2MB + max_file_store: 8MB + store_dir: '%s' + meta_compact: 100 + } + `, storeDir)) + + require_Equal(t, s.getOpts().JetStreamMetaCompact, 100) + + reloadUpdateConfig(t, s, conf, fmt.Sprintf(` + listen: 127.0.0.1:-1 + jetstream: { + max_mem_store: 2MB + max_file_store: 8MB + store_dir: '%s' + meta_compact: 0 + } + `, storeDir)) + + require_Equal(t, s.getOpts().JetStreamMetaCompact, 0) +} diff --git a/server/opts.go b/server/opts.go index 9931bf556a4..d05b3074954 100644 --- a/server/opts.go +++ b/server/opts.go @@ -383,6 +383,7 @@ type Options struct { JetStreamTpm JSTpmOpts JetStreamMaxCatchup int64 JetStreamRequestQueueLimit int64 + JetStreamMetaCompact uint64 StreamMaxBufferedMsgs int `json:"-"` StreamMaxBufferedSize int64 `json:"-"` StoreDir string `json:"-"` @@ -2603,6 +2604,12 @@ func parseJetStream(v any, opts *Options, errors *[]error, warnings *[]error) er return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)} } opts.JetStreamRequestQueueLimit = lim + case "meta_compact": + thres, ok := mv.(int64) + if !ok || thres < 0 { + return &configErr{tk, fmt.Sprintf("Expected an absolute size for %q, got %v", mk, mv)} + } + opts.JetStreamMetaCompact = uint64(thres) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ diff --git a/server/reload.go b/server/reload.go index 89594d1b9e3..d3530a550ad 100644 --- a/server/reload.go +++ b/server/reload.go @@ -1257,7 +1257,7 @@ func imposeOrder(value any) error { slices.SortFunc(value.Gateways, func(i, j *RemoteGatewayOpts) int { return cmp.Compare(i.Name, j.Name) }) case WebsocketOpts: slices.Sort(value.AllowedOrigins) - case string, bool, uint8, uint16, int, int32, int64, time.Duration, float64, nil, LeafNodeOpts, ClusterOpts, *tls.Config, PinnedCertSet, + case string, bool, uint8, uint16, uint64, int, int32, int64, time.Duration, float64, nil, LeafNodeOpts, ClusterOpts, *tls.Config, PinnedCertSet, *URLAccResolver, *MemAccResolver, *DirAccResolver, *CacheDirAccResolver, Authentication, MQTTOpts, jwt.TagList, *OCSPConfig, map[string]string, JSLimitOpts, StoreCipher, *OCSPResponseCacheConfig, *ProxiesConfig: // explicitly skipped types @@ -1659,6 +1659,8 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { return nil, fmt.Errorf("config reload not supported for jetstream max memory and store") } } + case "jetstreammetacompact": + // Allowed at runtime but monitorCluster looks at s.opts directly, so no further 
work needed here. case "websocket": // Similar to gateways tmpOld := oldValue.(WebsocketOpts) From c14e666fbb7a5513f67ecd7558d8bcb0282a2a5a Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 28 Oct 2025 10:24:12 +0100 Subject: [PATCH 19/24] [FIXED] Default to allowing binary stream snapshots Signed-off-by: Maurice van Veen --- server/events.go | 48 ++++++++++++++++++++++++------------------------ server/route.go | 16 ++++++++++++++-- server/server.go | 21 ++++++++++++--------- 3 files changed, 50 insertions(+), 35 deletions(-) diff --git a/server/events.go b/server/events.go index ff2ee46367e..8bcb3a713fa 100644 --- a/server/events.go +++ b/server/events.go @@ -1735,18 +1735,18 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su node := getHash(si.Name) accountNRG := si.AccountNRG() oldInfo, _ := s.nodeToInfo.Swap(node, nodeInfo{ - si.Name, - si.Version, - si.Cluster, - si.Domain, - si.ID, - si.Tags, - cfg, - stats, - false, - si.JetStreamEnabled(), - si.BinaryStreamSnapshot(), - accountNRG, + name: si.Name, + version: si.Version, + cluster: si.Cluster, + domain: si.Domain, + id: si.ID, + tags: si.Tags, + cfg: cfg, + stats: stats, + offline: false, + js: si.JetStreamEnabled(), + binarySnapshots: si.BinaryStreamSnapshot(), + accountNRG: accountNRG, }) if oldInfo == nil || accountNRG != oldInfo.(nodeInfo).accountNRG { // One of the servers we received statsz from changed its mind about @@ -1789,18 +1789,18 @@ func (s *Server) processNewServer(si *ServerInfo) { // Only update if non-existent if _, ok := s.nodeToInfo.Load(node); !ok { s.nodeToInfo.Store(node, nodeInfo{ - si.Name, - si.Version, - si.Cluster, - si.Domain, - si.ID, - si.Tags, - nil, - nil, - false, - si.JetStreamEnabled(), - si.BinaryStreamSnapshot(), - si.AccountNRG(), + name: si.Name, + version: si.Version, + cluster: si.Cluster, + domain: si.Domain, + id: si.ID, + tags: si.Tags, + cfg: nil, + stats: nil, + offline: false, + js: si.JetStreamEnabled(), + binarySnapshots: si.BinaryStreamSnapshot(), + accountNRG: si.AccountNRG(), }) } } diff --git a/server/route.go b/server/route.go index 008e6ede51c..57e5320fa70 100644 --- a/server/route.go +++ b/server/route.go @@ -2346,8 +2346,20 @@ func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo bool, gossipMod if doOnce { // check to be consistent and future proof. but will be same domain if s.sameDomain(info.Domain) { - s.nodeToInfo.Store(rHash, - nodeInfo{rn, s.info.Version, s.info.Cluster, info.Domain, id, nil, nil, nil, false, info.JetStream, false, false}) + s.nodeToInfo.Store(rHash, nodeInfo{ + name: rn, + version: s.info.Version, + cluster: s.info.Cluster, + domain: info.Domain, + id: id, + tags: nil, + cfg: nil, + stats: nil, + offline: false, + js: info.JetStream, + binarySnapshots: true, // Updated default to true. Versions 2.10.0+ support it. 
+ accountNRG: false, + }) } } diff --git a/server/server.go b/server/server.go index 3367c467260..b366475da6a 100644 --- a/server/server.go +++ b/server/server.go @@ -843,15 +843,18 @@ func NewServer(opts *Options) (*Server, error) { if opts.JetStream { ourNode := getHash(serverName) s.nodeToInfo.Store(ourNode, nodeInfo{ - serverName, - VERSION, - opts.Cluster.Name, - opts.JetStreamDomain, - info.ID, - opts.Tags, - &JetStreamConfig{MaxMemory: opts.JetStreamMaxMemory, MaxStore: opts.JetStreamMaxStore, CompressOK: true}, - nil, - false, true, true, true, + name: serverName, + version: VERSION, + cluster: opts.Cluster.Name, + domain: opts.JetStreamDomain, + id: info.ID, + tags: opts.Tags, + cfg: &JetStreamConfig{MaxMemory: opts.JetStreamMaxMemory, MaxStore: opts.JetStreamMaxStore, CompressOK: true}, + stats: nil, + offline: false, + js: true, + binarySnapshots: true, + accountNRG: true, }) } From f07c0a7f1de1ccda998a6c8f885e325db07c49e6 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Wed, 29 Oct 2025 10:24:26 +0100 Subject: [PATCH 20/24] [TEST] Add assert.Unreachable when using legacy JSON stream snapshot Signed-off-by: Maurice van Veen --- server/jetstream_cluster.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 5b45b679d83..a71b73b0cd3 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -8765,6 +8765,13 @@ func (mset *stream) stateSnapshotLocked() []byte { } // Older v1 version with deleted as a sorted []uint64. + // For a stream with millions or billions of interior deletes, this will be huge. + // Now that all server versions 2.10.+ support binary snapshots, we should never fall back. + assert.Unreachable("Legacy JSON stream snapshot used", map[string]any{ + "stream": mset.cfg.Name, + "account": mset.acc.Name, + }) + state := mset.store.State() snap := &streamSnapshot{ Msgs: state.Msgs, From 2b62e9284c8e4862e4f6aaf8d7e4d0af3178d279 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 30 Oct 2025 10:25:23 +0000 Subject: [PATCH 21/24] Don't snapshot on monitor goroutine or Raft node quit signal unless shutting down Signed-off-by: Neil Twigg --- server/jetstream_cluster.go | 24 ++++++++++++++++++++---- server/jetstream_cluster_1_test.go | 12 +++++------- 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index a71b73b0cd3..a40e9590cd1 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2623,11 +2623,19 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps return case <-mqch: // Clean signal from shutdown routine so do best effort attempt to snapshot. - doSnapshot() + // Don't snapshot if not shutting down, monitor goroutine could be going away + // on a scale down or a remove for example. + if s.isShuttingDown() { + doSnapshot() + } return case <-qch: // Clean signal from shutdown routine so do best effort attempt to snapshot. - doSnapshot() + // Don't snapshot if not shutting down, Raft node could be going away on a + // scale down or remove for example. + if s.isShuttingDown() { + doSnapshot() + } return case <-aq.ch: var ne, nb uint64 @@ -5521,11 +5529,19 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { return case <-mqch: // Clean signal from shutdown routine so do best effort attempt to snapshot. 
- doSnapshot(false) + // Don't snapshot if not shutting down, monitor goroutine could be going away + // on a scale down or a remove for example. + if s.isShuttingDown() { + doSnapshot(false) + } return case <-qch: // Clean signal from shutdown routine so do best effort attempt to snapshot. - doSnapshot(false) + // Don't snapshot if not shutting down, Raft node could be going away on a + // scale down or remove for example. + if s.isShuttingDown() { + doSnapshot(false) + } return case <-aq.ch: ces := aq.pop() diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 8b2e54cb168..80257d0f163 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -9234,13 +9234,11 @@ func TestJetStreamClusterAsyncFlushFileStoreFlushOnSnapshot(t *testing.T) { // Confirm above write is pending. require_Equal(t, lmb.pendingWriteSize(), 33) - // Stop stream monitor routine, which will install a snapshot on shutdown. - mset.mu.Lock() - if mset.mqch != nil { - close(mset.mqch) - mset.mqch = nil - } - mset.mu.Unlock() + // Make the upper layer snapshot by sending leader change signal. + // It doesn't matter that we're already leader, it still gets handled. + // Previously this used the mqch, but that now only snapshots on shutdown. + n.leadc <- true + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { n.Lock() snap, err := n.loadLastSnapshot() From cbfc3309bccb6570b1f567781a725b930e9b092d Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Mon, 27 Oct 2025 13:21:06 +0000 Subject: [PATCH 22/24] Parallelisation when enabling JetStream Signed-off-by: Neil Twigg --- server/accounts.go | 4 +- server/events_test.go | 2 +- server/jetstream.go | 339 ++++++++++++++++++++------------------- server/jetstream_test.go | 16 +- server/mqtt_test.go | 2 +- server/util.go | 23 +++ 6 files changed, 213 insertions(+), 173 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index a9df6e0365a..548f6943ec4 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -3794,7 +3794,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim // If JetStream is enabled for this server we will call into configJetStream for the account // regardless of enabled or disabled. It handles both cases. 
if jsEnabled { - if err := s.configJetStream(a); err != nil { + if err := s.configJetStream(a, nil); err != nil { s.Errorf("Error configuring jetstream for account [%s]: %v", tl, err.Error()) a.mu.Lock() // Absent reload of js server cfg, this is going to be broken until js is disabled @@ -4371,7 +4371,7 @@ func (dr *DirAccResolver) Start(s *Server) error { s.Warnf("DirResolver - Error checking for JetStream support for account %q: %v", pubKey, err) } } else if jsa == nil { - if err = s.configJetStream(acc); err != nil { + if err = s.configJetStream(acc, nil); err != nil { s.Errorf("DirResolver - Error configuring JetStream for account %q: %v", pubKey, err) } } diff --git a/server/events_test.go b/server/events_test.go index 8d4531e6bd8..68d2b83c55a 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -1238,7 +1238,7 @@ func TestAccountReqMonitoring(t *testing.T) { s.EnableJetStream(&JetStreamConfig{StoreDir: t.TempDir()}) unusedAcc, _ := createAccount(s) acc, akp := createAccount(s) - acc.EnableJetStream(nil) + acc.EnableJetStream(nil, nil) subsz := fmt.Sprintf(accDirectReqSubj, acc.Name, "SUBSZ") connz := fmt.Sprintf(accDirectReqSubj, acc.Name, "CONNZ") jsz := fmt.Sprintf(accDirectReqSubj, acc.Name, "JSZ") diff --git a/server/jetstream.go b/server/jetstream.go index 0aa2e527aaf..840e56635ae 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -194,6 +194,11 @@ func (s *Server) EnableJetStream(config *JetStreamConfig) error { } s.Noticef("Starting JetStream") + start := time.Now() + defer func() { + s.Noticef("Took %s to start JetStream", time.Since(start)) + }() + if config == nil || config.MaxMemory <= 0 || config.MaxStore <= 0 { var storeDir, domain, uniqueTag string var maxStore, maxMem int64 @@ -686,6 +691,11 @@ func (s *Server) DisableJetStream() error { } func (s *Server) enableJetStreamAccounts() error { + // Reuse the same task workers across all accounts, so that we don't explode + // with a large number of goroutines on multi-account systems. + tq := parallelTaskQueue(0) + defer close(tq) + // If we have no configured accounts setup then setup imports on global account. if s.globalAccountOnly() { gacc := s.GlobalAccount() @@ -694,10 +704,10 @@ func (s *Server) enableJetStreamAccounts() error { gacc.jsLimits = defaultJSAccountTiers } gacc.mu.Unlock() - if err := s.configJetStream(gacc); err != nil { + if err := s.configJetStream(gacc, tq); err != nil { return err } - } else if err := s.configAllJetStreamAccounts(); err != nil { + } else if err := s.configAllJetStreamAccounts(tq); err != nil { return fmt.Errorf("Error enabling jetstream on configured accounts: %v", err) } return nil @@ -761,7 +771,7 @@ func (a *Account) enableJetStreamInfoServiceImportOnly() error { return a.enableAllJetStreamServiceImportsAndMappings() } -func (s *Server) configJetStream(acc *Account) error { +func (s *Server) configJetStream(acc *Account, tq chan<- func()) error { if acc == nil { return nil } @@ -778,7 +788,7 @@ func (s *Server) configJetStream(acc *Account) error { return err } } else { - if err := acc.EnableJetStream(jsLimits); err != nil { + if err := acc.EnableJetStream(jsLimits, tq); err != nil { return err } if s.gateway.enabled { @@ -799,7 +809,7 @@ func (s *Server) configJetStream(acc *Account) error { } // configAllJetStreamAccounts walk all configured accounts and turn on jetstream if requested. 
-func (s *Server) configAllJetStreamAccounts() error { +func (s *Server) configAllJetStreamAccounts(tq chan<- func()) error { // Check to see if system account has been enabled. We could arrive here via reload and // a non-default system account. s.checkJetStreamExports() @@ -839,7 +849,7 @@ func (s *Server) configAllJetStreamAccounts() error { // Process any jetstream enabled accounts here. These will be accounts we are // already aware of at startup etc. for _, acc := range jsAccounts { - if err := s.configJetStream(acc); err != nil { + if err := s.configJetStream(acc, tq); err != nil { return err } } @@ -852,7 +862,7 @@ func (s *Server) configAllJetStreamAccounts() error { // Only load up ones not already loaded since they are processed above. if _, ok := accounts.Load(accName); !ok { if acc, err := s.lookupAccount(accName); err != nil && acc != nil { - if err := s.configJetStream(acc); err != nil { + if err := s.configJetStream(acc, tq); err != nil { return err } } @@ -1100,7 +1110,7 @@ func (a *Account) assignJetStreamLimits(limits map[string]JetStreamAccountLimits // EnableJetStream will enable JetStream on this account with the defined limits. // This is a helper for JetStreamEnableAccount. -func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) error { +func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits, tq chan<- func()) error { a.mu.RLock() s := a.srv a.mu.RUnlock() @@ -1253,30 +1263,139 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro } } - // Collect consumers, do after all streams. - type ce struct { - mset *stream - odir string - } - var consumers []*ce - - // Collect any interest policy streams to check for - // https://github.com/nats-io/nats-server/issues/3612 - var ipstreams []*stream - // Remember if we should be encrypted and what cipher we think we should use. encrypted := s.getOpts().JetStreamKey != _EMPTY_ - plaintext := true sc := s.getOpts().JetStreamCipher + doConsumers := func(mset *stream, odir string) { + ofis, _ := os.ReadDir(odir) + if len(ofis) > 0 { + s.Noticef(" Recovering %d consumers for stream - '%s > %s'", len(ofis), mset.accName(), mset.name()) + } + for _, ofi := range ofis { + metafile := filepath.Join(odir, ofi.Name(), JetStreamMetaFile) + metasum := filepath.Join(odir, ofi.Name(), JetStreamMetaFileSum) + if _, err := os.Stat(metafile); os.IsNotExist(err) { + s.Warnf(" Missing consumer metafile %q", metafile) + continue + } + buf, err := os.ReadFile(metafile) + if err != nil { + s.Warnf(" Error reading consumer metafile %q: %v", metafile, err) + continue + } + if _, err := os.Stat(metasum); os.IsNotExist(err) { + s.Warnf(" Missing consumer checksum for %q", metasum) + continue + } + + // Check if we are encrypted. + if key, err := os.ReadFile(filepath.Join(odir, ofi.Name(), JetStreamMetaFileKey)); err == nil { + s.Debugf(" Consumer metafile is encrypted, reading encrypted keyfile") + // Decode the buffer before proceeding. 
+ ctxName := mset.name() + tsep + ofi.Name() + nbuf, _, err := s.decryptMeta(sc, key, buf, a.Name, ctxName) + if err != nil { + s.Warnf(" Error decrypting our consumer metafile: %v", err) + continue + } + buf = nbuf + } + + var cfg FileConsumerInfo + decoder := json.NewDecoder(bytes.NewReader(buf)) + decoder.DisallowUnknownFields() + strictErr := decoder.Decode(&cfg) + if strictErr != nil { + cfg = FileConsumerInfo{} + if err := json.Unmarshal(buf, &cfg); err != nil { + s.Warnf(" Error unmarshalling consumer metafile %q: %v", metafile, err) + continue + } + } + if supported := supportsRequiredApiLevel(cfg.Metadata); !supported || strictErr != nil { + var offlineReason string + if !supported { + apiLevel := getRequiredApiLevel(cfg.Metadata) + if strictErr != nil { + offlineReason = fmt.Sprintf("unsupported - config error: %s", strings.TrimPrefix(strictErr.Error(), "json: ")) + } else { + offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel) + } + s.Warnf(" Detected unsupported consumer '%s > %s > %s': %s", a.Name, mset.name(), cfg.Name, offlineReason) + } else { + offlineReason = fmt.Sprintf("decoding error: %v", strictErr) + s.Warnf(" Error unmarshalling consumer metafile %q: %v", metafile, strictErr) + } + singleServerMode := !s.JetStreamIsClustered() && s.standAloneMode() + if singleServerMode { + if !mset.closed.Load() { + s.Warnf(" Stopping unsupported stream '%s > %s'", a.Name, mset.name()) + mset.mu.Lock() + mset.offlineReason = fmt.Sprintf("stopped - unsupported consumer %q", cfg.Name) + mset.mu.Unlock() + mset.stop(false, false) + } + + // Fake a consumer, so we can respond to API requests as single-server. + o := &consumer{ + mset: mset, + js: s.getJetStream(), + acc: a, + srv: s, + cfg: cfg.ConsumerConfig, + active: false, + stream: mset.name(), + name: cfg.Name, + dseq: 1, + sseq: 1, + created: time.Now().UTC(), + closed: true, + offlineReason: offlineReason, + } + if !cfg.Created.IsZero() { + o.created = cfg.Created + } + + mset.mu.Lock() + mset.setConsumer(o) + mset.mu.Unlock() + } + continue + } + + isEphemeral := !isDurableConsumer(&cfg.ConsumerConfig) + if isEphemeral { + // This is an ephemeral consumer and this could fail on restart until + // the consumer can reconnect. We will create it as a durable and switch it. + cfg.ConsumerConfig.Durable = ofi.Name() + } + obs, err := mset.addConsumerWithAssignment(&cfg.ConsumerConfig, _EMPTY_, nil, true, ActionCreateOrUpdate, false) + if err != nil { + s.Warnf(" Error adding consumer %q: %v", cfg.Name, err) + continue + } + if isEphemeral { + obs.switchToEphemeral() + } + if !cfg.Created.IsZero() { + obs.setCreatedTime(cfg.Created) + } + if err != nil { + s.Warnf(" Error restoring consumer %q state: %v", cfg.Name, err) + } + } + } + // Now recover the streams. fis, _ := os.ReadDir(sdir) - for _, fi := range fis { + doStream := func(fi os.DirEntry) error { + plaintext := true mdir := filepath.Join(sdir, fi.Name()) // Check for partially deleted streams. They are marked with "." prefix. 
if strings.HasPrefix(fi.Name(), tsep) { go os.RemoveAll(mdir) - continue + return nil } key := sha256.Sum256([]byte(fi.Name())) hh, err := highwayhash.New64(key[:]) @@ -1287,27 +1406,27 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro metasum := filepath.Join(mdir, JetStreamMetaFileSum) if _, err := os.Stat(metafile); os.IsNotExist(err) { s.Warnf(" Missing stream metafile for %q", metafile) - continue + return nil } buf, err := os.ReadFile(metafile) if err != nil { s.Warnf(" Error reading metafile %q: %v", metafile, err) - continue + return nil } if _, err := os.Stat(metasum); os.IsNotExist(err) { s.Warnf(" Missing stream checksum file %q", metasum) - continue + return nil } sum, err := os.ReadFile(metasum) if err != nil { s.Warnf(" Error reading Stream metafile checksum %q: %v", metasum, err) - continue + return nil } hh.Write(buf) checksum := hex.EncodeToString(hh.Sum(nil)) if checksum != string(sum) { s.Warnf(" Stream metafile %q: checksums do not match %q vs %q", metafile, sum, checksum) - continue + return nil } // Track if we are converting ciphers. @@ -1320,14 +1439,14 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro s.Debugf(" Stream metafile is encrypted, reading encrypted keyfile") if len(keyBuf) < minMetaKeySize { s.Warnf(" Bad stream encryption key length of %d", len(keyBuf)) - continue + return nil } // Decode the buffer before proceeding. var nbuf []byte nbuf, convertingCiphers, err = s.decryptMeta(sc, keyBuf, buf, a.Name, fi.Name()) if err != nil { s.Warnf(" Error decrypting our stream metafile: %v", err) - continue + return nil } buf = nbuf plaintext = false @@ -1341,7 +1460,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro cfg = FileStreamInfo{} if err := json.Unmarshal(buf, &cfg); err != nil { s.Warnf(" Error unmarshalling stream metafile %q: %v", metafile, err) - continue + return nil } } if supported := supportsRequiredApiLevel(cfg.Metadata); !supported || strictErr != nil { @@ -1384,13 +1503,16 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro // Now do the consumers. odir := filepath.Join(sdir, fi.Name(), consumerDir) - consumers = append(consumers, &ce{mset, odir}) + doConsumers(mset, odir) } - continue + return nil } if cfg.Template != _EMPTY_ { - if err := jsa.addStreamNameToTemplate(cfg.Template, cfg.Name); err != nil { + jsa.mu.Lock() + err := jsa.addStreamNameToTemplate(cfg.Template, cfg.Name) + jsa.mu.Unlock() + if err != nil { s.Warnf(" Error adding stream %q to template %q: %v", cfg.Name, cfg.Template, err) } } @@ -1415,7 +1537,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro } } if hadSubjErr { - continue + return nil } // The other possible bug is assigning subjects to mirrors, so check for that and patch as well. @@ -1449,7 +1571,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro s.Warnf(" Error replacing meta keyfile for stream %q: %v", cfg.Name, err) } } - continue + return nil } if !cfg.Created.IsZero() { mset.setCreatedTime(cfg.Created) @@ -1514,146 +1636,41 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro s.Noticef(" Restored %s messages for stream '%s > %s' in %v", comma(int64(state.Msgs)), mset.accName(), mset.name(), time.Since(rt).Round(time.Millisecond)) + // Now do the consumers. + odir := filepath.Join(sdir, fi.Name(), consumerDir) + doConsumers(mset, odir) + // Collect to check for dangling messages. 
// TODO(dlc) - Can be removed eventually. if cfg.StreamConfig.Retention == InterestPolicy { - ipstreams = append(ipstreams, mset) + mset.checkForOrphanMsgs() + mset.checkConsumerReplication() } - // Now do the consumers. - odir := filepath.Join(sdir, fi.Name(), consumerDir) - consumers = append(consumers, &ce{mset, odir}) + return nil } - for _, e := range consumers { - ofis, _ := os.ReadDir(e.odir) - if len(ofis) > 0 { - s.Noticef(" Recovering %d consumers for stream - '%s > %s'", len(ofis), e.mset.accName(), e.mset.name()) - } - for _, ofi := range ofis { - metafile := filepath.Join(e.odir, ofi.Name(), JetStreamMetaFile) - metasum := filepath.Join(e.odir, ofi.Name(), JetStreamMetaFileSum) - if _, err := os.Stat(metafile); os.IsNotExist(err) { - s.Warnf(" Missing consumer metafile %q", metafile) - continue - } - buf, err := os.ReadFile(metafile) - if err != nil { - s.Warnf(" Error reading consumer metafile %q: %v", metafile, err) - continue - } - if _, err := os.Stat(metasum); os.IsNotExist(err) { - s.Warnf(" Missing consumer checksum for %q", metasum) - continue - } - - // Check if we are encrypted. - if key, err := os.ReadFile(filepath.Join(e.odir, ofi.Name(), JetStreamMetaFileKey)); err == nil { - s.Debugf(" Consumer metafile is encrypted, reading encrypted keyfile") - // Decode the buffer before proceeding. - ctxName := e.mset.name() + tsep + ofi.Name() - nbuf, _, err := s.decryptMeta(sc, key, buf, a.Name, ctxName) - if err != nil { - s.Warnf(" Error decrypting our consumer metafile: %v", err) - continue - } - buf = nbuf - } - - var cfg FileConsumerInfo - decoder := json.NewDecoder(bytes.NewReader(buf)) - decoder.DisallowUnknownFields() - strictErr := decoder.Decode(&cfg) - if strictErr != nil { - cfg = FileConsumerInfo{} - if err := json.Unmarshal(buf, &cfg); err != nil { - s.Warnf(" Error unmarshalling consumer metafile %q: %v", metafile, err) - continue - } - } - if supported := supportsRequiredApiLevel(cfg.Metadata); !supported || strictErr != nil { - var offlineReason string - if !supported { - apiLevel := getRequiredApiLevel(cfg.Metadata) - if strictErr != nil { - offlineReason = fmt.Sprintf("unsupported - config error: %s", strings.TrimPrefix(strictErr.Error(), "json: ")) - } else { - offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel) - } - s.Warnf(" Detected unsupported consumer '%s > %s > %s': %s", a.Name, e.mset.name(), cfg.Name, offlineReason) - } else { - offlineReason = fmt.Sprintf("decoding error: %v", strictErr) - s.Warnf(" Error unmarshalling consumer metafile %q: %v", metafile, strictErr) - } - singleServerMode := !s.JetStreamIsClustered() && s.standAloneMode() - if singleServerMode { - if !e.mset.closed.Load() { - s.Warnf(" Stopping unsupported stream '%s > %s'", a.Name, e.mset.name()) - e.mset.mu.Lock() - e.mset.offlineReason = fmt.Sprintf("stopped - unsupported consumer %q", cfg.Name) - e.mset.mu.Unlock() - e.mset.stop(false, false) - } - - // Fake a consumer, so we can respond to API requests as single-server. 
- o := &consumer{ - mset: e.mset, - js: s.getJetStream(), - acc: a, - srv: s, - cfg: cfg.ConsumerConfig, - active: false, - stream: e.mset.name(), - name: cfg.Name, - dseq: 1, - sseq: 1, - created: time.Now().UTC(), - closed: true, - offlineReason: offlineReason, - } - if !cfg.Created.IsZero() { - o.created = cfg.Created - } - - e.mset.mu.Lock() - e.mset.setConsumer(o) - e.mset.mu.Unlock() - } - continue - } - - isEphemeral := !isDurableConsumer(&cfg.ConsumerConfig) - if isEphemeral { - // This is an ephemeral consumer and this could fail on restart until - // the consumer can reconnect. We will create it as a durable and switch it. - cfg.ConsumerConfig.Durable = ofi.Name() - } - obs, err := e.mset.addConsumerWithAssignment(&cfg.ConsumerConfig, _EMPTY_, nil, true, ActionCreateOrUpdate, false) - if err != nil { - s.Warnf(" Error adding consumer %q: %v", cfg.Name, err) - continue - } - if isEphemeral { - obs.switchToEphemeral() - } - if !cfg.Created.IsZero() { - obs.setCreatedTime(cfg.Created) - } - if err != nil { - s.Warnf(" Error restoring consumer %q state: %v", cfg.Name, err) + if tq != nil { + // If a parallelTaskQueue was provided then use that for concurrency. + var wg sync.WaitGroup + wg.Add(len(fis)) + for _, fi := range fis { + tq <- func() { + doStream(fi) + wg.Done() } } + wg.Wait() + } else { + // No parallelTaskQueue provided, do inline as before. + for _, fi := range fis { + doStream(fi) + } } // Make sure to cleanup any old remaining snapshots. os.RemoveAll(filepath.Join(jsa.storeDir, snapsDir)) - // Check interest policy streams for auto cleanup. - for _, mset := range ipstreams { - mset.checkForOrphanMsgs() - mset.checkConsumerReplication() - } - s.Debugf("JetStream state for account %q recovered", a.Name) return nil diff --git a/server/jetstream_test.go b/server/jetstream_test.go index d5e831a87b0..d5d47fb55b2 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -152,7 +152,7 @@ func TestJetStreamEnableAndDisableAccount(t *testing.T) { } acc, _ := s.LookupOrRegisterAccount("$FOO") - if err := acc.EnableJetStream(nil); err != nil { + if err := acc.EnableJetStream(nil, nil); err != nil { t.Fatalf("Did not expect error on enabling account: %v", err) } if na := s.JetStreamNumAccounts(); na != 1 { @@ -171,7 +171,7 @@ func TestJetStreamEnableAndDisableAccount(t *testing.T) { } // Should get an error for trying to enable a non-registered account. acc = NewAccount("$BAZ") - if err := acc.EnableJetStream(nil); err == nil { + if err := acc.EnableJetStream(nil, nil); err == nil { t.Fatalf("Expected error on enabling account that was not registered") } } @@ -4869,20 +4869,20 @@ func TestJetStreamSystemLimits(t *testing.T) { } } - if err := facc.EnableJetStream(limits(24, 192)); err != nil { + if err := facc.EnableJetStream(limits(24, 192), nil); err != nil { t.Fatalf("Unexpected error: %v", err) } // Use up rest of our resources in memory - if err := bacc.EnableJetStream(limits(1000, 0)); err != nil { + if err := bacc.EnableJetStream(limits(1000, 0), nil); err != nil { t.Fatalf("Unexpected error: %v", err) } // Now ask for more memory. Should error. - if err := zacc.EnableJetStream(limits(1000, 0)); err == nil { + if err := zacc.EnableJetStream(limits(1000, 0), nil); err == nil { t.Fatalf("Expected an error when exhausting memory resource limits") } // Disk too. 
-	if err := zacc.EnableJetStream(limits(0, 10000)); err == nil {
+	if err := zacc.EnableJetStream(limits(0, 10000), nil); err == nil {
 		t.Fatalf("Expected an error when exhausting memory resource limits")
 	}
 	facc.DisableJetStream()
@@ -4896,7 +4896,7 @@ func TestJetStreamSystemLimits(t *testing.T) {
 		t.Fatalf("Expected reserved memory and store to be 0, got %v and %v", friendlyBytes(rm), friendlyBytes(rd))
 	}
 
-	if err := facc.EnableJetStream(limits(24, 192)); err != nil {
+	if err := facc.EnableJetStream(limits(24, 192), nil); err != nil {
 		t.Fatalf("Unexpected error: %v", err)
 	}
 	// Test Adjust
@@ -7409,7 +7409,7 @@ func TestJetStreamCanNotEnableOnSystemAccount(t *testing.T) {
 	defer s.Shutdown()
 
 	sa := s.SystemAccount()
-	if err := sa.EnableJetStream(nil); err == nil {
+	if err := sa.EnableJetStream(nil, nil); err == nil {
 		t.Fatalf("Expected an error trying to enable on the system account")
 	}
 }
diff --git a/server/mqtt_test.go b/server/mqtt_test.go
index e779096ea07..c08c75e94b3 100644
--- a/server/mqtt_test.go
+++ b/server/mqtt_test.go
@@ -1035,7 +1035,7 @@ func testMQTTEnableJSForAccount(t *testing.T, s *Server, accName string) {
 			MaxStore:  1024 * 1024,
 		},
 	}
-	if err := acc.EnableJetStream(limits); err != nil {
+	if err := acc.EnableJetStream(limits, nil); err != nil {
 		t.Fatalf("Error enabling JS: %v", err)
 	}
 }
diff --git a/server/util.go b/server/util.go
index dcfa1000d43..4e08e3f0cda 100644
--- a/server/util.go
+++ b/server/util.go
@@ -23,6 +23,7 @@ import (
 	"net"
 	"net/url"
 	"reflect"
+	"runtime"
 	"strconv"
 	"strings"
 	"time"
@@ -340,3 +341,25 @@ func generateInfoJSON(info *Info) []byte {
 	pcs := [][]byte{[]byte("INFO"), b, []byte(CR_LF)}
 	return bytes.Join(pcs, []byte(" "))
 }
+
+// parallelTaskQueue starts a number of goroutines and returns a channel
+// which functions can be sent to for queued parallel execution. The
+// goroutines will stop running when the returned channel is closed and
+// all queued tasks have completed. The passed in mp sets the number of
+// workers, but never fewer than GOMAXPROCS; a value <= 0 defaults to GOMAXPROCS.
+func parallelTaskQueue(mp int) chan<- func() { + if rmp := runtime.GOMAXPROCS(-1); mp <= 0 { + mp = rmp + } else { + mp = max(rmp, mp) + } + tq := make(chan func(), mp) + for range mp { + go func() { + for fn := range tq { + fn() + } + }() + } + return tq +} From 98eced2899060e020a98778e9709f2482d8ffb92 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 30 Oct 2025 14:41:28 +0000 Subject: [PATCH 23/24] Add `BenchmarkJetStreamParallelStartup` Signed-off-by: Neil Twigg --- server/jetstream_benchmark_test.go | 60 ++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/server/jetstream_benchmark_test.go b/server/jetstream_benchmark_test.go index bdd263da39d..c9dd7097e0d 100644 --- a/server/jetstream_benchmark_test.go +++ b/server/jetstream_benchmark_test.go @@ -19,8 +19,13 @@ import ( "encoding/json" "fmt" "math" + "math/bits" "math/rand" + "os" + "path/filepath" + "runtime" "strconv" + "strings" "sync" "sync/atomic" "testing" @@ -2184,6 +2189,61 @@ func BenchmarkJetStreamPublishConcurrent(b *testing.B) { } } +func BenchmarkJetStreamParallelStartup(b *testing.B) { + omp := runtime.GOMAXPROCS(-1) + streams, msgs, cardinality := omp, 100_000, 10_000 + + _, s, shutdown, nc, js := startJSClusterAndConnect(b, 1) + defer shutdown() + jsc := *s.JetStreamConfig() + sd := strings.TrimSuffix(jsc.StoreDir, "/jetstream") + + b.Logf("Building %d streams with %d messages, %d subjects...", streams, msgs, cardinality) + start := time.Now() + for i := range streams { + jsStreamCreate(b, nc, &StreamConfig{ + Name: fmt.Sprintf("stream_%d", i), + Subjects: []string{fmt.Sprintf("%d.>", i)}, + Storage: FileStorage, + }) + for n := range msgs { + subj := fmt.Sprintf("%d.%d", i, n%cardinality) + _, err := js.Publish(subj, nil) + require_NoError(b, err) + } + } + b.Logf("Streams built in %s", time.Since(start)) + + bench := func(b *testing.B) { + s.shutdownJetStream() + jsc.StoreDir = sd + require_NoError(b, filepath.Walk(jsc.StoreDir, func(path string, info os.FileInfo, err error) error { + require_NoError(b, err) + if info.Mode().IsRegular() && info.Name() == "index.db" { + return os.Truncate(path, 0) + } + return nil + })) + b.ResetTimer() + s.EnableJetStream(&jsc) + } + + // Try to step down GOMAXPROCS in common CPU core counts. + mp := 1 << (bits.Len(uint(omp)) - 1) + if omp > mp { + b.Run(fmt.Sprintf("GOMAXPROCS=%d", omp), func(b *testing.B) { + bench(b) + }) + } + for ; mp >= 1; mp >>= 1 { + b.Run(fmt.Sprintf("GOMAXPROCS=%d", mp), func(b *testing.B) { + runtime.GOMAXPROCS(mp) + defer runtime.GOMAXPROCS(omp) + bench(b) + }) + } +} + // Helper function to stand up a JS-enabled single server or cluster func startJSClusterAndConnect(b *testing.B, clusterSize int) (c *cluster, s *Server, shutdown func(), nc *nats.Conn, js nats.JetStreamContext) { b.Helper() From b786da5a2135df39aaf793f72f9c9d778e465631 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 30 Oct 2025 15:21:01 +0100 Subject: [PATCH 24/24] [FIXED] Inconsistent index race condition Signed-off-by: Maurice van Veen --- server/filestore.go | 101 +++++++++++++++++++++++++++++--------------- 1 file changed, 68 insertions(+), 33 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 0a71b1a3cd8..3b99dcaad72 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -9913,12 +9913,17 @@ func timestampNormalized(t time.Time) int64 { // writeFullState will proceed to write the full meta state iff not complex and time consuming. 
// Since this is for quick recovery it is optional and should not block/stall normal operations. func (fs *fileStore) writeFullState() error { - return fs._writeFullState(false) + return fs._writeFullState(false, true) } -// forceWriteFullState will proceed to write the full meta state. This should only be called by stop() +// forceWriteFullState will proceed to write the full meta state. func (fs *fileStore) forceWriteFullState() error { - return fs._writeFullState(true) + return fs._writeFullState(true, true) +} + +// forceWriteFullStateLocked will proceed to write the full meta state. This should only be called by stop() +func (fs *fileStore) forceWriteFullStateLocked() error { + return fs._writeFullState(true, false) } // This will write the full binary state for the stream. @@ -9928,11 +9933,22 @@ func (fs *fileStore) forceWriteFullState() error { // 2. PSIM - Per Subject Index Map - Tracks first and last blocks with subjects present. // 3. MBs - Index, Bytes, First and Last Sequence and Timestamps, and the deleted map (avl.seqset). // 4. Last block index and hash of record inclusive to this stream state. -func (fs *fileStore) _writeFullState(force bool) error { +func (fs *fileStore) _writeFullState(force, needLock bool) error { + fsLock := func() { + if needLock { + fs.mu.Lock() + } + } + fsUnlock := func() { + if needLock { + fs.mu.Unlock() + } + } + start := time.Now() - fs.mu.Lock() + fsLock() if fs.closed || fs.dirty == 0 { - fs.mu.Unlock() + fsUnlock() return nil } @@ -9951,7 +9967,7 @@ func (fs *fileStore) _writeFullState(force bool) error { numDeleted = int((fs.state.LastSeq - fs.state.FirstSeq + 1) - fs.state.Msgs) } if numSubjects > numThreshold || numDeleted > numThreshold { - fs.mu.Unlock() + fsUnlock() return errStateTooBig } } @@ -10059,13 +10075,15 @@ func (fs *fileStore) _writeFullState(force bool) error { // Encrypt if needed. if fs.prf != nil { if err := fs.setupAEK(); err != nil { - fs.mu.Unlock() + fsUnlock() return err } nonce := make([]byte, fs.aek.NonceSize(), fs.aek.NonceSize()+len(buf)+fs.aek.Overhead()) if n, err := rand.Read(nonce); err != nil { + fsUnlock() return err } else if n != len(nonce) { + fsUnlock() return fmt.Errorf("not enough nonce bytes read (%d != %d)", n, len(nonce)) } buf = fs.aek.Seal(nonce, nonce, buf, nil) @@ -10082,13 +10100,17 @@ func (fs *fileStore) _writeFullState(force bool) error { statesEqual := trackingStatesEqual(&fs.state, &mstate) // Release lock. - fs.mu.Unlock() + fsUnlock() // Check consistency here. if !statesEqual { fs.warn("Stream state encountered internal inconsistency on write") // Rebuild our fs state from the mb state. - fs.rebuildState(nil) + if needLock { + fs.rebuildState(nil) + } else { + fs.rebuildStateLocked(nil) + } return errCorruptState } @@ -10113,14 +10135,14 @@ func (fs *fileStore) _writeFullState(force bool) error { // Update dirty if successful. if err == nil { - fs.mu.Lock() + fsLock() fs.dirty -= priorDirty - fs.mu.Unlock() + fsUnlock() } // Attempt to write both files, an error in one should not prevent the other from being written. 
- ttlErr := fs.writeTTLState() - schedErr := fs.writeMsgSchedulingState() + ttlErr := fs.writeTTLState(needLock) + schedErr := fs.writeMsgSchedulingState(needLock) if ttlErr != nil { return ttlErr } else if schedErr != nil { @@ -10129,30 +10151,42 @@ func (fs *fileStore) _writeFullState(force bool) error { return nil } -func (fs *fileStore) writeTTLState() error { - fs.mu.RLock() +func (fs *fileStore) writeTTLState(needLock bool) error { + if needLock { + fs.mu.RLock() + } if fs.ttls == nil { - fs.mu.RUnlock() + if needLock { + fs.mu.RUnlock() + } return nil } fn := filepath.Join(fs.fcfg.StoreDir, msgDir, ttlStreamStateFile) // Must be lseq+1 to identify up to which sequence the TTLs are valid. buf := fs.ttls.Encode(fs.state.LastSeq + 1) - fs.mu.RUnlock() + if needLock { + fs.mu.RUnlock() + } return fs.writeFileWithOptionalSync(fn, buf, defaultFilePerms) } -func (fs *fileStore) writeMsgSchedulingState() error { - fs.mu.RLock() +func (fs *fileStore) writeMsgSchedulingState(needLock bool) error { + if needLock { + fs.mu.RLock() + } if fs.scheduling == nil { - fs.mu.RUnlock() + if needLock { + fs.mu.RUnlock() + } return nil } fn := filepath.Join(fs.fcfg.StoreDir, msgDir, msgSchedulingStreamStateFile) // Must be lseq+1 to identify up to which sequence the schedules are valid. buf := fs.scheduling.encode(fs.state.LastSeq + 1) - fs.mu.RUnlock() + if needLock { + fs.mu.RUnlock() + } return fs.writeFileWithOptionalSync(fn, buf, defaultFilePerms) } @@ -10170,18 +10204,10 @@ func (fs *fileStore) stop(delete, writeState bool) error { return ErrStoreClosed } - // Mark as closing. Do before releasing the lock to writeFullState + // Mark as closing. Do before releasing the lock to wait on the state flush loop // so we don't end up with this function running more than once. fs.closing = true - if writeState { - fs.checkAndFlushLastBlock() - } - fs.closeAllMsgBlocks(false) - - fs.cancelSyncTimer() - fs.cancelAgeChk() - // Release the state flusher loop. if fs.qch != nil { close(fs.qch) @@ -10193,9 +10219,18 @@ func (fs *fileStore) stop(delete, writeState bool) error { fsld := fs.fsld fs.mu.Unlock() <-fsld - // Write full state if needed. If not dirty this is a no-op. - fs.forceWriteFullState() fs.mu.Lock() + + fs.checkAndFlushLastBlock() + } + fs.closeAllMsgBlocks(false) + + fs.cancelSyncTimer() + fs.cancelAgeChk() + + if writeState { + // Write full state if needed. If not dirty this is a no-op. + fs.forceWriteFullStateLocked() } // Mark as closed. Last message block needs to be cleared after