Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions server/leafnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,7 @@ func (s *Server) removeLeafNodeURL(urlStr string) bool {

// Server lock is held on entry
func (s *Server) generateLeafNodeInfoJSON() {
s.leafNodeInfo.Cluster = s.cachedClusterName()
s.leafNodeInfo.LeafNodeURLs = s.leafURLsMap.getAsStringSlice()
s.leafNodeInfo.WSConnectURLs = s.websocket.connectURLsMap.getAsStringSlice()
b, _ := json.Marshal(s.leafNodeInfo)
Expand Down Expand Up @@ -1767,6 +1768,11 @@ func (s *Server) updateLeafNodes(acc *Account, sub *subscription, delta int32) {
// Check to make sure this sub does not have an origin cluster than matches the leafnode.
ln.mu.Lock()
skip := (sub.origin != nil && string(sub.origin) == ln.remoteCluster()) || !ln.canSubscribe(string(sub.subject))
// If skipped, make sure that we still let go the "$LDS." subscription that allows
// the detection of a loop.
if skip && bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix)) {
skip = false
}
ln.mu.Unlock()
if skip {
continue
Expand Down
292 changes: 257 additions & 35 deletions server/leafnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,45 +845,58 @@ func (l *loopDetectedLogger) Errorf(format string, v ...interface{}) {
}

func TestLeafNodeLoop(t *testing.T) {
// This test requires that we set the port to known value because
// we want A point to B and B to A.
oa := DefaultOptions()
oa.LeafNode.ReconnectInterval = 10 * time.Millisecond
oa.LeafNode.Port = 1234
ub, _ := url.Parse("nats://127.0.0.1:5678")
oa.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{ub}}}
oa.LeafNode.connDelay = 50 * time.Millisecond
sa := RunServer(oa)
defer sa.Shutdown()
test := func(t *testing.T, cluster bool) {
// This test requires that we set the port to known value because
// we want A point to B and B to A.
oa := DefaultOptions()
if !cluster {
oa.Cluster.Port = 0
oa.Cluster.Name = _EMPTY_
}
oa.LeafNode.ReconnectInterval = 10 * time.Millisecond
oa.LeafNode.Port = 1234
ub, _ := url.Parse("nats://127.0.0.1:5678")
oa.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{ub}}}
oa.LeafNode.connDelay = 50 * time.Millisecond
sa := RunServer(oa)
defer sa.Shutdown()

l := &loopDetectedLogger{ch: make(chan string, 1)}
sa.SetLogger(l, false, false)

ob := DefaultOptions()
if !cluster {
ob.Cluster.Port = 0
ob.Cluster.Name = _EMPTY_
} else {
ob.Cluster.Name = "xyz"
}
ob.LeafNode.ReconnectInterval = 10 * time.Millisecond
ob.LeafNode.Port = 5678
ua, _ := url.Parse("nats://127.0.0.1:1234")
ob.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{ua}}}
ob.LeafNode.connDelay = 50 * time.Millisecond
sb := RunServer(ob)
defer sb.Shutdown()

l := &loopDetectedLogger{ch: make(chan string, 1)}
sa.SetLogger(l, false, false)
select {
case <-l.ch:
// OK!
case <-time.After(2 * time.Second):
t.Fatalf("Did not get any error regarding loop")
}

ob := DefaultOptions()
ob.Cluster.Name = "xyz"
ob.LeafNode.ReconnectInterval = 10 * time.Millisecond
ob.LeafNode.Port = 5678
ua, _ := url.Parse("nats://127.0.0.1:1234")
ob.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{ua}}}
ob.LeafNode.connDelay = 50 * time.Millisecond
sb := RunServer(ob)
defer sb.Shutdown()
sb.Shutdown()
ob.Port = -1
ob.Cluster.Port = -1
ob.LeafNode.Remotes = nil
sb = RunServer(ob)
defer sb.Shutdown()

select {
case <-l.ch:
// OK!
case <-time.After(2 * time.Second):
t.Fatalf("Did not get any error regarding loop")
checkLeafNodeConnected(t, sa)
}

sb.Shutdown()
ob.Port = -1
ob.Cluster.Port = -1
ob.LeafNode.Remotes = nil
sb = RunServer(ob)
defer sb.Shutdown()

checkLeafNodeConnected(t, sa)
t.Run("standalone", func(t *testing.T) { test(t, false) })
t.Run("cluster", func(t *testing.T) { test(t, true) })
}

func TestLeafNodeLoopFromDAG(t *testing.T) {
Expand Down Expand Up @@ -4662,3 +4675,212 @@ func TestLeafNodePermsSuppressSubs(t *testing.T) {
// OK
}
}

func TestLeafNodeDuplicateMsg(t *testing.T) {
// This involves 2 clusters with leafnodes to each other with a different
// account, and those accounts import/export a subject that caused
// duplicate messages. This test requires static ports since we need to
// have A->B and B->A.
a1Conf := createConfFile(t, []byte(`
cluster : {
name : A
port : -1
}
leafnodes : {
port : 14333
remotes : [{
account : A
urls : [nats://leafa:[email protected]:24333]
}]
}
port : -1
server_name : A_1

accounts:{
A:{
users:[
{user: leafa, password: pwd},
{user: usera, password: usera, permissions: {
publish:{ allow:["iot.b.topic"] }
subscribe:{ allow:["iot.a.topic"] }
}}
]
imports:[
{stream:{account:"B", subject:"iot.a.topic"}}
]
},
B:{
users:[
{user: leafb, password: pwd},
]
exports:[
{stream: "iot.a.topic", accounts: ["A"]}
]
}
}
`))
defer removeFile(t, a1Conf)
a1, oa1 := RunServerWithConfig(a1Conf)
defer a1.Shutdown()

a2Conf := createConfFile(t, []byte(fmt.Sprintf(`
cluster : {
name : A
port : -1
routes : [nats://127.0.0.1:%d]
}
leafnodes : {
port : 14334
remotes : [{
account : A
urls : [nats://leafa:[email protected]:24334]
}]
}
port : -1
server_name : A_2

accounts:{
A:{
users:[
{user: leafa, password: pwd},
{user: usera, password: usera, permissions: {
publish:{ allow:["iot.b.topic"] }
subscribe:{ allow:["iot.a.topic"] }
}}
]
imports:[
{stream:{account:"B", subject:"iot.a.topic"}}
]
},
B:{
users:[
{user: leafb, password: pwd},
]
exports:[
{stream: "iot.a.topic", accounts: ["A"]}
]
}
}`, oa1.Cluster.Port)))
defer removeFile(t, a2Conf)
a2, _ := RunServerWithConfig(a2Conf)
defer a2.Shutdown()

checkClusterFormed(t, a1, a2)

b1Conf := createConfFile(t, []byte(`
cluster : {
name : B
port : -1
}
leafnodes : {
port : 24333
remotes : [{
account : B
urls : [nats://leafb:[email protected]:14333]
}]
}
port : -1
server_name : B_1

accounts:{
A:{
users:[
{user: leafa, password: pwd},
]
exports:[
{stream: "iot.b.topic", accounts: ["B"]}
]
},
B:{
users:[
{user: leafb, password: pwd},
{user: userb, password: userb, permissions: {
publish:{ allow:["iot.a.topic"] },
subscribe:{ allow:["iot.b.topic"] }
}}
]
imports:[
{stream:{account:"A", subject:"iot.b.topic"}}
]
}
}`))
defer removeFile(t, b1Conf)
b1, ob1 := RunServerWithConfig(b1Conf)
defer b1.Shutdown()

b2Conf := createConfFile(t, []byte(fmt.Sprintf(`
cluster : {
name : B
port : -1
routes : [nats://127.0.0.1:%d]
}
leafnodes : {
port : 24334
remotes : [{
account : B
urls : [nats://leafb:[email protected]:14334]
}]
}
port : -1
server_name : B_2

accounts:{
A:{
users:[
{user: leafa, password: pwd},
]
exports:[
{stream: "iot.b.topic", accounts: ["B"]}
]
},
B:{
users:[
{user: leafb, password: pwd},
{user: userb, password: userb, permissions: {
publish:{ allow:["iot.a.topic"] },
subscribe:{ allow:["iot.b.topic"] }
}}
]
imports:[
{stream:{account:"A", subject:"iot.b.topic"}}
]
}
}`, ob1.Cluster.Port)))
defer removeFile(t, b2Conf)
b2, _ := RunServerWithConfig(b2Conf)
defer b2.Shutdown()

checkClusterFormed(t, b1, b2)

checkLeafNodeConnectedCount(t, a1, 2)
checkLeafNodeConnectedCount(t, a2, 2)
checkLeafNodeConnectedCount(t, b1, 2)
checkLeafNodeConnectedCount(t, b2, 2)

check := func(t *testing.T, subSrv *Server, pubSrv *Server) {

sc := natsConnect(t, subSrv.ClientURL(), nats.UserInfo("userb", "userb"))
defer sc.Close()

subject := "iot.b.topic"
sub := natsSubSync(t, sc, subject)

// Wait for this to be available in A cluster
checkSubInterest(t, a1, "A", subject, time.Second)
checkSubInterest(t, a2, "A", subject, time.Second)

pb := natsConnect(t, pubSrv.ClientURL(), nats.UserInfo("usera", "usera"))
defer pb.Close()

natsPub(t, pb, subject, []byte("msg"))
natsNexMsg(t, sub, time.Second)
// Should be only 1
if msg, err := sub.NextMsg(100 * time.Millisecond); err == nil {
t.Fatalf("Received duplicate on %q: %s", msg.Subject, msg.Data)
}
}
t.Run("sub_b1_pub_a1", func(t *testing.T) { check(t, b1, a1) })
t.Run("sub_b1_pub_a2", func(t *testing.T) { check(t, b1, a2) })
t.Run("sub_b2_pub_a1", func(t *testing.T) { check(t, b2, a1) })
t.Run("sub_b2_pub_a2", func(t *testing.T) { check(t, b2, a2) })
}