diff --git a/server/client.go b/server/client.go index 135df35827a..29fe28aaf68 100644 --- a/server/client.go +++ b/server/client.go @@ -3337,7 +3337,7 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool sub.shadow = nil if len(shadowSubs) > 0 { isSpokeLeaf = c.isSpokeLeafNode() - updateRoute = !isSpokeLeaf && (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF) && c.srv != nil + updateRoute = !isSpokeLeaf && (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF || c.kind == JETSTREAM) && c.srv != nil } sub.close() c.mu.Unlock() diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index d106c14210c..976ae1f4835 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -6702,3 +6702,111 @@ func TestJetStreamClusterDeletedNodeDoesNotReviveStreamAfterCatchup(t *testing.T return nil }) } + +// https://github.com/nats-io/nats-server/issues/7718 +func TestJetStreamClusterLeakedSubsWithStreamImportOverlappingJetStreamSubs(t *testing.T) { + tmpl := ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: {max_mem_store: 2GB, max_file_store: 2GB, store_dir: '%s'} + + leaf { + listen: 127.0.0.1:-1 + } + + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + + accounts { + ACC: { + jetstream: enabled + users: [{user: acc, password: acc}] + imports: [{stream: {account: zone, subject: ">"}}] + } + zone: { + jetstream: enabled + users: [{user: zone, password: zone}] + exports: [{stream: ">"}] + } + } + no_auth_user: acc +` + c := createJetStreamClusterWithTemplate(t, tmpl, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + checkExpectedSubs := func(expected uint32) (actual uint32) { + t.Helper() + checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + e := expected + for _, s := range c.servers { + subs := s.NumSubscriptions() + if e == 0 { + e = subs + } else if e != subs { + return fmt.Errorf("expected %d subs, got %d", e, subs) + } + } + actual = e + return nil + }) + return actual + } + + // Track subscription counts between stream/consumer create/deletes. + var baseline, sc, cc uint32 + + // Perform a couple iterations to check we get to predictable subscription counts. + for range 3 { + // Zero means we don't know the expected count, but still ALL servers must equal. + initial := checkExpectedSubs(0) + + // If we've iterated once, we'll know the baseline. Each next iteration must be equal to this. + if baseline != 0 { + require_Equal(t, baseline, initial) + } + + // Add the stream. + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 1, + Storage: nats.FileStorage, + }) + require_NoError(t, err) + sl := c.streamLeader("ACC", "TEST") + require_NotNil(t, sl) + afterStreamCreate := checkExpectedSubs(sl.NumSubscriptions()) + if sc == 0 { + sc = afterStreamCreate + } + require_Equal(t, sc, afterStreamCreate) + + // Add the consumer. + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"}) + require_NoError(t, err) + afterConsumerCreate := checkExpectedSubs(sl.NumSubscriptions()) + if cc == 0 { + cc = afterConsumerCreate + } + require_Equal(t, cc, afterConsumerCreate) + + // Delete the consumer, the subscriptions should drop down to what they were after the stream was created. + require_NoError(t, js.DeleteConsumer("TEST", "CONSUMER")) + afterConsumerDelete := checkExpectedSubs(sl.NumSubscriptions()) + require_Equal(t, afterStreamCreate, afterConsumerDelete) + + // Deleting the stream should drop the subscriptions back to the baseline. + require_NoError(t, js.DeleteStream("TEST")) + afterStreamDelete := checkExpectedSubs(sl.NumSubscriptions()) + if baseline == 0 { + baseline = afterStreamDelete + } + require_Equal(t, baseline, afterStreamDelete) + } +}