Skip to content
Merged
1 change: 1 addition & 0 deletions server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ func (a *Account) shallowCopy(na *Account) {
na.Nkey = a.Nkey
na.Issuer = a.Issuer
na.traceDest, na.traceDestSampling = a.traceDest, a.traceDestSampling
na.nrgAccount = a.nrgAccount

if a.imports.streams != nil {
na.imports.streams = make([]*streamImport, 0, len(a.imports.streams))
Expand Down
2 changes: 1 addition & 1 deletion server/auth_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2012-2024 The NATS Authors
// Copyright 2012-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down
2 changes: 1 addition & 1 deletion server/certidp/ocsp_responder.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023-2024 The NATS Authors
// Copyright 2023-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down
2 changes: 1 addition & 1 deletion server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3265,7 +3265,7 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool
sub.shadow = nil
if len(shadowSubs) > 0 {
isSpokeLeaf = c.isSpokeLeafNode()
updateRoute = !isSpokeLeaf && (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF) && c.srv != nil
updateRoute = !isSpokeLeaf && (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF || c.kind == JETSTREAM) && c.srv != nil
}
sub.close()
c.mu.Unlock()
Expand Down
6 changes: 4 additions & 2 deletions server/consumer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2025 The NATS Authors
// Copyright 2019-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -941,21 +941,23 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
}
if cName != _EMPTY_ {
if eo, ok := mset.consumers[cName]; ok {
mset.mu.Unlock()
if action == ActionCreate {
ocfg := eo.config()
copyConsumerMetadata(config, &ocfg)
if !reflect.DeepEqual(config, &ocfg) {
mset.mu.Unlock()
return nil, NewJSConsumerAlreadyExistsError()
}
}
// Check for overlapping subjects if we are a workqueue
if cfg.Retention == WorkQueuePolicy {
subjects := gatherSubjectFilters(config.FilterSubject, config.FilterSubjects)
if !mset.partitionUnique(cName, subjects) {
mset.mu.Unlock()
return nil, NewJSConsumerWQConsumerNotUniqueError()
}
}
mset.mu.Unlock()
err := eo.updateConfig(config)
if err == nil {
return eo, nil
Expand Down
7 changes: 5 additions & 2 deletions server/filestore.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2025 The NATS Authors
// Copyright 2019-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -2182,7 +2182,7 @@ func (fs *fileStore) recoverMsgs() error {
if fs.ld != nil {
var emptyBlks []*msgBlock
for _, mb := range fs.blks {
if mb.msgs == 0 && mb.rbytes == 0 {
if mb.msgs == 0 && mb.rbytes == 0 && mb != fs.lmb {
emptyBlks = append(emptyBlks, mb)
}
}
Expand Down Expand Up @@ -9457,6 +9457,9 @@ func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) {
func (fs *fileStore) resetGlobalPerSubjectInfo() {
// Clear any global subject state.
fs.psim, fs.tsl = fs.psim.Empty(), 0
if fs.noTrackSubjects() {
return
}
for _, mb := range fs.blks {
fs.populateGlobalPerSubjectInfo(mb)
}
Expand Down
19 changes: 19 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11464,3 +11464,22 @@ func TestFileStoreCompactFullyResetsFirstAndLastSeq(t *testing.T) {
checkMbState(1, 0, 0)
})
}

func TestFileStoreDoesntRebuildSubjectStateWithNoTrack(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
fs, err := newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), prf(&fcfg), nil)
require_NoError(t, err)
defer fs.Stop()

_, _, err = fs.StoreMsg("foo", nil, nil, 0)
require_NoError(t, err)

// This implicitly was calling resetGlobalPerSubjectInfo and
// populating "foo" back into the psim.
require_NoError(t, fs.Truncate(1))

fs.mu.Lock()
defer fs.mu.Unlock()
require_Equal(t, fs.psim.Size(), 0)
})
}
4 changes: 3 additions & 1 deletion server/gsl/gsl.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,9 @@ func matchLevelForAny[T comparable](l *level[T], toks []string, np *int) bool {
if np != nil {
*np += len(n.subs)
}
return len(n.subs) > 0
if len(n.subs) > 0 {
return true
}
}
if pwc != nil {
if np != nil {
Expand Down
8 changes: 8 additions & 0 deletions server/gsl/gsl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,14 @@ func TestGenericSublistHasInterest(t *testing.T) {
require_NoError(t, s.Remove("*", 66))
}

func TestGenericSublistHasInterestOverlapping(t *testing.T) {
s := NewSublist[int]()
require_NoError(t, s.Insert("stream.A.child", 11))
require_NoError(t, s.Insert("stream.*", 11))
require_True(t, s.HasInterest("stream.A.child"))
require_True(t, s.HasInterest("stream.A"))
}

func TestGenericSublistNumInterest(t *testing.T) {
s := NewSublist[int]()
require_NoError(t, s.Insert("foo", 11))
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2025 The NATS Authors
// Copyright 2019-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_api.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020-2025 The NATS Authors
// Copyright 2020-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down
108 changes: 108 additions & 0 deletions server/jetstream_cluster_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6604,3 +6604,111 @@ func TestJetStreamClusterDeletedNodeDoesNotReviveStreamAfterCatchup(t *testing.T
return nil
})
}

// https://github.com/nats-io/nats-server/issues/7718
func TestJetStreamClusterLeakedSubsWithStreamImportOverlappingJetStreamSubs(t *testing.T) {
tmpl := `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 2GB, max_file_store: 2GB, store_dir: '%s'}

leaf {
listen: 127.0.0.1:-1
}

cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}

accounts {
ACC: {
jetstream: enabled
users: [{user: acc, password: acc}]
imports: [{stream: {account: zone, subject: ">"}}]
}
zone: {
jetstream: enabled
users: [{user: zone, password: zone}]
exports: [{stream: ">"}]
}
}
no_auth_user: acc
`
c := createJetStreamClusterWithTemplate(t, tmpl, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

checkExpectedSubs := func(expected uint32) (actual uint32) {
t.Helper()
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
e := expected
for _, s := range c.servers {
subs := s.NumSubscriptions()
if e == 0 {
e = subs
} else if e != subs {
return fmt.Errorf("expected %d subs, got %d", e, subs)
}
}
actual = e
return nil
})
return actual
}

// Track subscription counts between stream/consumer create/deletes.
var baseline, sc, cc uint32

// Perform a couple iterations to check we get to predictable subscription counts.
for range 3 {
// Zero means we don't know the expected count, but still ALL servers must equal.
initial := checkExpectedSubs(0)

// If we've iterated once, we'll know the baseline. Each next iteration must be equal to this.
if baseline != 0 {
require_Equal(t, baseline, initial)
}

// Add the stream.
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 1,
Storage: nats.FileStorage,
})
require_NoError(t, err)
sl := c.streamLeader("ACC", "TEST")
require_NotNil(t, sl)
afterStreamCreate := checkExpectedSubs(sl.NumSubscriptions())
if sc == 0 {
sc = afterStreamCreate
}
require_Equal(t, sc, afterStreamCreate)

// Add the consumer.
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"})
require_NoError(t, err)
afterConsumerCreate := checkExpectedSubs(sl.NumSubscriptions())
if cc == 0 {
cc = afterConsumerCreate
}
require_Equal(t, cc, afterConsumerCreate)

// Delete the consumer, the subscriptions should drop down to what they were after the stream was created.
require_NoError(t, js.DeleteConsumer("TEST", "CONSUMER"))
afterConsumerDelete := checkExpectedSubs(sl.NumSubscriptions())
require_Equal(t, afterStreamCreate, afterConsumerDelete)

// Deleting the stream should drop the subscriptions back to the baseline.
require_NoError(t, js.DeleteStream("TEST"))
afterStreamDelete := checkExpectedSubs(sl.NumSubscriptions())
if baseline == 0 {
baseline = afterStreamDelete
}
require_Equal(t, baseline, afterStreamDelete)
}
}
5 changes: 5 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2140,6 +2140,7 @@ func TestJetStreamClusterAccountNRGConfigNoPanic(t *testing.T) {

accounts {
ONE { jetstream: { cluster_traffic: system } }
TWO { jetstream: { cluster_traffic: owner } }
}
`

Expand All @@ -2150,6 +2151,10 @@ func TestJetStreamClusterAccountNRGConfigNoPanic(t *testing.T) {
acc, err := s.lookupAccount("ONE")
require_NoError(t, err)
require_Equal(t, acc.nrgAccount, _EMPTY_) // Empty for the system account

acc, err = s.lookupAccount("TWO")
require_NoError(t, err)
require_Equal(t, acc.nrgAccount, "TWO")
}
}

Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_consumer_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2025 The NATS Authors
// Copyright 2022-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down
51 changes: 50 additions & 1 deletion server/jetstream_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2025 The NATS Authors
// Copyright 2019-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -21006,3 +21006,52 @@ func TestJetStreamServerEncryptionRecoveryWithoutStreamStateFile(t *testing.T) {
})
}
}

func TestJetStreamFileStoreErrorOpeningBlockAfterTruncate(t *testing.T) {
storeDir := t.TempDir()
conf := createConfFile(t, []byte(fmt.Sprintf(`
listen: 127.0.0.1:-1
jetstream: {store_dir: %q}
`, storeDir)))

s, _ := RunServerWithConfig(conf)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
require_NoError(t, err)

pubAck, err := js.Publish("foo", nil)
require_NoError(t, err)
require_Equal(t, pubAck.Sequence, 1)

// Shut down the server and manually truncate the message blocks to be entirely empty, simulating data loss.
mset, err := s.globalAccount().lookupStream("TEST")
require_NoError(t, err)
fs := mset.store.(*fileStore)
blk := filepath.Join(fs.fcfg.StoreDir, msgDir, "1.blk")
index := filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)
nc.Close()
s.Shutdown()

// Truncate the block such that it isn't fully empty, but doesn't contain any messages.
require_NoError(t, os.Truncate(blk, 1))
require_NoError(t, os.Remove(index))

// Restart the server and reconnect.
s, _ = RunServerWithConfig(conf)
defer s.Shutdown()
nc, js = jsClientConnect(t, s)
defer nc.Close()

// Publish another message. Due to the simulated data loss, the stream sequence should continue
// counting after truncating the corrupted data.
pubAck, err = js.Publish("foo", nil)
require_NoError(t, err)
require_Equal(t, pubAck.Sequence, 1)
}
2 changes: 1 addition & 1 deletion server/jetstream_versioning_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2024-2025 The NATS Authors
// Copyright 2024-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down
Loading