Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3171,6 +3171,13 @@ func (c *client) addShadowSub(sub *subscription, ime *ime, enact bool) (*subscri
// Update our route map here. But only if we are not a leaf node or a hub leafnode.
if c.kind != LEAF || c.isHubLeafNode() {
c.srv.updateRemoteSubscription(im.acc, &nsub, 1)
} else if c.kind == LEAF {
// Update all leafnodes that connect to this server. Note that we could have
// used the updateLeafNodes() function since when it does invoke updateSmap()
// this function already takes care of not sending to a spoke leafnode since
// the `nsub` here is already from a spoke leafnode, but to be explicit, we
// use this version that updates only leafnodes that connect to this server.
im.acc.updateLeafNodesEx(&nsub, 1, true)
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@derekcollison I am pretty sure that - as of now, and with the test case I have added - we would not need a new function, but decided after much thinking, that I would have this version that excludes some type of leafnode connections. Let me know if you want me to remove that and simply use updateLeafNodes().

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree

}

return &nsub, nil
Expand Down
21 changes: 18 additions & 3 deletions server/leafnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -2262,9 +2262,11 @@ func (s *Server) updateInterestForAccountOnGateway(accName string, sub *subscrip
acc.updateLeafNodes(sub, delta)
}

// updateLeafNodes will make sure to update the account smap for the subscription.
// updateLeafNodesEx will make sure to update the account smap for the subscription.
// Will also forward to all leaf nodes as needed.
func (acc *Account) updateLeafNodes(sub *subscription, delta int32) {
// If `hubOnly` is true, then will update only leaf nodes that connect to this server
// (that is, for which this server acts as a hub to them).
func (acc *Account) updateLeafNodesEx(sub *subscription, delta int32, hubOnly bool) {
if acc == nil || sub == nil {
return
}
Expand Down Expand Up @@ -2312,12 +2314,19 @@ func (acc *Account) updateLeafNodes(sub *subscription, delta int32) {
if ln == sub.client {
continue
}
ln.mu.Lock()
// Don't advertise interest from leafnodes to other isolated leafnodes.
if sub.client.kind == LEAF && ln.isIsolatedLeafNode() {
ln.mu.Unlock()
continue
}
// If `hubOnly` is true, it means that we want to update only leafnodes
// that connect to this server (so isHubLeafNode() would return `true`).
if hubOnly && !ln.isHubLeafNode() {
ln.mu.Unlock()
continue
}
// Check to make sure this sub does not have an origin cluster that matches the leafnode.
ln.mu.Lock()
// If skipped, make sure that we still let go the "$LDS." subscription that allows
// the detection of loops as long as different cluster.
clusterDifferent := cluster != ln.remoteCluster()
Expand All @@ -2328,6 +2337,12 @@ func (acc *Account) updateLeafNodes(sub *subscription, delta int32) {
}
}

// updateLeafNodes will make sure to update the account smap for the subscription.
// Will also forward to all leaf nodes as needed.
func (acc *Account) updateLeafNodes(sub *subscription, delta int32) {
acc.updateLeafNodesEx(sub, delta, false)
}

// This will make an update to our internal smap and determine if we should send out
// an interest update to the remote side.
// Lock should be held.
Expand Down
223 changes: 223 additions & 0 deletions server/leafnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10342,3 +10342,226 @@ func TestLeafNodeIsolatedLeafSubjectPropagation(t *testing.T) {
})
}
}

func TestLeafNodeDaisyChainWithAccountImportExport(t *testing.T) {
hubConf := createConfFile(t, []byte(`
server_name: hub
listen: "127.0.0.1:-1"

leafnodes {
listen: "127.0.0.1:-1"
}
accounts {
SYS: {
users: [{ user: s, password: s}],
},
ODC: {
jetstream: enabled
users: [
{
user: u, password: u,
permissions: {
publish: {deny: ["local.>","hub2leaf.>"]}
subscribe: {deny: ["local.>","leaf2leaf.>"]}
}
}
]
}
}
`))
hub, ohub := RunServerWithConfig(hubConf)
defer hub.Shutdown()

storeDir := t.TempDir()
leafJSConf := createConfFile(t, fmt.Appendf(nil, `
server_name: leaf-js
listen: "127.0.0.1:-1"

jetstream {
store_dir="%s/leaf-js"
domain=leaf-js
}
accounts {
ODC: {
jetstream: enabled
users: [{ user: u, password: u}]
},
}
leafnodes {
remotes [
{
urls: ["leaf://u:u@127.0.0.1:%d"] # connects to hub
account: ODC
}
]
}
`, storeDir, ohub.LeafNode.Port))
leafJS, _ := RunServerWithConfig(leafJSConf)
defer leafJS.Shutdown()

checkLeafNodeConnected(t, hub)
checkLeafNodeConnected(t, leafJS)

otherConf := createConfFile(t, []byte(`
server_name: other
listen: "127.0.0.1:-1"
leafnodes {
listen: "127.0.0.1:-1"
}
`))
other, oother := RunServerWithConfig(otherConf)
defer other.Shutdown()

tmpl := `
server_name: %s
listen: "127.0.0.1:-1"

leafnodes {
listen: "127.0.0.1:-1"
remotes [
{
urls: ["leaf://u:u@127.0.0.1:%d"]
account: ODC_DEV
}
{
urls: ["leaf://127.0.0.1:%d"]
account: ODC_DEV
}
]
}
cluster {
name: "hubsh"
listen: "127.0.0.1:-1"
%s
}
accounts: {
ODC_DEV: {
users: [
{user: o, password: o}
]
imports: [
{service: {account: "SH1", subject: "$JS.leaf-sh.API.>"}}
{stream: {account: "SH1", subject: "sync.leaf-sh.jspush.>"}}
]
exports: [
{stream: ">"}
{service: ">", response_type: "Singleton"}
]
}
SH1: {
users: [
{user: s, password: s}
]
exports: [
{service: "$JS.leaf-sh.API.>", response_type: "Stream"}
{stream: "sync.leaf-sh.jspush.>"}
]
}
}
`
hubSh1Conf := createConfFile(t, fmt.Appendf(nil, tmpl, "hubsh1", ohub.LeafNode.Port, oother.LeafNode.Port, _EMPTY_))
hubSh1, ohubSh1 := RunServerWithConfig(hubSh1Conf)
defer hubSh1.Shutdown()

hubSh2Conf := createConfFile(t, fmt.Appendf(nil, tmpl, "hubsh2", ohub.LeafNode.Port, oother.LeafNode.Port,
fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", ohubSh1.Cluster.Port)))
hubSh2, ohubSh2 := RunServerWithConfig(hubSh2Conf)
defer hubSh2.Shutdown()

checkClusterFormed(t, hubSh1, hubSh2)

checkLeafNodeConnectedCount(t, hub, 3)
checkLeafNodeConnectedCount(t, hubSh1, 2)
checkLeafNodeConnectedCount(t, hubSh2, 2)

leafShConf := createConfFile(t, fmt.Appendf(nil, `
server_name: leafsh
listen: "127.0.0.1:-1"

jetstream {
store_dir="%s/leafsh"
domain=leaf-sh
}
accounts {
SH: {
jetstream: enabled
users: [{user: u, password: u}]
}
}
leafnodes {
remotes [
{
urls: ["leaf://s:s@127.0.0.1:%d"]
account: SH
}
]
}
`, storeDir, ohubSh2.LeafNode.Port))
leafSh, _ := RunServerWithConfig(leafShConf)
defer leafSh.Shutdown()

checkLeafNodeConnectedCount(t, hubSh2, 3)
checkLeafNodeConnected(t, leafSh)

ncLeafSh, jsLeafSh := jsClientConnect(t, leafSh, nats.UserInfo("u", "u"))
defer ncLeafSh.Close()

sc := &nats.StreamConfig{
Name: "leaf-sh",
Subjects: []string{"leaf2leaf.>"},
Retention: nats.LimitsPolicy,
Storage: nats.FileStorage,
AllowRollup: true,
AllowDirect: true,
}
_, err := jsLeafSh.AddStream(sc)
require_NoError(t, err)

ncLeafJS, jsLeafJS := jsClientConnect(t, leafJS, nats.UserInfo("u", "u"))
defer ncLeafJS.Close()

sc = &nats.StreamConfig{
Name: "leaf-js",
Retention: nats.LimitsPolicy,
Storage: nats.FileStorage,
AllowRollup: true,
AllowDirect: true,
Sources: []*nats.StreamSource{
{
Name: "leaf-sh",
External: &nats.ExternalStream{
APIPrefix: "$JS.leaf-sh.API",
DeliverPrefix: "sync.leaf-sh.jspush"},
},
},
}
_, err = jsLeafJS.AddStream(sc)
require_NoError(t, err)

for range 10 {
_, err = jsLeafSh.Publish("leaf2leaf.v1.test", []byte("hello"))
require_NoError(t, err)
}

check := func(js nats.JetStreamContext, stream string) {
t.Helper()
checkFor(t, 2*time.Second, 50*time.Millisecond, func() error {
si, err := js.StreamInfo(stream)
if err != nil {
return err
}
if n := si.State.Msgs; n != 10 {
return fmt.Errorf("Expected 10 messages, got %v", n)
}
return nil
})
}
check(jsLeafSh, "leaf-sh")
check(jsLeafJS, "leaf-js")

acc := other.GlobalAccount()
acc.mu.RLock()
sr := acc.sl.ReverseMatch("sync.leaf-sh.jspush.>")
acc.mu.RUnlock()
require_Len(t, len(sr.psubs), 0)
}