diff --git a/server/client.go b/server/client.go index a54df315b78..c11cc82f0ef 100644 --- a/server/client.go +++ b/server/client.go @@ -3286,14 +3286,12 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool // Check to see if we have shadow subscriptions. var updateRoute bool - var updateGWs bool + var isSpokeLeaf bool shadowSubs := sub.shadow sub.shadow = nil if len(shadowSubs) > 0 { - updateRoute = (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF) && c.srv != nil - if updateRoute { - updateGWs = c.srv.gateway.enabled - } + isSpokeLeaf = c.isSpokeLeafNode() + updateRoute = !isSpokeLeaf && (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF) && c.srv != nil } sub.close() c.mu.Unlock() @@ -3302,16 +3300,12 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool for _, nsub := range shadowSubs { if err := nsub.im.acc.sl.Remove(nsub); err != nil { c.Debugf("Could not remove shadow import subscription for account %q", nsub.im.acc.Name) - } else { - if updateRoute { - c.srv.updateRouteSubscriptionMap(nsub.im.acc, nsub, -1) - } - if updateGWs { - c.srv.gatewayUpdateSubInterest(nsub.im.acc.Name, nsub, -1) - } } - // Now check on leafnode updates. - nsub.im.acc.updateLeafNodes(nsub, -1) + if updateRoute { + c.srv.updateRemoteSubscription(nsub.im.acc, nsub, -1) + } else if isSpokeLeaf { + nsub.im.acc.updateLeafNodesEx(nsub, -1, true) + } } // Now check to see if this was part of a respMap entry for service imports. diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 810df7c4cb8..329d243ef8e 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -9712,30 +9712,48 @@ func TestLeafNodeDupeDeliveryQueueSubAndPlainSub(t *testing.T) { // Create plain subscriber on server B attached to system-b account. ncB := natsConnect(t, srvB.ClientURL(), nats.UserInfo("sb", "sb")) defer ncB.Close() - sub, err := ncB.SubscribeSync("*.system-a.events.>") - require_NoError(t, err) - // Create a new sub that has a queue group as well. - subq, err := ncB.QueueSubscribeSync("*.system-a.events.objectnotfound", "SYSB") - require_NoError(t, err) - ncB.Flush() + sub := natsSubSync(t, ncB, "*.system-a.events.>") + subq := natsQueueSubSync(t, ncB, "*.system-a.events.objectnotfound", "SBQ") + natsFlush(t, ncB) + + // Create a subscription on SA1 (we will send from SA0). We want to make sure that + // when subscription on B is removed, this does not affect the subject interest + // in SA0 on behalf of SA1. + ncSAA1 := natsConnect(t, srvA1.ClientURL(), nats.UserInfo("sa", "sa")) + defer ncSAA1.Close() + sub2 := natsSubSync(t, ncSAA1, "*.system-a.events.>") + subq2 := natsQueueSubSync(t, ncSAA1, "*.system-a.events.objectnotfound", "SBQ") + natsFlush(t, ncSAA1) + time.Sleep(250 * time.Millisecond) - // Connect to cluster A + // Connect to cluster A on SA0. ncA := natsConnect(t, srvA0.ClientURL(), nats.UserInfo("t", "t")) defer ncA.Close() - err = ncA.Publish("system-a.events.objectnotfound", []byte("EventA")) - require_NoError(t, err) - ncA.Flush() - // Wait for them to be received. + natsPub(t, ncA, "system-a.events.objectnotfound", []byte("EventA")) + natsFlush(t, ncA) + + natsNexMsg(t, sub, time.Second) + natsNexMsg(t, sub2, time.Second) + if _, err := subq.NextMsg(250 * time.Millisecond); err != nil { + natsNexMsg(t, subq2, time.Second) + } + + // Unsubscribe the subscriptions from server B. + natsUnsub(t, sub) + natsUnsub(t, subq) + natsFlush(t, ncB) + + // Wait for subject propagation. time.Sleep(250 * time.Millisecond) - n, _, err := sub.Pending() - require_NoError(t, err) - require_Equal(t, n, 1) - n, _, err = subq.Pending() - require_NoError(t, err) - require_Equal(t, n, 1) + // Publish again, subscriptions on SA1 should receive it. + natsPub(t, ncA, "system-a.events.objectnotfound", []byte("EventA")) + natsFlush(t, ncA) + + natsNexMsg(t, sub2, time.Second) + natsNexMsg(t, subq2, time.Second) } func TestLeafNodeServerKickClient(t *testing.T) {