Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions server/config_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2008,6 +2008,23 @@ func TestConfigCheck(t *testing.T) {
errorLine: 9,
errorPos: 9,
},
{
name: "invalid duration for remote leafnode first info timeout",
config: `
leafnodes {
port: -1
remotes [
{
url: "nats://127.0.0.1:123"
first_info_timeout: abc
}
]
}
`,
err: fmt.Errorf("error parsing first_info_timeout: time: invalid duration %q", "abc"),
errorLine: 7,
errorPos: 8,
},
{
name: "show warnings on empty configs without values",
config: ``,
Expand Down
7 changes: 5 additions & 2 deletions server/leafnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -982,6 +982,7 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
c.Noticef("Leafnode connection created%s %s", remoteSuffix, c.opts.Name)

var tlsFirst bool
var infoTimeout time.Duration
if remote != nil {
solicited = true
remote.Lock()
Expand All @@ -991,6 +992,7 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
c.leaf.isSpoke = true
}
tlsFirst = remote.TLSHandshakeFirst
infoTimeout = remote.FirstInfoTimeout
remote.Unlock()
c.acc = acc
} else {
Expand Down Expand Up @@ -1048,7 +1050,7 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
}
}
// We need to wait for the info, but not for too long.
c.nc.SetReadDeadline(time.Now().Add(DEFAULT_LEAFNODE_INFO_WAIT))
c.nc.SetReadDeadline(time.Now().Add(infoTimeout))
}

// We will process the INFO from the readloop and finish by
Expand Down Expand Up @@ -2831,6 +2833,7 @@ func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remot
compress := remote.Websocket.Compression
// By default the server will mask outbound frames, but it can be disabled with this option.
noMasking := remote.Websocket.NoMasking
infoTimeout := remote.FirstInfoTimeout
remote.RUnlock()
// Will do the client-side TLS handshake if needed.
tlsRequired, err := c.leafClientHandshakeIfNeeded(remote, opts)
Expand Down Expand Up @@ -2883,14 +2886,14 @@ func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remot
if noMasking {
req.Header.Add(wsNoMaskingHeader, wsNoMaskingValue)
}
c.nc.SetDeadline(time.Now().Add(infoTimeout))
if err := req.Write(c.nc); err != nil {
return nil, WriteError, err
}

var resp *http.Response

br := bufio.NewReaderSize(c.nc, MAX_CONTROL_LINE_SIZE)
c.nc.SetReadDeadline(time.Now().Add(DEFAULT_LEAFNODE_INFO_WAIT))
resp, err = http.ReadResponse(br, req)
if err == nil &&
(resp.StatusCode != 101 ||
Expand Down
144 changes: 144 additions & 0 deletions server/leafnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7607,3 +7607,147 @@ func TestLeafNodeLoopDetectionOnActualLoop(t *testing.T) {
t.Fatalf("Did not get any error regarding loop")
}
}

func TestLeafNodeConnectionSucceedsEvenWithDelayedFirstINFO(t *testing.T) {
for _, test := range []struct {
name string
websocket bool
}{
{"regular", false},
{"websocket", true},
} {
t.Run(test.name, func(t *testing.T) {
ob := DefaultOptions()
ob.ServerName = "HUB"
ob.LeafNode.Host = "127.0.0.1"
ob.LeafNode.Port = -1
ob.LeafNode.AuthTimeout = 10
if test.websocket {
ob.Websocket.Host = "127.0.0.1"
ob.Websocket.Port = -1
ob.Websocket.HandshakeTimeout = 10 * time.Second
ob.Websocket.AuthTimeout = 10
ob.Websocket.NoTLS = true
}
sb := RunServer(ob)
defer sb.Shutdown()

var port int
var scheme string
if test.websocket {
port = ob.Websocket.Port
scheme = wsSchemePrefix
} else {
port = ob.LeafNode.Port
scheme = "nats"
}

urlStr := fmt.Sprintf("%s://127.0.0.1:%d", scheme, port)
proxy := createNetProxy(1100*time.Millisecond, 1024*1024*1024, 1024*1024*1024, urlStr, true)
defer proxy.stop()
proxyURL := proxy.clientURL()
_, proxyPort, err := net.SplitHostPort(proxyURL[len(scheme)+3:])
require_NoError(t, err)

lnBURL, err := url.Parse(fmt.Sprintf("%s://127.0.0.1:%s", scheme, proxyPort))
require_NoError(t, err)

oa := DefaultOptions()
oa.ServerName = "SPOKE"
oa.Cluster.Name = "xyz"
remote := &RemoteLeafOpts{
URLs: []*url.URL{lnBURL},
FirstInfoTimeout: 3 * time.Second,
}
oa.LeafNode.Remotes = []*RemoteLeafOpts{remote}
sa := RunServer(oa)
defer sa.Shutdown()

checkLeafNodeConnected(t, sa)
})
}
}

type captureLeafConnClosed struct {
DummyLogger
ch chan struct{}
}

func (l *captureLeafConnClosed) Noticef(format string, v ...any) {
msg := fmt.Sprintf(format, v...)
if strings.Contains(msg, "Leafnode connection closed: Read Error") {
select {
case l.ch <- struct{}{}:
default:
}
}
}

func TestLeafNodeDetectsStaleConnectionIfNoInfo(t *testing.T) {
for _, test := range []struct {
name string
websocket bool
}{
{"regular", false},
{"websocket", true},
} {
t.Run(test.name, func(t *testing.T) {
l, err := net.Listen("tcp", "127.0.0.1:0")
require_NoError(t, err)
defer l.Close()

ch := make(chan struct{})
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
c, err := l.Accept()
if err != nil {
return
}
defer c.Close()
<-ch
}()

var scheme string
if test.websocket {
scheme = wsSchemePrefix
} else {
scheme = "nats"
}
urlStr := fmt.Sprintf("%s://%s", scheme, l.Addr())
lnBURL, err := url.Parse(urlStr)
require_NoError(t, err)

oa := DefaultOptions()
oa.ServerName = "SPOKE"
oa.Cluster.Name = "xyz"
remote := &RemoteLeafOpts{
URLs: []*url.URL{lnBURL},
FirstInfoTimeout: 250 * time.Millisecond,
}
oa.LeafNode.Remotes = []*RemoteLeafOpts{remote}
oa.DisableShortFirstPing = false
oa.NoLog = false
sa, err := NewServer(oa)
require_NoError(t, err)
defer sa.Shutdown()

log := &captureLeafConnClosed{ch: make(chan struct{}, 1)}
sa.SetLogger(log, false, false)
sa.Start()

select {
case <-log.ch:
// OK
case <-time.After(750 * time.Millisecond):
t.Fatalf("Connection was not closed")
}

sa.Shutdown()
close(ch)
wg.Wait()
sa.WaitForShutdown()
})
}
}
11 changes: 11 additions & 0 deletions server/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ type RemoteLeafOpts struct {
DenyImports []string `json:"-"`
DenyExports []string `json:"-"`

// FirstInfoTimeout is the amount of time the server will wait for the
// initial INFO protocol from the remote server before closing the
// connection.
FirstInfoTimeout time.Duration `json:"-"`

// Compression options for this remote. Each remote could have a different
// setting and also be different from the LeafNode options.
Compression CompressionOpts `json:"-"`
Expand Down Expand Up @@ -2668,6 +2673,8 @@ func parseRemoteLeafNodes(v any, errors *[]error, warnings *[]error) ([]*RemoteL
*errors = append(*errors, err)
continue
}
case "first_info_timeout":
remote.FirstInfoTimeout = parseDuration(k, tk, v, errors, warnings)
default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{
Expand Down Expand Up @@ -5376,6 +5383,10 @@ func setBaselineOptions(opts *Options) {
c.Mode = CompressionS2Auto
}
}
// Set default first info timeout value if not set.
if r.FirstInfoTimeout <= 0 {
r.FirstInfoTimeout = DEFAULT_LEAFNODE_INFO_WAIT
}
}
}

Expand Down