Skip to content

Commit b6c1164

Browse files
committed
Improve leafnode subject interest isolation
It is now possible to configure leafnode subject interest isolation in three ways: 1. For all leafnode connections on the node using the top-level `isolate_leafnode_interest` or `isolate` option in the leafnode block 2. Isolating specific remotes from east-west interest originating locally using the `isolate_leafnode_interest` or `isolate` option in the `remotes` config 3. Asking the remote side to isolate us from east-west interest originating remotely using the `request_isolation` option in the `remotes` config Signed-off-by: Neil Twigg <[email protected]>
1 parent d76fa91 commit b6c1164

File tree

3 files changed

+237
-2
lines changed

3 files changed

+237
-2
lines changed

server/leafnode.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,10 @@ func (c *client) isHubLeafNode() bool {
133133
}
134134

135135
func (c *client) isIsolatedLeafNode() bool {
136+
// TODO(nat): In future we may want to pass in and consider an isolation
137+
// group name here, which the hub and/or leaf could provide, so that we
138+
// can isolate away certain LNs but not others on an opt-in basis. For
139+
// now we will just isolate all LN interest until then.
136140
return c.kind == LEAF && c.leaf.isolated
137141
}
138142

@@ -826,6 +830,7 @@ func (c *client) sendLeafConnect(clusterName string, headers bool) error {
826830
Compression: c.leaf.compression,
827831
RemoteAccount: c.acc.GetName(),
828832
Proto: c.srv.getServerProto(),
833+
Isolate: c.leaf.remote.RequestIsolation,
829834
}
830835

831836
// If a signature callback is specified, this takes precedence over anything else.
@@ -994,7 +999,9 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
994999

9951000
// If the leafnode subject interest should be isolated, flag it here.
9961001
s.optsMu.RLock()
997-
c.leaf.isolated = s.opts.LeafNode.IsolateLeafnodeInterest
1002+
if c.leaf.isolated = s.opts.LeafNode.IsolateLeafnodeInterest; !c.leaf.isolated && remote != nil {
1003+
c.leaf.isolated = remote.LocalIsolation
1004+
}
9981005
s.optsMu.RUnlock()
9991006

10001007
// For accepted LN connections, ws will be != nil if it was accepted
@@ -1844,6 +1851,7 @@ type leafConnectInfo struct {
18441851
Headers bool `json:"headers,omitempty"`
18451852
JetStream bool `json:"jetstream,omitempty"`
18461853
DenyPub []string `json:"deny_pub,omitempty"`
1854+
Isolate bool `json:"isolate,omitempty"`
18471855

18481856
// There was an existing field called:
18491857
// >> Comp bool `json:"compression,omitempty"`
@@ -1954,6 +1962,8 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
19541962
c.leaf.remoteServer = proto.Name
19551963
// Remember the remote account name
19561964
c.leaf.remoteAccName = proto.RemoteAccount
1965+
// Remember if the leafnode requested isolation.
1966+
c.leaf.isolated = c.leaf.isolated || proto.Isolate
19571967

19581968
// If the other side has declared itself a hub, so we will take on the spoke role.
19591969
if proto.Hub {

server/leafnode_test.go

Lines changed: 216 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10277,7 +10277,7 @@ func TestLeafNodesDisableRemote(t *testing.T) {
1027710277
require_Equal(t, string(msg.Data), "hello4")
1027810278
}
1027910279

10280-
func TestLeafNodeIsolatedLeafSubjectPropagation(t *testing.T) {
10280+
func TestLeafNodeIsolatedLeafSubjectPropagationGlobal(t *testing.T) {
1028110281
for tname, isolated := range map[string]bool{
1028210282
"Isolated": true,
1028310283
"Normal": false,
@@ -10326,6 +10326,221 @@ func TestLeafNodeIsolatedLeafSubjectPropagation(t *testing.T) {
1032610326
checkLeafNodeConnectedCount(t, sp1, 1)
1032710327
checkLeafNodeConnectedCount(t, sp2, 1)
1032810328

10329+
// We expect that the hub side will have answered the request from the spoke for
10330+
// isolation if needed, but the spokes themselves will not themselves isolate
10331+
// subscription interest in the other direction unless also configured to do so.
10332+
for _, c := range sh.leafs {
10333+
require_Equal(t, c.leaf.isolated, isolated)
10334+
}
10335+
for _, c := range sp1.leafs {
10336+
require_False(t, c.leaf.isolated)
10337+
}
10338+
for _, c := range sp2.leafs {
10339+
require_False(t, c.leaf.isolated)
10340+
}
10341+
10342+
nch := natsConnect(t, sh.ClientURL(), nats.UserInfo("HA", "pwd"))
10343+
nc1 := natsConnect(t, sp1.ClientURL(), nats.UserInfo("A", "pwd"))
10344+
10345+
// Create a north-south subscription on the hub that should be visible to both spokes.
10346+
nssub, err := nch.SubscribeSync("northsouth")
10347+
require_NoError(t, err)
10348+
checkSubInterest(t, sh, "HA", "northsouth", time.Second) // Visible to the hub.
10349+
checkSubInterest(t, sp1, "A", "northsouth", time.Second) // Visible to both spokes.
10350+
checkSubInterest(t, sp2, "A", "northsouth", time.Second) // Visible to both spokes.
10351+
10352+
// The spoke subscriptions should be visible to the hub in all cases, but only
10353+
// visible to other spokes if they are not isolated.
10354+
ewsub, err := nc1.SubscribeSync("eastwest")
10355+
require_NoError(t, err)
10356+
checkSubInterest(t, sh, "HA", "eastwest", time.Second) // Visible to the hub.
10357+
checkSubInterest(t, sp1, "A", "eastwest", time.Second) // Visible to the spoke with the sub.
10358+
if isolated {
10359+
checkSubNoInterest(t, sp2, "A", "eastwest", time.Second) // Not visible to the other spoke.
10360+
} else {
10361+
checkSubInterest(t, sp2, "A", "eastwest", time.Second) // Visible to the other spoke.
10362+
}
10363+
require_NoError(t, ewsub.Unsubscribe())
10364+
checkSubNoInterest(t, sh, "HA", "eastwest", time.Second)
10365+
checkSubNoInterest(t, sp1, "A", "eastwest", time.Second)
10366+
checkSubNoInterest(t, sp2, "A", "eastwest", time.Second)
10367+
10368+
// ... but a subscription from the hub should be visible to both.
10369+
require_NoError(t, nssub.Unsubscribe())
10370+
checkSubNoInterest(t, sh, "HA", "northsouth", time.Second)
10371+
checkSubNoInterest(t, sp1, "A", "northsouth", time.Second)
10372+
checkSubNoInterest(t, sp2, "A", "northsouth", time.Second)
10373+
})
10374+
}
10375+
}
10376+
10377+
func TestLeafNodeIsolatedLeafSubjectPropagationRequestIsolation(t *testing.T) {
10378+
for tname, isolated := range map[string]bool{
10379+
"Isolated": true,
10380+
"Normal": false,
10381+
} {
10382+
t.Run(tname, func(t *testing.T) {
10383+
hubTmpl := `
10384+
port: -1
10385+
server_name: "%s"
10386+
accounts {
10387+
HA { users: [{user: HA, password: pwd}] }
10388+
}
10389+
leafnodes {
10390+
port: -1
10391+
}
10392+
`
10393+
confH := createConfFile(t, []byte(fmt.Sprintf(hubTmpl, "H1")))
10394+
sh, oh := RunServerWithConfig(confH)
10395+
defer sh.Shutdown()
10396+
10397+
spokeTmpl := `
10398+
port: -1
10399+
server_name: "%s"
10400+
accounts {
10401+
A { users: [{user: A, password: pwd}] }
10402+
}
10403+
leafnodes {
10404+
remotes [
10405+
{
10406+
url: "nats://HA:[email protected]:%d"
10407+
local: "A"
10408+
request_isolation: %v
10409+
}
10410+
]
10411+
}
10412+
`
10413+
10414+
confSP1 := createConfFile(t, []byte(fmt.Sprintf(spokeTmpl, "SP1", oh.LeafNode.Port, isolated)))
10415+
sp1, _ := RunServerWithConfig(confSP1)
10416+
defer sp1.Shutdown()
10417+
10418+
confSP2 := createConfFile(t, []byte(fmt.Sprintf(spokeTmpl, "SP2", oh.LeafNode.Port, isolated)))
10419+
sp2, _ := RunServerWithConfig(confSP2)
10420+
defer sp2.Shutdown()
10421+
10422+
checkLeafNodeConnectedCount(t, sh, 2)
10423+
checkLeafNodeConnectedCount(t, sp1, 1)
10424+
checkLeafNodeConnectedCount(t, sp2, 1)
10425+
10426+
// We expect that the hub side will have answered the request from the spoke for
10427+
// isolation if needed, but the spokes themselves will not themselves isolate
10428+
// subscription interest in the other direction unless also configured to do so.
10429+
for _, c := range sh.leafs {
10430+
require_Equal(t, c.leaf.isolated, isolated)
10431+
}
10432+
for _, c := range sp1.leafs {
10433+
require_False(t, c.leaf.isolated)
10434+
}
10435+
for _, c := range sp2.leafs {
10436+
require_False(t, c.leaf.isolated)
10437+
}
10438+
10439+
nch := natsConnect(t, sh.ClientURL(), nats.UserInfo("HA", "pwd"))
10440+
nc1 := natsConnect(t, sp1.ClientURL(), nats.UserInfo("A", "pwd"))
10441+
10442+
// Create a north-south subscription on the hub that should be visible to both spokes.
10443+
nssub, err := nch.SubscribeSync("northsouth")
10444+
require_NoError(t, err)
10445+
checkSubInterest(t, sh, "HA", "northsouth", time.Second) // Visible to the hub.
10446+
checkSubInterest(t, sp1, "A", "northsouth", time.Second) // Visible to both spokes.
10447+
checkSubInterest(t, sp2, "A", "northsouth", time.Second) // Visible to both spokes.
10448+
10449+
// The spoke subscriptions should be visible to the hub in all cases, but only
10450+
// visible to other spokes if they are not isolated.
10451+
ewsub, err := nc1.SubscribeSync("eastwest")
10452+
require_NoError(t, err)
10453+
checkSubInterest(t, sh, "HA", "eastwest", time.Second) // Visible to the hub.
10454+
checkSubInterest(t, sp1, "A", "eastwest", time.Second) // Visible to the spoke with the sub.
10455+
if isolated {
10456+
checkSubNoInterest(t, sp2, "A", "eastwest", time.Second) // Not visible to the other spoke.
10457+
} else {
10458+
checkSubInterest(t, sp2, "A", "eastwest", time.Second) // Visible to the other spoke.
10459+
}
10460+
require_NoError(t, ewsub.Unsubscribe())
10461+
checkSubNoInterest(t, sh, "HA", "eastwest", time.Second)
10462+
checkSubNoInterest(t, sp1, "A", "eastwest", time.Second)
10463+
checkSubNoInterest(t, sp2, "A", "eastwest", time.Second)
10464+
10465+
// ... but a subscription from the hub should be visible to both.
10466+
require_NoError(t, nssub.Unsubscribe())
10467+
checkSubNoInterest(t, sh, "HA", "northsouth", time.Second)
10468+
checkSubNoInterest(t, sp1, "A", "northsouth", time.Second)
10469+
checkSubNoInterest(t, sp2, "A", "northsouth", time.Second)
10470+
})
10471+
}
10472+
}
10473+
10474+
func TestLeafNodeIsolatedLeafSubjectPropagationLocalIsolation(t *testing.T) {
10475+
for tname, isolated := range map[string]bool{
10476+
"Isolated": true,
10477+
"Normal": false,
10478+
} {
10479+
t.Run(tname, func(t *testing.T) {
10480+
spokeTmpl := `
10481+
port: -1
10482+
server_name: "%s"
10483+
accounts {
10484+
A { users: [{user: A, password: pwd}] }
10485+
}
10486+
leafnodes {
10487+
port: -1
10488+
}
10489+
`
10490+
10491+
confSP1 := createConfFile(t, []byte(fmt.Sprintf(spokeTmpl, "SP1")))
10492+
sp1, osp1 := RunServerWithConfig(confSP1)
10493+
defer sp1.Shutdown()
10494+
10495+
confSP2 := createConfFile(t, []byte(fmt.Sprintf(spokeTmpl, "SP2")))
10496+
sp2, osp2 := RunServerWithConfig(confSP2)
10497+
defer sp2.Shutdown()
10498+
10499+
hubTmpl := `
10500+
port: -1
10501+
server_name: "%s"
10502+
accounts {
10503+
HA { users: [{user: HA, password: pwd}] }
10504+
}
10505+
leafnodes {
10506+
port: -1
10507+
remotes [
10508+
{
10509+
url: "nats://A:[email protected]:%d"
10510+
local: "HA"
10511+
isolate: %v
10512+
hub: true
10513+
},
10514+
{
10515+
url: "nats://A:[email protected]:%d"
10516+
local: "HA"
10517+
isolate: %v
10518+
hub: true
10519+
}
10520+
]
10521+
}
10522+
`
10523+
confH := createConfFile(t, []byte(fmt.Sprintf(hubTmpl, "H1", osp1.LeafNode.Port, isolated, osp2.LeafNode.Port, isolated)))
10524+
sh, _ := RunServerWithConfig(confH)
10525+
defer sh.Shutdown()
10526+
10527+
checkLeafNodeConnectedCount(t, sh, 2)
10528+
checkLeafNodeConnectedCount(t, sp1, 1)
10529+
checkLeafNodeConnectedCount(t, sp2, 1)
10530+
10531+
// We expect that the hub side will have answered the request from the spoke for
10532+
// isolation if needed, but the spokes themselves will not themselves isolate
10533+
// subscription interest in the other direction unless also configured to do so.
10534+
for _, c := range sh.leafs {
10535+
require_Equal(t, c.leaf.isolated, isolated)
10536+
}
10537+
for _, c := range sp1.leafs {
10538+
require_False(t, c.leaf.isolated)
10539+
}
10540+
for _, c := range sp2.leafs {
10541+
require_False(t, c.leaf.isolated)
10542+
}
10543+
1032910544
nch := natsConnect(t, sh.ClientURL(), nats.UserInfo("HA", "pwd"))
1033010545
nc1 := natsConnect(t, sp1.ClientURL(), nats.UserInfo("A", "pwd"))
1033110546

server/opts.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,12 @@ type RemoteLeafOpts struct {
253253
// will be migrated away from this server if still disconnected.
254254
JetStreamClusterMigrateDelay time.Duration `json:"jetstream_cluster_migrate_delay,omitempty"`
255255

256+
// LocalIsolation isolates this remote from east-west subject interest originating locally.
257+
LocalIsolation bool `json:"local_isolation,omitempty"`
258+
259+
// RequestIsolation asks the remote side to isolate us from their east-west subject interest.
260+
RequestIsolation bool `json:"request_isolation,omitempty"`
261+
256262
// If this is set to true, the connection to this remote will not be solicited.
257263
// During a configuration reload, if this is changed from `false` to `true`, the
258264
// existing connection will be closed and not solicited again (until it is changed
@@ -2971,6 +2977,10 @@ func parseRemoteLeafNodes(v any, errors *[]error, warnings *[]error) ([]*RemoteL
29712977
default:
29722978
*errors = append(*errors, &configErr{tk, fmt.Sprintf("Expected boolean or map for jetstream_cluster_migrate, got %T", v)})
29732979
}
2980+
case "isolate_leafnode_interest", "isolate":
2981+
remote.LocalIsolation = v.(bool)
2982+
case "request_isolation":
2983+
remote.RequestIsolation = v.(bool)
29742984
case "compression":
29752985
if err := parseCompression(&remote.Compression, CompressionS2Auto, tk, k, v); err != nil {
29762986
*errors = append(*errors, err)

0 commit comments

Comments
 (0)