diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go
index 5ebd7a1deb1..b5afe755063 100644
--- a/server/jetstream_helpers_test.go
+++ b/server/jetstream_helpers_test.go
@@ -1883,6 +1883,7 @@ type netProxy struct {
 	down     int
 	url      string
 	surl     string
+	port     int
 }
 
 func newNetProxy(rtt time.Duration, upRate, downRate int, serverURL string) *netProxy {
@@ -1895,14 +1896,27 @@ func createNetProxy(rtt time.Duration, upRate, downRate int, serverURL string, s
 	if e != nil {
 		panic(fmt.Sprintf("Error listening on port: %s, %q", hp, e))
 	}
+	u, err := url.Parse(serverURL)
+	if err != nil {
+		panic(fmt.Sprintf("Could not parse server URL: %v", err))
+	}
+
+	var clientURL string
 	port := l.Addr().(*net.TCPAddr).Port
+	if u.User != nil {
+		clientURL = fmt.Sprintf("nats://%v@127.0.0.1:%d", u.User, port)
+	} else {
+		clientURL = fmt.Sprintf("nats://127.0.0.1:%d", port)
+	}
+
 	proxy := &netProxy{
 		listener: l,
 		rtt:      rtt,
 		up:       upRate,
 		down:     downRate,
-		url:      fmt.Sprintf("nats://127.0.0.1:%d", port),
+		url:      clientURL,
 		surl:     serverURL,
+		port:     port,
 	}
 	if start {
 		proxy.start()
@@ -1917,6 +1931,16 @@ func (np *netProxy) start() {
 	}
 	host := u.Host
 
+	// Check if this is a restart.
+	// We nil out the listener on stop().
+	if np.listener == nil && np.port != 0 {
+		hp := net.JoinHostPort("127.0.0.1", fmt.Sprintf("%d", np.port))
+		np.listener, err = net.Listen("tcp", hp)
+		if err != nil {
+			panic(fmt.Sprintf("Error listening on port: %s, %q", hp, err))
+		}
+	}
+
 	go func() {
 		for {
 			client, err := np.listener.Accept()
@@ -1942,6 +1966,10 @@ func (np *netProxy) routeURL() string {
 	return strings.Replace(np.url, "nats", "nats-route", 1)
 }
 
+func (np *netProxy) leafURL() string {
+	return strings.Replace(np.url, "nats", "nats-leaf", 1)
+}
+
 func (np *netProxy) loop(tbw int, r, w net.Conn) {
 	const rbl = 8192
 	var buf [rbl]byte
diff --git a/server/jetstream_leafnode_test.go b/server/jetstream_leafnode_test.go
index 6257904445b..0c796cbba63 100644
--- a/server/jetstream_leafnode_test.go
+++ b/server/jetstream_leafnode_test.go
@@ -1426,3 +1426,249 @@ func TestJetStreamLeafNodeJSClusterMigrateRecoveryWithDelay(t *testing.T) {
 	// long election timer. Now this should work reliably.
 	lnc.waitOnStreamLeader(globalAccountName, "TEST")
 }
+
+// This will test that when a mirror or source construct is set up across a leafnode/domain
+// boundary, it will recover quickly once the leafnode connection is re-established, regardless
+// of the backoff state of the internal consumer create request.
+func TestJetStreamLeafNodeAndMirrorResyncAfterConnectionDown(t *testing.T) {
+	tmplA := `
+		listen: -1
+		server_name: tcm
+		jetstream {
+			store_dir: '%s',
+			domain: TCM
+		}
+		accounts {
+			JS { users = [ { user: "y", pass: "p" } ]; jetstream: true }
+			$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
+		}
+		leaf { port: -1 }
+	`
+	confA := createConfFile(t, []byte(fmt.Sprintf(tmplA, t.TempDir())))
+	sA, oA := RunServerWithConfig(confA)
+	defer sA.Shutdown()
+
+	// Create a proxy - we will use this to simulate a network down event.
+	rtt, bw := 10*time.Microsecond, 10*1024*1024*1024
+	proxy := newNetProxy(rtt, bw, bw, fmt.Sprintf("nats://y:p@127.0.0.1:%d", oA.LeafNode.Port))
+	defer proxy.stop()
+
+	tmplB := `
+		listen: -1
+		server_name: xmm
+		jetstream {
+			store_dir: '%s',
+			domain: XMM
+		}
+		accounts {
+			JS { users = [ { user: "y", pass: "p" } ]; jetstream: true }
+			$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
+		}
+		leaf { remotes [ { url: %s, account: "JS" } ], reconnect: "0.25s" }
+	`
+
+	confB := createConfFile(t, []byte(fmt.Sprintf(tmplB, t.TempDir(), proxy.leafURL())))
+	sB, _ := RunServerWithConfig(confB)
+	defer sB.Shutdown()
+
+	// Make sure we are connected OK.
+	checkLeafNodeConnectedCount(t, sA, 1)
+	checkLeafNodeConnectedCount(t, sB, 1)
+
+	// We will have 3 streams that we will test for proper syncing after
+	// the network is restored.
+	//
+	// 1. Mirror A --> B
+	// 2. Mirror A <-- B
+	// 3. Source A <-> B
+
+	// Connect to sA.
+	ncA, jsA := jsClientConnect(t, sA, nats.UserInfo("y", "p"))
+	defer ncA.Close()
+
+	// Connect to sB.
+	ncB, jsB := jsClientConnect(t, sB, nats.UserInfo("y", "p"))
+	defer ncB.Close()
+
+	// Add in TEST-A.
+	_, err := jsA.AddStream(&nats.StreamConfig{Name: "TEST-A", Subjects: []string{"foo"}})
+	require_NoError(t, err)
+
+	// Add in TEST-B.
+	_, err = jsB.AddStream(&nats.StreamConfig{Name: "TEST-B", Subjects: []string{"bar"}})
+	require_NoError(t, err)
+
+	// Now set up the mirrors.
+	_, err = jsB.AddStream(&nats.StreamConfig{
+		Name: "M-A",
+		Mirror: &nats.StreamSource{
+			Name:     "TEST-A",
+			External: &nats.ExternalStream{APIPrefix: "$JS.TCM.API"},
+		},
+	})
+	require_NoError(t, err)
+
+	_, err = jsA.AddStream(&nats.StreamConfig{
+		Name: "M-B",
+		Mirror: &nats.StreamSource{
+			Name:     "TEST-B",
+			External: &nats.ExternalStream{APIPrefix: "$JS.XMM.API"},
+		},
+	})
+	require_NoError(t, err)
+
+	// Now add in the streams that will source from one another bi-directionally.
+	_, err = jsA.AddStream(&nats.StreamConfig{
+		Name:     "SRC-A",
+		Subjects: []string{"A.*"},
+		Sources: []*nats.StreamSource{{
+			Name:          "SRC-B",
+			FilterSubject: "B.*",
+			External:      &nats.ExternalStream{APIPrefix: "$JS.XMM.API"},
+		}},
+	})
+	require_NoError(t, err)
+
+	_, err = jsB.AddStream(&nats.StreamConfig{
+		Name:     "SRC-B",
+		Subjects: []string{"B.*"},
+		Sources: []*nats.StreamSource{{
+			Name:          "SRC-A",
+			FilterSubject: "A.*",
+			External:      &nats.ExternalStream{APIPrefix: "$JS.TCM.API"},
+		}},
+	})
+	require_NoError(t, err)
+
+	// Now load them up with 500 messages.
+	initMsgs := 500
+	for i := 0; i < initMsgs; i++ {
+		// Individual Streams
+		jsA.PublishAsync("foo", []byte("PAYLOAD"))
+		jsB.PublishAsync("bar", []byte("PAYLOAD"))
+		// Bi-directional Sources
+		jsA.PublishAsync("A.foo", []byte("PAYLOAD"))
+		jsB.PublishAsync("B.bar", []byte("PAYLOAD"))
+	}
+	select {
+	case <-jsA.PublishAsyncComplete():
+	case <-time.After(5 * time.Second):
+		t.Fatalf("Did not receive completion signal")
+	}
+	select {
+	case <-jsB.PublishAsyncComplete():
+	case <-time.After(5 * time.Second):
+		t.Fatalf("Did not receive completion signal")
+	}
+
+	// Utility to check the number of stream msgs.
+	checkStreamMsgs := func(js nats.JetStreamContext, sname string, expected int, perr error) error {
+		t.Helper()
+		if perr != nil {
+			return perr
+		}
+		si, err := js.StreamInfo(sname)
+		require_NoError(t, err)
+		if si.State.Msgs != uint64(expected) {
+			return fmt.Errorf("Expected %d msgs for %s, got state: %+v", expected, sname, si.State)
+		}
+		return nil
+	}
+
+	// Wait until we see all messages.
+	checkFor(t, 2*time.Second, 250*time.Millisecond, func() error {
+		err := checkStreamMsgs(jsA, "TEST-A", initMsgs, nil)
+		err = checkStreamMsgs(jsB, "M-A", initMsgs, err)
+		err = checkStreamMsgs(jsB, "TEST-B", initMsgs, err)
+		err = checkStreamMsgs(jsA, "M-B", initMsgs, err)
+		err = checkStreamMsgs(jsA, "SRC-A", initMsgs*2, err)
+		err = checkStreamMsgs(jsB, "SRC-B", initMsgs*2, err)
+		return err
+	})
+
+	// Take down the proxy. This will stop any propagation of messages between the TEST and M streams.
+	proxy.stop()
+
+	// Now add an additional 500 messages to the originals on both sides.
+	for i := 0; i < initMsgs; i++ {
+		// Individual Streams
+		jsA.PublishAsync("foo", []byte("PAYLOAD"))
+		jsB.PublishAsync("bar", []byte("PAYLOAD"))
+		// Bi-directional Sources
+		jsA.PublishAsync("A.foo", []byte("PAYLOAD"))
+		jsB.PublishAsync("B.bar", []byte("PAYLOAD"))
+	}
+	select {
+	case <-jsA.PublishAsyncComplete():
+	case <-time.After(5 * time.Second):
+		t.Fatalf("Did not receive completion signal")
+	}
+	select {
+	case <-jsB.PublishAsyncComplete():
+	case <-time.After(5 * time.Second):
+		t.Fatalf("Did not receive completion signal")
+	}
+
+	cancelAndDelayConsumer := func(s *Server, stream string) {
+		// Now make sure the internal consumer is at max backoff.
+		acc, err := s.lookupAccount("JS")
+		require_NoError(t, err)
+		mset, err := acc.lookupStream(stream)
+		require_NoError(t, err)
+
+		// Reset sourceInfo to show lots of failures with the last attempt 2 minutes ago.
+		// Lock should be held on the parent stream.
+		resetSourceInfo := func(si *sourceInfo) {
+			si.sip = false
+			si.fails = 100
+			si.lreq = time.Now().Add(-2 * time.Minute)
+		}
+
+		// Force the consumer to be canceled and simulate 100 failed attempts,
+		// such that the next retry would normally be a long way out.
+		mset.mu.Lock()
+		if mset.mirror != nil {
+			resetSourceInfo(mset.mirror)
+			mset.cancelSourceInfo(mset.mirror)
+			mset.scheduleSetupMirrorConsumerRetry()
+		} else if len(mset.sources) > 0 {
+			for iname, si := range mset.sources {
+				resetSourceInfo(si)
+				mset.cancelSourceInfo(si)
+				mset.setupSourceConsumer(iname, si.sseq+1, time.Time{})
+			}
+		}
+		mset.mu.Unlock()
+	}
+
+	// Mirrors
+	cancelAndDelayConsumer(sA, "M-B")
+	cancelAndDelayConsumer(sB, "M-A")
+	// Now bi-directional sourcing
+	cancelAndDelayConsumer(sA, "SRC-A")
+	cancelAndDelayConsumer(sB, "SRC-B")
+
+	// Now restart the network proxy.
+	proxy.start()
+
+	// Make sure we are connected OK.
+	checkLeafNodeConnectedCount(t, sA, 1)
+	checkLeafNodeConnectedCount(t, sB, 1)
+
+	// These should be good before the re-sync.
+	require_NoError(t, checkStreamMsgs(jsA, "TEST-A", initMsgs*2, nil))
+	require_NoError(t, checkStreamMsgs(jsB, "TEST-B", initMsgs*2, nil))
+
+	start := time.Now()
+	// Wait until we see all messages.
+	checkFor(t, 2*time.Minute, 50*time.Millisecond, func() error {
+		err := checkStreamMsgs(jsA, "M-B", initMsgs*2, nil)
+		err = checkStreamMsgs(jsB, "M-A", initMsgs*2, err)
+		err = checkStreamMsgs(jsA, "SRC-A", initMsgs*4, err)
+		err = checkStreamMsgs(jsB, "SRC-B", initMsgs*4, err)
+		return err
+	})
+	if elapsed := time.Since(start); elapsed > 2*time.Second {
+		t.Fatalf("Expected to resync all streams in <2s but got %v", elapsed)
+	}
+}
diff --git a/server/leafnode.go b/server/leafnode.go
index da664f7e820..f8390f9190d 100644
--- a/server/leafnode.go
+++ b/server/leafnode.go
@@ -1418,7 +1418,7 @@ func (c *client) processLeafnodeInfo(info *Info) {
 		c.setPermissions(perms)
 	}
 
-	var resumeConnect bool
+	var resumeConnect, checkSyncConsumers bool
 
 	// If this is a remote connection and this is the first INFO protocol,
 	// then we need to finish the connect process by sending CONNECT, etc..
@@ -1428,6 +1428,7 @@ func (c *client) processLeafnodeInfo(info *Info) {
 		resumeConnect = true
 	} else if !firstINFO && didSolicit {
 		c.leaf.remoteAccName = info.RemoteAccount
+		checkSyncConsumers = info.JetStream
 	}
 
 	// Check if we have the remote account information and if so make sure it's stored.
@@ -1446,6 +1447,12 @@ func (c *client) processLeafnodeInfo(info *Info) {
 	if finishConnect {
 		s.leafNodeFinishConnectProcess(c)
 	}
+
+	// If we have JS enabled and so does the other side, we will
+	// check to see if we need to kick any internal source or mirror consumers.
+	if checkSyncConsumers {
+		s.checkInternalSyncConsumers(c.acc, info.Domain)
+	}
 }
 
 func (s *Server) negotiateLeafCompression(c *client, didSolicit bool, infoCompression string, co *CompressionOpts) (bool, error) {
@@ -1954,11 +1961,12 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
 	// If we received pub deny permissions from the other end, merge with existing ones.
 	c.mergeDenyPermissions(pub, proto.DenyPub)
 
+	acc := c.acc
 	c.mu.Unlock()
 
 	// Register the cluster, even if empty, as long as we are acting as a hub.
 	if !proto.Hub {
-		c.acc.registerLeafNodeCluster(proto.Cluster)
+		acc.registerLeafNodeCluster(proto.Cluster)
 	}
 
 	// Add in the leafnode here since we passed through auth at this point.
@@ -1973,12 +1981,58 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
 	s.initLeafNodeSmapAndSendSubs(c)
 
 	// Announce the account connect event for a leaf node.
-	// This will no-op as needed.
+	// This will be a no-op as needed.
 	s.sendLeafNodeConnect(c.acc)
 
+	// If we have JS enabled and so does the other side, we will
+	// check to see if we need to kick any internal source or mirror consumers.
+	if proto.JetStream {
+		s.checkInternalSyncConsumers(acc, proto.Domain)
+	}
 	return nil
 }
 
+// checkInternalSyncConsumers will kick any internal source or mirror consumers that need to be re-established after a leafnode connection comes up.
+func (s *Server) checkInternalSyncConsumers(acc *Account, remoteDomain string) {
+	// Grab our JetStream state.
+	js := s.getJetStream()
+
+	// Only applicable if we have JS enabled and have an account.
+	// The caller has already checked that the remote side has JS enabled.
+	if !js.isEnabled() || acc == nil {
+		return
+	}
+
+	// We will check all streams in our local account. They must be the leader and
+	// be sourcing or mirroring. We will check the external config on the stream itself
+	// if this is cross-domain, or if the remote domain is empty, meaning we might be
+	// extending the system across this leafnode connection, and hence our own domain.
+	jsa := js.lookupAccount(acc)
+	if jsa == nil {
+		return
+	}
+	var streams []*stream
+	jsa.mu.RLock()
+	for _, mset := range jsa.streams {
+		mset.cfgMu.RLock()
+		// We need to have a mirror or source defined.
+		// We do not want to force another lock here to check for leader status,
+		// so collect candidates now and verify leadership after we release jsa.
+		if mset.cfg.Mirror != nil || len(mset.cfg.Sources) > 0 {
+			streams = append(streams, mset)
+		}
+		mset.cfgMu.RUnlock()
+	}
+	jsa.mu.RUnlock()
+
+	// Now loop through all candidates; each stream will verify that it is the leader
+	// and that its sync consumer is not already established before retrying.
+	for _, mset := range streams {
+		mset.retryDisconnectedSyncConsumers(remoteDomain)
+	}
+}
+
 // Returns the remote cluster name. This is set only once so does not require a lock.
 func (c *client) remoteCluster() string {
 	if c.leaf == nil {
diff --git a/server/stream.go b/server/stream.go
index e077bdfb5f7..4f95d3463ed 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -257,12 +257,20 @@ type StreamSource struct {
 	iname string // For indexing when stream names are the same for multiple sources.
 }
 
-// ExternalStream allows you to qualify access to a stream source in another account.
+// ExternalStream allows you to qualify access to a stream source in another account or domain.
 type ExternalStream struct {
 	ApiPrefix     string `json:"api"`
 	DeliverPrefix string `json:"deliver"`
 }
 
+// Domain returns the domain for this external stream.
+func (ext *ExternalStream) Domain() string {
+	if ext == nil || ext.ApiPrefix == _EMPTY_ {
+		return _EMPTY_
+	}
+	return tokenAt(ext.ApiPrefix, 2)
+}
+
 // For managing stream ingest.
 const (
 	streamDefaultMaxQueueMsgs = 10_000
@@ -2337,14 +2345,15 @@ func (mset *stream) eraseMsg(seq uint64) (bool, error) {
 
 // Are we a mirror?
 func (mset *stream) isMirror() bool {
-	mset.mu.RLock()
-	defer mset.mu.RUnlock()
+	mset.cfgMu.RLock()
+	defer mset.cfgMu.RUnlock()
 	return mset.cfg.Mirror != nil
 }
 
 func (mset *stream) sourcesInfo() (sis []*StreamSourceInfo) {
 	mset.mu.RLock()
 	defer mset.mu.RUnlock()
+	sis = make([]*StreamSourceInfo, 0, len(mset.sources))
 	for _, si := range mset.sources {
 		sis = append(sis, mset.sourceInfo(si))
 	}
@@ -2361,7 +2370,7 @@ func (mset *stream) sourceInfo(si *sourceInfo) *StreamSourceInfo {
 
 	trConfigs := make([]SubjectTransformConfig, len(si.sfs))
 	for i := range si.sfs {
-		destination := _EMPTY_
+		var destination string
 		if si.trs[i] != nil {
 			destination = si.trs[i].dest
 		}
@@ -2399,6 +2408,42 @@ func (mset *stream) mirrorInfo() *StreamSourceInfo {
 	return mset.sourceInfo(mset.mirror)
 }
 
+// retryDisconnectedSyncConsumers will check if we have any disconnected sync
+// consumers, for either a mirror or a source, and will reset them and retry connecting.
+func (mset *stream) retryDisconnectedSyncConsumers(remoteDomain string) {
+	mset.mu.Lock()
+	defer mset.mu.Unlock()
+
+	// Only applicable if we are the stream leader.
+	if !mset.isLeader() {
+		return
+	}
+
+	// Check mirrors first.
+	if si := mset.mirror; si != nil {
+		if si.sub == nil && !si.sip {
+			if remoteDomain == _EMPTY_ || (mset.cfg.Mirror != nil && mset.cfg.Mirror.External.Domain() == remoteDomain) {
+				// Need to reset
+				si.fails = 0
+				mset.cancelSourceInfo(si)
+				mset.scheduleSetupMirrorConsumerRetry()
+			}
+		}
+	} else {
+		for _, si := range mset.sources {
+			if si.sub == nil && !si.sip {
+				ss := mset.streamSource(si.iname)
+				if remoteDomain == _EMPTY_ || (ss != nil && ss.External.Domain() == remoteDomain) {
+					// Need to reset
+					si.fails = 0
+					mset.cancelSourceInfo(si)
+					mset.setupSourceConsumer(si.iname, si.sseq+1, time.Time{})
+				}
+			}
+		}
+	}
+}
+
 const (
 	// Our consumer HB interval.
 	sourceHealthHB = 1 * time.Second
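
Note on the ExternalStream.Domain() helper added above: it derives the domain from the configured API prefix via tokenAt(ApiPrefix, 2), so it only yields a value for prefixes of the canonical "$JS.<domain>.API" form that the test uses. A minimal sketch of the expected behavior (the values below are illustrative, not taken from the patch):

	ext := &ExternalStream{ApiPrefix: "$JS.TCM.API"}
	_ = ext.Domain() // "TCM", matching the hub server's `domain: TCM`

	var unset *ExternalStream
	_ = unset.Domain() // "", both a nil receiver and an empty ApiPrefix are handled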
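
Likewise, a usage sketch for the netProxy restart support in jetstream_helpers_test.go: stop() closes and nils out the listener, and the newly stored port field lets start() re-listen on the same address, so a remote configured with leafURL() before the outage can simply redial (variable names here are illustrative):

	proxy := newNetProxy(rtt, bw, bw, serverURL) // serverURL may carry user:pass credentials
	remote := proxy.leafURL()                    // "nats-leaf://..." pinned to the proxy's local port
	proxy.stop()                                 // listener nil'ed out, np.port retained
	proxy.start()                                // re-binds 127.0.0.1:<port>, so remote can reconnect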