diff --git a/server/client.go b/server/client.go index 97cfc179d34..0e974512e23 100644 --- a/server/client.go +++ b/server/client.go @@ -2995,8 +2995,10 @@ func (c *client) addShadowSub(sub *subscription, ime *ime, enact bool) (*subscri return nil, fmt.Errorf(errs) } - // Update our route map here. - c.srv.updateRemoteSubscription(im.acc, &nsub, 1) + // Update our route map here. But only if we are not a leaf node or a hub leafnode. + if c.kind != LEAF || c.isHubLeafNode() { + c.srv.updateRemoteSubscription(im.acc, &nsub, 1) + } return &nsub, nil } diff --git a/server/leafnode_test.go b/server/leafnode_test.go index c46067ed129..a1e5de0d5f5 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -7751,3 +7751,111 @@ func TestLeafNodeDetectsStaleConnectionIfNoInfo(t *testing.T) { }) } } + +// https://github.com/nats-io/nats-server/issues/5473 +func TestLeafNodeDupeDeliveryQueueSubAndPlainSub(t *testing.T) { + clusterCommonConf := ` + accounts: { + tenant: { + users: [ { user:t, password: t } ] + exports: [{stream: system-a.events.>}] + } + system-a: { + users: [ { user:sa, password: sa } ] + imports: [ + {stream: {subject: system-a.events.>, account: tenant}, prefix: tenant} + ] + } + $SYS: { users = [ {user: "s", password: "s"} ] } + } + leafnodes { + remotes: [{ + urls: [ "nats-leaf://sa:sa@127.0.0.1:17422" ] + account: system-a + }] + }` + + confCluster0 := createConfFile(t, []byte(fmt.Sprintf(` + server_name: a-0 + port: -1 + cluster: { + name: cluster-a + listen: 127.0.0.1:16122 + routes = [ nats://127.0.0.1:16123 ] + pool_size: -1 + } + %s`, clusterCommonConf))) + + confCluster1 := createConfFile(t, []byte(fmt.Sprintf(` + server_name: a-1 + port: -1 + cluster: { + name: cluster-a + listen: 127.0.0.1:16123 + routes = [ nats://127.0.0.1:16122 ] + pool_size: -1 + } + %s`, clusterCommonConf))) + + serverB := createConfFile(t, []byte(` + server_name: b + port: -1 + leafnodes: { port: 17422 } + accounts: { + system-a: { + users: [ { user: sa, password: sa } ] + exports: [{stream: *.system-a.>}] + } + system-b: { + users: [ { user: sb, password: sb } ] + imports: [ {stream: {subject: *.system-a.>, account: system-a }}] + } + $SYS: { users = [ {user: "s", password: "s"} ] } + }`)) + + // Start server B + srvB, _ := RunServerWithConfig(serverB) + defer srvB.Shutdown() + + // Start the cluster servers. + srvA0, _ := RunServerWithConfig(confCluster0) + defer srvA0.Shutdown() + // Make sure this is connected first before starting the second server in cluster A. + checkLeafNodeConnectedCount(t, srvB, 1) + // Start second A server. + srvA1, _ := RunServerWithConfig(confCluster1) + defer srvA1.Shutdown() + // Make sure they are routed together. + checkNumRoutes(t, srvA0, 1) + checkNumRoutes(t, srvA1, 1) + // Make sure each cluster server is connected to server B. + checkLeafNodeConnectedCount(t, srvB, 2) + + // Create plain subscriber on server B attached to system-b account. + ncB := natsConnect(t, srvB.ClientURL(), nats.UserInfo("sb", "sb")) + defer ncB.Close() + sub, err := ncB.SubscribeSync("*.system-a.events.>") + require_NoError(t, err) + // Create a new sub that has a queue group as well. + subq, err := ncB.QueueSubscribeSync("*.system-a.events.objectnotfound", "SYSB") + require_NoError(t, err) + ncB.Flush() + time.Sleep(250 * time.Millisecond) + + // Connect to cluster A + ncA := natsConnect(t, srvA0.ClientURL(), nats.UserInfo("t", "t")) + defer ncA.Close() + + err = ncA.Publish("system-a.events.objectnotfound", []byte("EventA")) + require_NoError(t, err) + ncA.Flush() + // Wait for them to be received. + time.Sleep(250 * time.Millisecond) + + n, _, err := sub.Pending() + require_NoError(t, err) + require_Equal(t, n, 1) + n, _, err = subq.Pending() + require_NoError(t, err) + require_Equal(t, n, 1) +}