Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3337,7 +3337,7 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool
sub.shadow = nil
if len(shadowSubs) > 0 {
isSpokeLeaf = c.isSpokeLeafNode()
updateRoute = !isSpokeLeaf && (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF) && c.srv != nil
updateRoute = !isSpokeLeaf && (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF || c.kind == JETSTREAM) && c.srv != nil
}
sub.close()
c.mu.Unlock()
Expand Down
108 changes: 108 additions & 0 deletions server/jetstream_cluster_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6702,3 +6702,111 @@ func TestJetStreamClusterDeletedNodeDoesNotReviveStreamAfterCatchup(t *testing.T
return nil
})
}

// https://github.com/nats-io/nats-server/issues/7718
func TestJetStreamClusterLeakedSubsWithStreamImportOverlappingJetStreamSubs(t *testing.T) {
tmpl := `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 2GB, max_file_store: 2GB, store_dir: '%s'}

leaf {
listen: 127.0.0.1:-1
}

cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}

accounts {
ACC: {
jetstream: enabled
users: [{user: acc, password: acc}]
imports: [{stream: {account: zone, subject: ">"}}]
}
zone: {
jetstream: enabled
users: [{user: zone, password: zone}]
exports: [{stream: ">"}]
}
}
no_auth_user: acc
`
c := createJetStreamClusterWithTemplate(t, tmpl, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

checkExpectedSubs := func(expected uint32) (actual uint32) {
t.Helper()
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
e := expected
for _, s := range c.servers {
subs := s.NumSubscriptions()
if e == 0 {
e = subs
} else if e != subs {
return fmt.Errorf("expected %d subs, got %d", e, subs)
}
}
actual = e
return nil
})
return actual
}

// Track subscription counts between stream/consumer create/deletes.
var baseline, sc, cc uint32

// Perform a couple iterations to check we get to predictable subscription counts.
for range 3 {
// Zero means we don't know the expected count, but still ALL servers must equal.
initial := checkExpectedSubs(0)

// If we've iterated once, we'll know the baseline. Each next iteration must be equal to this.
if baseline != 0 {
require_Equal(t, baseline, initial)
}

// Add the stream.
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 1,
Storage: nats.FileStorage,
})
require_NoError(t, err)
sl := c.streamLeader("ACC", "TEST")
require_NotNil(t, sl)
afterStreamCreate := checkExpectedSubs(sl.NumSubscriptions())
if sc == 0 {
sc = afterStreamCreate
}
require_Equal(t, sc, afterStreamCreate)

// Add the consumer.
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"})
require_NoError(t, err)
afterConsumerCreate := checkExpectedSubs(sl.NumSubscriptions())
if cc == 0 {
cc = afterConsumerCreate
}
require_Equal(t, cc, afterConsumerCreate)

// Delete the consumer, the subscriptions should drop down to what they were after the stream was created.
require_NoError(t, js.DeleteConsumer("TEST", "CONSUMER"))
afterConsumerDelete := checkExpectedSubs(sl.NumSubscriptions())
require_Equal(t, afterStreamCreate, afterConsumerDelete)

// Deleting the stream should drop the subscriptions back to the baseline.
require_NoError(t, js.DeleteStream("TEST"))
afterStreamDelete := checkExpectedSubs(sl.NumSubscriptions())
if baseline == 0 {
baseline = afterStreamDelete
}
require_Equal(t, baseline, afterStreamDelete)
}
}