diff --git a/server/leafnode.go b/server/leafnode.go index dc48b57f8f5..c21027cc933 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -816,6 +816,7 @@ func (s *Server) removeLeafNodeURL(urlStr string) bool { // Server lock is held on entry func (s *Server) generateLeafNodeInfoJSON() { + s.leafNodeInfo.Cluster = s.cachedClusterName() s.leafNodeInfo.LeafNodeURLs = s.leafURLsMap.getAsStringSlice() s.leafNodeInfo.WSConnectURLs = s.websocket.connectURLsMap.getAsStringSlice() b, _ := json.Marshal(s.leafNodeInfo) @@ -1767,6 +1768,11 @@ func (s *Server) updateLeafNodes(acc *Account, sub *subscription, delta int32) { // Check to make sure this sub does not have an origin cluster than matches the leafnode. ln.mu.Lock() skip := (sub.origin != nil && string(sub.origin) == ln.remoteCluster()) || !ln.canSubscribe(string(sub.subject)) + // If skipped, make sure that we still let go the "$LDS." subscription that allows + // the detection of a loop. + if skip && bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix)) { + skip = false + } ln.mu.Unlock() if skip { continue diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 903396cbdab..bb81d85ba2c 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -845,45 +845,58 @@ func (l *loopDetectedLogger) Errorf(format string, v ...interface{}) { } func TestLeafNodeLoop(t *testing.T) { - // This test requires that we set the port to known value because - // we want A point to B and B to A. - oa := DefaultOptions() - oa.LeafNode.ReconnectInterval = 10 * time.Millisecond - oa.LeafNode.Port = 1234 - ub, _ := url.Parse("nats://127.0.0.1:5678") - oa.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{ub}}} - oa.LeafNode.connDelay = 50 * time.Millisecond - sa := RunServer(oa) - defer sa.Shutdown() + test := func(t *testing.T, cluster bool) { + // This test requires that we set the port to known value because + // we want A point to B and B to A. + oa := DefaultOptions() + if !cluster { + oa.Cluster.Port = 0 + oa.Cluster.Name = _EMPTY_ + } + oa.LeafNode.ReconnectInterval = 10 * time.Millisecond + oa.LeafNode.Port = 1234 + ub, _ := url.Parse("nats://127.0.0.1:5678") + oa.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{ub}}} + oa.LeafNode.connDelay = 50 * time.Millisecond + sa := RunServer(oa) + defer sa.Shutdown() + + l := &loopDetectedLogger{ch: make(chan string, 1)} + sa.SetLogger(l, false, false) + + ob := DefaultOptions() + if !cluster { + ob.Cluster.Port = 0 + ob.Cluster.Name = _EMPTY_ + } else { + ob.Cluster.Name = "xyz" + } + ob.LeafNode.ReconnectInterval = 10 * time.Millisecond + ob.LeafNode.Port = 5678 + ua, _ := url.Parse("nats://127.0.0.1:1234") + ob.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{ua}}} + ob.LeafNode.connDelay = 50 * time.Millisecond + sb := RunServer(ob) + defer sb.Shutdown() - l := &loopDetectedLogger{ch: make(chan string, 1)} - sa.SetLogger(l, false, false) + select { + case <-l.ch: + // OK! + case <-time.After(2 * time.Second): + t.Fatalf("Did not get any error regarding loop") + } - ob := DefaultOptions() - ob.Cluster.Name = "xyz" - ob.LeafNode.ReconnectInterval = 10 * time.Millisecond - ob.LeafNode.Port = 5678 - ua, _ := url.Parse("nats://127.0.0.1:1234") - ob.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{ua}}} - ob.LeafNode.connDelay = 50 * time.Millisecond - sb := RunServer(ob) - defer sb.Shutdown() + sb.Shutdown() + ob.Port = -1 + ob.Cluster.Port = -1 + ob.LeafNode.Remotes = nil + sb = RunServer(ob) + defer sb.Shutdown() - select { - case <-l.ch: - // OK! - case <-time.After(2 * time.Second): - t.Fatalf("Did not get any error regarding loop") + checkLeafNodeConnected(t, sa) } - - sb.Shutdown() - ob.Port = -1 - ob.Cluster.Port = -1 - ob.LeafNode.Remotes = nil - sb = RunServer(ob) - defer sb.Shutdown() - - checkLeafNodeConnected(t, sa) + t.Run("standalone", func(t *testing.T) { test(t, false) }) + t.Run("cluster", func(t *testing.T) { test(t, true) }) } func TestLeafNodeLoopFromDAG(t *testing.T) { @@ -4662,3 +4675,212 @@ func TestLeafNodePermsSuppressSubs(t *testing.T) { // OK } } + +func TestLeafNodeDuplicateMsg(t *testing.T) { + // This involves 2 clusters with leafnodes to each other with a different + // account, and those accounts import/export a subject that caused + // duplicate messages. This test requires static ports since we need to + // have A->B and B->A. + a1Conf := createConfFile(t, []byte(` + cluster : { + name : A + port : -1 + } + leafnodes : { + port : 14333 + remotes : [{ + account : A + urls : [nats://leafa:pwd@127.0.0.1:24333] + }] + } + port : -1 + server_name : A_1 + + accounts:{ + A:{ + users:[ + {user: leafa, password: pwd}, + {user: usera, password: usera, permissions: { + publish:{ allow:["iot.b.topic"] } + subscribe:{ allow:["iot.a.topic"] } + }} + ] + imports:[ + {stream:{account:"B", subject:"iot.a.topic"}} + ] + }, + B:{ + users:[ + {user: leafb, password: pwd}, + ] + exports:[ + {stream: "iot.a.topic", accounts: ["A"]} + ] + } + } + `)) + defer removeFile(t, a1Conf) + a1, oa1 := RunServerWithConfig(a1Conf) + defer a1.Shutdown() + + a2Conf := createConfFile(t, []byte(fmt.Sprintf(` + cluster : { + name : A + port : -1 + routes : [nats://127.0.0.1:%d] + } + leafnodes : { + port : 14334 + remotes : [{ + account : A + urls : [nats://leafa:pwd@127.0.0.1:24334] + }] + } + port : -1 + server_name : A_2 + + accounts:{ + A:{ + users:[ + {user: leafa, password: pwd}, + {user: usera, password: usera, permissions: { + publish:{ allow:["iot.b.topic"] } + subscribe:{ allow:["iot.a.topic"] } + }} + ] + imports:[ + {stream:{account:"B", subject:"iot.a.topic"}} + ] + }, + B:{ + users:[ + {user: leafb, password: pwd}, + ] + exports:[ + {stream: "iot.a.topic", accounts: ["A"]} + ] + } + }`, oa1.Cluster.Port))) + defer removeFile(t, a2Conf) + a2, _ := RunServerWithConfig(a2Conf) + defer a2.Shutdown() + + checkClusterFormed(t, a1, a2) + + b1Conf := createConfFile(t, []byte(` + cluster : { + name : B + port : -1 + } + leafnodes : { + port : 24333 + remotes : [{ + account : B + urls : [nats://leafb:pwd@127.0.0.1:14333] + }] + } + port : -1 + server_name : B_1 + + accounts:{ + A:{ + users:[ + {user: leafa, password: pwd}, + ] + exports:[ + {stream: "iot.b.topic", accounts: ["B"]} + ] + }, + B:{ + users:[ + {user: leafb, password: pwd}, + {user: userb, password: userb, permissions: { + publish:{ allow:["iot.a.topic"] }, + subscribe:{ allow:["iot.b.topic"] } + }} + ] + imports:[ + {stream:{account:"A", subject:"iot.b.topic"}} + ] + } + }`)) + defer removeFile(t, b1Conf) + b1, ob1 := RunServerWithConfig(b1Conf) + defer b1.Shutdown() + + b2Conf := createConfFile(t, []byte(fmt.Sprintf(` + cluster : { + name : B + port : -1 + routes : [nats://127.0.0.1:%d] + } + leafnodes : { + port : 24334 + remotes : [{ + account : B + urls : [nats://leafb:pwd@127.0.0.1:14334] + }] + } + port : -1 + server_name : B_2 + + accounts:{ + A:{ + users:[ + {user: leafa, password: pwd}, + ] + exports:[ + {stream: "iot.b.topic", accounts: ["B"]} + ] + }, + B:{ + users:[ + {user: leafb, password: pwd}, + {user: userb, password: userb, permissions: { + publish:{ allow:["iot.a.topic"] }, + subscribe:{ allow:["iot.b.topic"] } + }} + ] + imports:[ + {stream:{account:"A", subject:"iot.b.topic"}} + ] + } + }`, ob1.Cluster.Port))) + defer removeFile(t, b2Conf) + b2, _ := RunServerWithConfig(b2Conf) + defer b2.Shutdown() + + checkClusterFormed(t, b1, b2) + + checkLeafNodeConnectedCount(t, a1, 2) + checkLeafNodeConnectedCount(t, a2, 2) + checkLeafNodeConnectedCount(t, b1, 2) + checkLeafNodeConnectedCount(t, b2, 2) + + check := func(t *testing.T, subSrv *Server, pubSrv *Server) { + + sc := natsConnect(t, subSrv.ClientURL(), nats.UserInfo("userb", "userb")) + defer sc.Close() + + subject := "iot.b.topic" + sub := natsSubSync(t, sc, subject) + + // Wait for this to be available in A cluster + checkSubInterest(t, a1, "A", subject, time.Second) + checkSubInterest(t, a2, "A", subject, time.Second) + + pb := natsConnect(t, pubSrv.ClientURL(), nats.UserInfo("usera", "usera")) + defer pb.Close() + + natsPub(t, pb, subject, []byte("msg")) + natsNexMsg(t, sub, time.Second) + // Should be only 1 + if msg, err := sub.NextMsg(100 * time.Millisecond); err == nil { + t.Fatalf("Received duplicate on %q: %s", msg.Subject, msg.Data) + } + } + t.Run("sub_b1_pub_a1", func(t *testing.T) { check(t, b1, a1) }) + t.Run("sub_b1_pub_a2", func(t *testing.T) { check(t, b1, a2) }) + t.Run("sub_b2_pub_a1", func(t *testing.T) { check(t, b2, a1) }) + t.Run("sub_b2_pub_a2", func(t *testing.T) { check(t, b2, a2) }) +}