Skip to content

Commit edf0fe3

Browse files
authored
Merge pull request #3604 from nats-io/fix_3191
[FIXED] LeafNode: possible duplicate messages in complex setup
2 parents 0f8aa11 + 91c84c0 commit edf0fe3

File tree

2 files changed

+263
-35
lines changed

2 files changed

+263
-35
lines changed

server/leafnode.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -816,6 +816,7 @@ func (s *Server) removeLeafNodeURL(urlStr string) bool {
816816

817817
// Server lock is held on entry
818818
func (s *Server) generateLeafNodeInfoJSON() {
819+
s.leafNodeInfo.Cluster = s.cachedClusterName()
819820
s.leafNodeInfo.LeafNodeURLs = s.leafURLsMap.getAsStringSlice()
820821
s.leafNodeInfo.WSConnectURLs = s.websocket.connectURLsMap.getAsStringSlice()
821822
b, _ := json.Marshal(s.leafNodeInfo)
@@ -1767,6 +1768,11 @@ func (s *Server) updateLeafNodes(acc *Account, sub *subscription, delta int32) {
17671768
// Check to make sure this sub does not have an origin cluster that matches the leafnode.
17681769
ln.mu.Lock()
17691770
skip := (sub.origin != nil && string(sub.origin) == ln.remoteCluster()) || !ln.canSubscribe(string(sub.subject))
1771+
// If skipped, make sure that we still let through the "$LDS." subscription that allows
1772+
// the detection of a loop.
1773+
if skip && bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix)) {
1774+
skip = false
1775+
}
17701776
ln.mu.Unlock()
17711777
if skip {
17721778
continue

server/leafnode_test.go

Lines changed: 257 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -845,45 +845,58 @@ func (l *loopDetectedLogger) Errorf(format string, v ...interface{}) {
845845
}
846846

847847
func TestLeafNodeLoop(t *testing.T) {
848-
// This test requires that we set the port to known value because
849-
// we want A point to B and B to A.
850-
oa := DefaultOptions()
851-
oa.LeafNode.ReconnectInterval = 10 * time.Millisecond
852-
oa.LeafNode.Port = 1234
853-
ub, _ := url.Parse("nats://127.0.0.1:5678")
854-
oa.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{ub}}}
855-
oa.LeafNode.connDelay = 50 * time.Millisecond
856-
sa := RunServer(oa)
857-
defer sa.Shutdown()
848+
test := func(t *testing.T, cluster bool) {
849+
// This test requires that we set the port to known value because
850+
// we want A point to B and B to A.
851+
oa := DefaultOptions()
852+
if !cluster {
853+
oa.Cluster.Port = 0
854+
oa.Cluster.Name = _EMPTY_
855+
}
856+
oa.LeafNode.ReconnectInterval = 10 * time.Millisecond
857+
oa.LeafNode.Port = 1234
858+
ub, _ := url.Parse("nats://127.0.0.1:5678")
859+
oa.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{ub}}}
860+
oa.LeafNode.connDelay = 50 * time.Millisecond
861+
sa := RunServer(oa)
862+
defer sa.Shutdown()
863+
864+
l := &loopDetectedLogger{ch: make(chan string, 1)}
865+
sa.SetLogger(l, false, false)
866+
867+
ob := DefaultOptions()
868+
if !cluster {
869+
ob.Cluster.Port = 0
870+
ob.Cluster.Name = _EMPTY_
871+
} else {
872+
ob.Cluster.Name = "xyz"
873+
}
874+
ob.LeafNode.ReconnectInterval = 10 * time.Millisecond
875+
ob.LeafNode.Port = 5678
876+
ua, _ := url.Parse("nats://127.0.0.1:1234")
877+
ob.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{ua}}}
878+
ob.LeafNode.connDelay = 50 * time.Millisecond
879+
sb := RunServer(ob)
880+
defer sb.Shutdown()
858881

859-
l := &loopDetectedLogger{ch: make(chan string, 1)}
860-
sa.SetLogger(l, false, false)
882+
select {
883+
case <-l.ch:
884+
// OK!
885+
case <-time.After(2 * time.Second):
886+
t.Fatalf("Did not get any error regarding loop")
887+
}
861888

862-
ob := DefaultOptions()
863-
ob.Cluster.Name = "xyz"
864-
ob.LeafNode.ReconnectInterval = 10 * time.Millisecond
865-
ob.LeafNode.Port = 5678
866-
ua, _ := url.Parse("nats://127.0.0.1:1234")
867-
ob.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{ua}}}
868-
ob.LeafNode.connDelay = 50 * time.Millisecond
869-
sb := RunServer(ob)
870-
defer sb.Shutdown()
889+
sb.Shutdown()
890+
ob.Port = -1
891+
ob.Cluster.Port = -1
892+
ob.LeafNode.Remotes = nil
893+
sb = RunServer(ob)
894+
defer sb.Shutdown()
871895

872-
select {
873-
case <-l.ch:
874-
// OK!
875-
case <-time.After(2 * time.Second):
876-
t.Fatalf("Did not get any error regarding loop")
896+
checkLeafNodeConnected(t, sa)
877897
}
878-
879-
sb.Shutdown()
880-
ob.Port = -1
881-
ob.Cluster.Port = -1
882-
ob.LeafNode.Remotes = nil
883-
sb = RunServer(ob)
884-
defer sb.Shutdown()
885-
886-
checkLeafNodeConnected(t, sa)
898+
t.Run("standalone", func(t *testing.T) { test(t, false) })
899+
t.Run("cluster", func(t *testing.T) { test(t, true) })
887900
}
888901

889902
func TestLeafNodeLoopFromDAG(t *testing.T) {
@@ -4662,3 +4675,212 @@ func TestLeafNodePermsSuppressSubs(t *testing.T) {
46624675
// OK
46634676
}
46644677
}
4678+
4679+
func TestLeafNodeDuplicateMsg(t *testing.T) {
4680+
// This involves 2 clusters with leafnodes to each other with a different
4681+
// account, and those accounts import/export a subject that caused
4682+
// duplicate messages. This test requires static ports since we need to
4683+
// have A->B and B->A.
4684+
a1Conf := createConfFile(t, []byte(`
4685+
cluster : {
4686+
name : A
4687+
port : -1
4688+
}
4689+
leafnodes : {
4690+
port : 14333
4691+
remotes : [{
4692+
account : A
4693+
urls : [nats://leafa:pwd@127.0.0.1:24333]
4694+
}]
4695+
}
4696+
port : -1
4697+
server_name : A_1
4698+
4699+
accounts:{
4700+
A:{
4701+
users:[
4702+
{user: leafa, password: pwd},
4703+
{user: usera, password: usera, permissions: {
4704+
publish:{ allow:["iot.b.topic"] }
4705+
subscribe:{ allow:["iot.a.topic"] }
4706+
}}
4707+
]
4708+
imports:[
4709+
{stream:{account:"B", subject:"iot.a.topic"}}
4710+
]
4711+
},
4712+
B:{
4713+
users:[
4714+
{user: leafb, password: pwd},
4715+
]
4716+
exports:[
4717+
{stream: "iot.a.topic", accounts: ["A"]}
4718+
]
4719+
}
4720+
}
4721+
`))
4722+
defer removeFile(t, a1Conf)
4723+
a1, oa1 := RunServerWithConfig(a1Conf)
4724+
defer a1.Shutdown()
4725+
4726+
a2Conf := createConfFile(t, []byte(fmt.Sprintf(`
4727+
cluster : {
4728+
name : A
4729+
port : -1
4730+
routes : [nats://127.0.0.1:%d]
4731+
}
4732+
leafnodes : {
4733+
port : 14334
4734+
remotes : [{
4735+
account : A
4736+
urls : [nats://leafa:pwd@127.0.0.1:24334]
4737+
}]
4738+
}
4739+
port : -1
4740+
server_name : A_2
4741+
4742+
accounts:{
4743+
A:{
4744+
users:[
4745+
{user: leafa, password: pwd},
4746+
{user: usera, password: usera, permissions: {
4747+
publish:{ allow:["iot.b.topic"] }
4748+
subscribe:{ allow:["iot.a.topic"] }
4749+
}}
4750+
]
4751+
imports:[
4752+
{stream:{account:"B", subject:"iot.a.topic"}}
4753+
]
4754+
},
4755+
B:{
4756+
users:[
4757+
{user: leafb, password: pwd},
4758+
]
4759+
exports:[
4760+
{stream: "iot.a.topic", accounts: ["A"]}
4761+
]
4762+
}
4763+
}`, oa1.Cluster.Port)))
4764+
defer removeFile(t, a2Conf)
4765+
a2, _ := RunServerWithConfig(a2Conf)
4766+
defer a2.Shutdown()
4767+
4768+
checkClusterFormed(t, a1, a2)
4769+
4770+
b1Conf := createConfFile(t, []byte(`
4771+
cluster : {
4772+
name : B
4773+
port : -1
4774+
}
4775+
leafnodes : {
4776+
port : 24333
4777+
remotes : [{
4778+
account : B
4779+
urls : [nats://leafb:pwd@127.0.0.1:14333]
4780+
}]
4781+
}
4782+
port : -1
4783+
server_name : B_1
4784+
4785+
accounts:{
4786+
A:{
4787+
users:[
4788+
{user: leafa, password: pwd},
4789+
]
4790+
exports:[
4791+
{stream: "iot.b.topic", accounts: ["B"]}
4792+
]
4793+
},
4794+
B:{
4795+
users:[
4796+
{user: leafb, password: pwd},
4797+
{user: userb, password: userb, permissions: {
4798+
publish:{ allow:["iot.a.topic"] },
4799+
subscribe:{ allow:["iot.b.topic"] }
4800+
}}
4801+
]
4802+
imports:[
4803+
{stream:{account:"A", subject:"iot.b.topic"}}
4804+
]
4805+
}
4806+
}`))
4807+
defer removeFile(t, b1Conf)
4808+
b1, ob1 := RunServerWithConfig(b1Conf)
4809+
defer b1.Shutdown()
4810+
4811+
b2Conf := createConfFile(t, []byte(fmt.Sprintf(`
4812+
cluster : {
4813+
name : B
4814+
port : -1
4815+
routes : [nats://127.0.0.1:%d]
4816+
}
4817+
leafnodes : {
4818+
port : 24334
4819+
remotes : [{
4820+
account : B
4821+
urls : [nats://leafb:pwd@127.0.0.1:14334]
4822+
}]
4823+
}
4824+
port : -1
4825+
server_name : B_2
4826+
4827+
accounts:{
4828+
A:{
4829+
users:[
4830+
{user: leafa, password: pwd},
4831+
]
4832+
exports:[
4833+
{stream: "iot.b.topic", accounts: ["B"]}
4834+
]
4835+
},
4836+
B:{
4837+
users:[
4838+
{user: leafb, password: pwd},
4839+
{user: userb, password: userb, permissions: {
4840+
publish:{ allow:["iot.a.topic"] },
4841+
subscribe:{ allow:["iot.b.topic"] }
4842+
}}
4843+
]
4844+
imports:[
4845+
{stream:{account:"A", subject:"iot.b.topic"}}
4846+
]
4847+
}
4848+
}`, ob1.Cluster.Port)))
4849+
defer removeFile(t, b2Conf)
4850+
b2, _ := RunServerWithConfig(b2Conf)
4851+
defer b2.Shutdown()
4852+
4853+
checkClusterFormed(t, b1, b2)
4854+
4855+
checkLeafNodeConnectedCount(t, a1, 2)
4856+
checkLeafNodeConnectedCount(t, a2, 2)
4857+
checkLeafNodeConnectedCount(t, b1, 2)
4858+
checkLeafNodeConnectedCount(t, b2, 2)
4859+
4860+
check := func(t *testing.T, subSrv *Server, pubSrv *Server) {
4861+
4862+
sc := natsConnect(t, subSrv.ClientURL(), nats.UserInfo("userb", "userb"))
4863+
defer sc.Close()
4864+
4865+
subject := "iot.b.topic"
4866+
sub := natsSubSync(t, sc, subject)
4867+
4868+
// Wait for this to be available in A cluster
4869+
checkSubInterest(t, a1, "A", subject, time.Second)
4870+
checkSubInterest(t, a2, "A", subject, time.Second)
4871+
4872+
pb := natsConnect(t, pubSrv.ClientURL(), nats.UserInfo("usera", "usera"))
4873+
defer pb.Close()
4874+
4875+
natsPub(t, pb, subject, []byte("msg"))
4876+
natsNexMsg(t, sub, time.Second)
4877+
// Should be only 1
4878+
if msg, err := sub.NextMsg(100 * time.Millisecond); err == nil {
4879+
t.Fatalf("Received duplicate on %q: %s", msg.Subject, msg.Data)
4880+
}
4881+
}
4882+
t.Run("sub_b1_pub_a1", func(t *testing.T) { check(t, b1, a1) })
4883+
t.Run("sub_b1_pub_a2", func(t *testing.T) { check(t, b1, a2) })
4884+
t.Run("sub_b2_pub_a1", func(t *testing.T) { check(t, b2, a1) })
4885+
t.Run("sub_b2_pub_a2", func(t *testing.T) { check(t, b2, a2) })
4886+
}

0 commit comments

Comments
 (0)