diff --git a/server/client.go b/server/client.go index 882e1ec31f6..a54df315b78 100644 --- a/server/client.go +++ b/server/client.go @@ -3171,6 +3171,13 @@ func (c *client) addShadowSub(sub *subscription, ime *ime, enact bool) (*subscri // Update our route map here. But only if we are not a leaf node or a hub leafnode. if c.kind != LEAF || c.isHubLeafNode() { c.srv.updateRemoteSubscription(im.acc, &nsub, 1) + } else if c.kind == LEAF { + // Update all leafnodes that connect to this server. Note that we could have + // used the updateLeafNodes() function since when it does invoke updateSmap() + // this function already takes care of not sending to a spoke leafnode since + // the `nsub` here is already from a spoke leafnode, but to be explicit, we + // use this version that updates only leafnodes that connect to this server. + im.acc.updateLeafNodesEx(&nsub, 1, true) } return &nsub, nil diff --git a/server/leafnode.go b/server/leafnode.go index cdfca04d706..bcd8b2f3c1e 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -2262,9 +2262,11 @@ func (s *Server) updateInterestForAccountOnGateway(accName string, sub *subscrip acc.updateLeafNodes(sub, delta) } -// updateLeafNodes will make sure to update the account smap for the subscription. +// updateLeafNodesEx will make sure to update the account smap for the subscription. // Will also forward to all leaf nodes as needed. -func (acc *Account) updateLeafNodes(sub *subscription, delta int32) { +// If `hubOnly` is true, then will update only leaf nodes that connect to this server +// (that is, for which this server acts as a hub to them). +func (acc *Account) updateLeafNodesEx(sub *subscription, delta int32, hubOnly bool) { if acc == nil || sub == nil { return } @@ -2312,12 +2314,19 @@ func (acc *Account) updateLeafNodes(sub *subscription, delta int32) { if ln == sub.client { continue } + ln.mu.Lock() // Don't advertise interest from leafnodes to other isolated leafnodes. if sub.client.kind == LEAF && ln.isIsolatedLeafNode() { + ln.mu.Unlock() + continue + } + // If `hubOnly` is true, it means that we want to update only leafnodes + // that connect to this server (so isHubLeafNode() would return `true`). + if hubOnly && !ln.isHubLeafNode() { + ln.mu.Unlock() continue } // Check to make sure this sub does not have an origin cluster that matches the leafnode. - ln.mu.Lock() // If skipped, make sure that we still let go the "$LDS." subscription that allows // the detection of loops as long as different cluster. clusterDifferent := cluster != ln.remoteCluster() @@ -2328,6 +2337,12 @@ func (acc *Account) updateLeafNodes(sub *subscription, delta int32) { } } +// updateLeafNodes will make sure to update the account smap for the subscription. +// Will also forward to all leaf nodes as needed. +func (acc *Account) updateLeafNodes(sub *subscription, delta int32) { + acc.updateLeafNodesEx(sub, delta, false) +} + // This will make an update to our internal smap and determine if we should send out // an interest update to the remote side. // Lock should be held. diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 5dd8a58f8a7..810df7c4cb8 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -10342,3 +10342,226 @@ func TestLeafNodeIsolatedLeafSubjectPropagation(t *testing.T) { }) } } + +func TestLeafNodeDaisyChainWithAccountImportExport(t *testing.T) { + hubConf := createConfFile(t, []byte(` + server_name: hub + listen: "127.0.0.1:-1" + + leafnodes { + listen: "127.0.0.1:-1" + } + accounts { + SYS: { + users: [{ user: s, password: s}], + }, + ODC: { + jetstream: enabled + users: [ + { + user: u, password: u, + permissions: { + publish: {deny: ["local.>","hub2leaf.>"]} + subscribe: {deny: ["local.>","leaf2leaf.>"]} + } + } + ] + } + } + `)) + hub, ohub := RunServerWithConfig(hubConf) + defer hub.Shutdown() + + storeDir := t.TempDir() + leafJSConf := createConfFile(t, fmt.Appendf(nil, ` + server_name: leaf-js + listen: "127.0.0.1:-1" + + jetstream { + store_dir="%s/leaf-js" + domain=leaf-js + } + accounts { + ODC: { + jetstream: enabled + users: [{ user: u, password: u}] + }, + } + leafnodes { + remotes [ + { + urls: ["leaf://u:u@127.0.0.1:%d"] # connects to hub + account: ODC + } + ] + } + `, storeDir, ohub.LeafNode.Port)) + leafJS, _ := RunServerWithConfig(leafJSConf) + defer leafJS.Shutdown() + + checkLeafNodeConnected(t, hub) + checkLeafNodeConnected(t, leafJS) + + otherConf := createConfFile(t, []byte(` + server_name: other + listen: "127.0.0.1:-1" + leafnodes { + listen: "127.0.0.1:-1" + } + `)) + other, oother := RunServerWithConfig(otherConf) + defer other.Shutdown() + + tmpl := ` + server_name: %s + listen: "127.0.0.1:-1" + + leafnodes { + listen: "127.0.0.1:-1" + remotes [ + { + urls: ["leaf://u:u@127.0.0.1:%d"] + account: ODC_DEV + } + { + urls: ["leaf://127.0.0.1:%d"] + account: ODC_DEV + } + ] + } + cluster { + name: "hubsh" + listen: "127.0.0.1:-1" + %s + } + accounts: { + ODC_DEV: { + users: [ + {user: o, password: o} + ] + imports: [ + {service: {account: "SH1", subject: "$JS.leaf-sh.API.>"}} + {stream: {account: "SH1", subject: "sync.leaf-sh.jspush.>"}} + ] + exports: [ + {stream: ">"} + {service: ">", response_type: "Singleton"} + ] + } + SH1: { + users: [ + {user: s, password: s} + ] + exports: [ + {service: "$JS.leaf-sh.API.>", response_type: "Stream"} + {stream: "sync.leaf-sh.jspush.>"} + ] + } + } + ` + hubSh1Conf := createConfFile(t, fmt.Appendf(nil, tmpl, "hubsh1", ohub.LeafNode.Port, oother.LeafNode.Port, _EMPTY_)) + hubSh1, ohubSh1 := RunServerWithConfig(hubSh1Conf) + defer hubSh1.Shutdown() + + hubSh2Conf := createConfFile(t, fmt.Appendf(nil, tmpl, "hubsh2", ohub.LeafNode.Port, oother.LeafNode.Port, + fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", ohubSh1.Cluster.Port))) + hubSh2, ohubSh2 := RunServerWithConfig(hubSh2Conf) + defer hubSh2.Shutdown() + + checkClusterFormed(t, hubSh1, hubSh2) + + checkLeafNodeConnectedCount(t, hub, 3) + checkLeafNodeConnectedCount(t, hubSh1, 2) + checkLeafNodeConnectedCount(t, hubSh2, 2) + + leafShConf := createConfFile(t, fmt.Appendf(nil, ` + server_name: leafsh + listen: "127.0.0.1:-1" + + jetstream { + store_dir="%s/leafsh" + domain=leaf-sh + } + accounts { + SH: { + jetstream: enabled + users: [{user: u, password: u}] + } + } + leafnodes { + remotes [ + { + urls: ["leaf://s:s@127.0.0.1:%d"] + account: SH + } + ] + } + `, storeDir, ohubSh2.LeafNode.Port)) + leafSh, _ := RunServerWithConfig(leafShConf) + defer leafSh.Shutdown() + + checkLeafNodeConnectedCount(t, hubSh2, 3) + checkLeafNodeConnected(t, leafSh) + + ncLeafSh, jsLeafSh := jsClientConnect(t, leafSh, nats.UserInfo("u", "u")) + defer ncLeafSh.Close() + + sc := &nats.StreamConfig{ + Name: "leaf-sh", + Subjects: []string{"leaf2leaf.>"}, + Retention: nats.LimitsPolicy, + Storage: nats.FileStorage, + AllowRollup: true, + AllowDirect: true, + } + _, err := jsLeafSh.AddStream(sc) + require_NoError(t, err) + + ncLeafJS, jsLeafJS := jsClientConnect(t, leafJS, nats.UserInfo("u", "u")) + defer ncLeafJS.Close() + + sc = &nats.StreamConfig{ + Name: "leaf-js", + Retention: nats.LimitsPolicy, + Storage: nats.FileStorage, + AllowRollup: true, + AllowDirect: true, + Sources: []*nats.StreamSource{ + { + Name: "leaf-sh", + External: &nats.ExternalStream{ + APIPrefix: "$JS.leaf-sh.API", + DeliverPrefix: "sync.leaf-sh.jspush"}, + }, + }, + } + _, err = jsLeafJS.AddStream(sc) + require_NoError(t, err) + + for range 10 { + _, err = jsLeafSh.Publish("leaf2leaf.v1.test", []byte("hello")) + require_NoError(t, err) + } + + check := func(js nats.JetStreamContext, stream string) { + t.Helper() + checkFor(t, 2*time.Second, 50*time.Millisecond, func() error { + si, err := js.StreamInfo(stream) + if err != nil { + return err + } + if n := si.State.Msgs; n != 10 { + return fmt.Errorf("Expected 10 messages, got %v", n) + } + return nil + }) + } + check(jsLeafSh, "leaf-sh") + check(jsLeafJS, "leaf-js") + + acc := other.GlobalAccount() + acc.mu.RLock() + sr := acc.sl.ReverseMatch("sync.leaf-sh.jspush.>") + acc.mu.RUnlock() + require_Len(t, len(sr.psubs), 0) +}