diff --git a/server/config_check_test.go b/server/config_check_test.go index aefebc74a79..094fc7f7550 100644 --- a/server/config_check_test.go +++ b/server/config_check_test.go @@ -2008,6 +2008,23 @@ func TestConfigCheck(t *testing.T) { errorLine: 9, errorPos: 9, }, + { + name: "invalid duration for remote leafnode first info timeout", + config: ` + leafnodes { + port: -1 + remotes [ + { + url: "nats://127.0.0.1:123" + first_info_timeout: abc + } + ] + } + `, + err: fmt.Errorf("error parsing first_info_timeout: time: invalid duration %q", "abc"), + errorLine: 7, + errorPos: 8, + }, { name: "show warnings on empty configs without values", config: ``, diff --git a/server/leafnode.go b/server/leafnode.go index 72bd9f4cbd2..23503c2bea8 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -982,6 +982,7 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf c.Noticef("Leafnode connection created%s %s", remoteSuffix, c.opts.Name) var tlsFirst bool + var infoTimeout time.Duration if remote != nil { solicited = true remote.Lock() @@ -991,6 +992,7 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf c.leaf.isSpoke = true } tlsFirst = remote.TLSHandshakeFirst + infoTimeout = remote.FirstInfoTimeout remote.Unlock() c.acc = acc } else { @@ -1048,7 +1050,7 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf } } // We need to wait for the info, but not for too long. - c.nc.SetReadDeadline(time.Now().Add(DEFAULT_LEAFNODE_INFO_WAIT)) + c.nc.SetReadDeadline(time.Now().Add(infoTimeout)) } // We will process the INFO from the readloop and finish by @@ -2831,6 +2833,7 @@ func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remot compress := remote.Websocket.Compression // By default the server will mask outbound frames, but it can be disabled with this option. noMasking := remote.Websocket.NoMasking + infoTimeout := remote.FirstInfoTimeout remote.RUnlock() // Will do the client-side TLS handshake if needed. tlsRequired, err := c.leafClientHandshakeIfNeeded(remote, opts) @@ -2883,6 +2886,7 @@ func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remot if noMasking { req.Header.Add(wsNoMaskingHeader, wsNoMaskingValue) } + c.nc.SetDeadline(time.Now().Add(infoTimeout)) if err := req.Write(c.nc); err != nil { return nil, WriteError, err } @@ -2890,7 +2894,6 @@ func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remot var resp *http.Response br := bufio.NewReaderSize(c.nc, MAX_CONTROL_LINE_SIZE) - c.nc.SetReadDeadline(time.Now().Add(DEFAULT_LEAFNODE_INFO_WAIT)) resp, err = http.ReadResponse(br, req) if err == nil && (resp.StatusCode != 101 || diff --git a/server/leafnode_test.go b/server/leafnode_test.go index aaabb988ef4..c46067ed129 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -7607,3 +7607,147 @@ func TestLeafNodeLoopDetectionOnActualLoop(t *testing.T) { t.Fatalf("Did not get any error regarding loop") } } + +func TestLeafNodeConnectionSucceedsEvenWithDelayedFirstINFO(t *testing.T) { + for _, test := range []struct { + name string + websocket bool + }{ + {"regular", false}, + {"websocket", true}, + } { + t.Run(test.name, func(t *testing.T) { + ob := DefaultOptions() + ob.ServerName = "HUB" + ob.LeafNode.Host = "127.0.0.1" + ob.LeafNode.Port = -1 + ob.LeafNode.AuthTimeout = 10 + if test.websocket { + ob.Websocket.Host = "127.0.0.1" + ob.Websocket.Port = -1 + ob.Websocket.HandshakeTimeout = 10 * time.Second + ob.Websocket.AuthTimeout = 10 + ob.Websocket.NoTLS = true + } + sb := RunServer(ob) + defer sb.Shutdown() + + var port int + var scheme string + if test.websocket { + port = ob.Websocket.Port + scheme = wsSchemePrefix + } else { + port = ob.LeafNode.Port + scheme = "nats" + } + + urlStr := fmt.Sprintf("%s://127.0.0.1:%d", scheme, port) + proxy := createNetProxy(1100*time.Millisecond, 1024*1024*1024, 1024*1024*1024, urlStr, true) + defer proxy.stop() + proxyURL := proxy.clientURL() + _, proxyPort, err := net.SplitHostPort(proxyURL[len(scheme)+3:]) + require_NoError(t, err) + + lnBURL, err := url.Parse(fmt.Sprintf("%s://127.0.0.1:%s", scheme, proxyPort)) + require_NoError(t, err) + + oa := DefaultOptions() + oa.ServerName = "SPOKE" + oa.Cluster.Name = "xyz" + remote := &RemoteLeafOpts{ + URLs: []*url.URL{lnBURL}, + FirstInfoTimeout: 3 * time.Second, + } + oa.LeafNode.Remotes = []*RemoteLeafOpts{remote} + sa := RunServer(oa) + defer sa.Shutdown() + + checkLeafNodeConnected(t, sa) + }) + } +} + +type captureLeafConnClosed struct { + DummyLogger + ch chan struct{} +} + +func (l *captureLeafConnClosed) Noticef(format string, v ...any) { + msg := fmt.Sprintf(format, v...) + if strings.Contains(msg, "Leafnode connection closed: Read Error") { + select { + case l.ch <- struct{}{}: + default: + } + } +} + +func TestLeafNodeDetectsStaleConnectionIfNoInfo(t *testing.T) { + for _, test := range []struct { + name string + websocket bool + }{ + {"regular", false}, + {"websocket", true}, + } { + t.Run(test.name, func(t *testing.T) { + l, err := net.Listen("tcp", "127.0.0.1:0") + require_NoError(t, err) + defer l.Close() + + ch := make(chan struct{}) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + c, err := l.Accept() + if err != nil { + return + } + defer c.Close() + <-ch + }() + + var scheme string + if test.websocket { + scheme = wsSchemePrefix + } else { + scheme = "nats" + } + urlStr := fmt.Sprintf("%s://%s", scheme, l.Addr()) + lnBURL, err := url.Parse(urlStr) + require_NoError(t, err) + + oa := DefaultOptions() + oa.ServerName = "SPOKE" + oa.Cluster.Name = "xyz" + remote := &RemoteLeafOpts{ + URLs: []*url.URL{lnBURL}, + FirstInfoTimeout: 250 * time.Millisecond, + } + oa.LeafNode.Remotes = []*RemoteLeafOpts{remote} + oa.DisableShortFirstPing = false + oa.NoLog = false + sa, err := NewServer(oa) + require_NoError(t, err) + defer sa.Shutdown() + + log := &captureLeafConnClosed{ch: make(chan struct{}, 1)} + sa.SetLogger(log, false, false) + sa.Start() + + select { + case <-log.ch: + // OK + case <-time.After(750 * time.Millisecond): + t.Fatalf("Connection was not closed") + } + + sa.Shutdown() + close(ch) + wg.Wait() + sa.WaitForShutdown() + }) + } +} diff --git a/server/opts.go b/server/opts.go index 73564fe3eee..e717f26ea43 100644 --- a/server/opts.go +++ b/server/opts.go @@ -205,6 +205,11 @@ type RemoteLeafOpts struct { DenyImports []string `json:"-"` DenyExports []string `json:"-"` + // FirstInfoTimeout is the amount of time the server will wait for the + // initial INFO protocol from the remote server before closing the + // connection. + FirstInfoTimeout time.Duration `json:"-"` + // Compression options for this remote. Each remote could have a different // setting and also be different from the LeafNode options. Compression CompressionOpts `json:"-"` @@ -2668,6 +2673,8 @@ func parseRemoteLeafNodes(v any, errors *[]error, warnings *[]error) ([]*RemoteL *errors = append(*errors, err) continue } + case "first_info_timeout": + remote.FirstInfoTimeout = parseDuration(k, tk, v, errors, warnings) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ @@ -5376,6 +5383,10 @@ func setBaselineOptions(opts *Options) { c.Mode = CompressionS2Auto } } + // Set default first info timeout value if not set. + if r.FirstInfoTimeout <= 0 { + r.FirstInfoTimeout = DEFAULT_LEAFNODE_INFO_WAIT + } } }