Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,14 @@ func (c *client) initClient() {
opts := s.getOpts()
// Snapshots to avoid mutex access in fast paths.
c.out.wdl = opts.WriteDeadline
switch {
case c.kind == ROUTER && opts.Cluster.WriteDeadline > 0:
c.out.wdl = opts.Cluster.WriteDeadline
Comment thread
neilalexander marked this conversation as resolved.
case c.kind == GATEWAY && opts.Gateway.WriteDeadline > 0:
c.out.wdl = opts.Gateway.WriteDeadline
case c.kind == LEAF && opts.LeafNode.WriteDeadline > 0:
c.out.wdl = opts.LeafNode.WriteDeadline
}
c.out.mp = opts.MaxPending
// Snapshot max control line since currently can not be changed on reload and we
// were checking it on each call to parse. If this changes and we allow MaxControlLine
Expand Down
29 changes: 29 additions & 0 deletions server/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7528,3 +7528,32 @@ func TestGatewayOutboundDetectsStaleConnectionIfNoInfo(t *testing.T) {
wg.Wait()
s.WaitForShutdown()
}

func TestGatewayConfigureWriteDeadline(t *testing.T) {
o1 := testDefaultOptionsForGateway("B")
o1.Gateway.WriteDeadline = 5 * time.Second
s1 := runGatewayServer(o1)
defer s1.Shutdown()

o2 := testGatewayOptionsFromToWithServers(t, "A", "B", s1)
o2.Gateway.WriteDeadline = 6 * time.Second
s2 := runGatewayServer(o2)
defer s2.Shutdown()

waitForOutboundGateways(t, s2, 1, time.Second)
waitForInboundGateways(t, s1, 1, time.Second)
waitForOutboundGateways(t, s1, 1, time.Second)

s1.mu.RLock()
s2.mu.RLock()
defer s1.mu.RUnlock()
defer s2.mu.RUnlock()

s1.forEachRemote(func(r *client) {
require_Equal(t, r.out.wdl, 6*time.Second)
})

s2.forEachRemote(func(r *client) {
require_Equal(t, r.out.wdl, 5*time.Second)
})
}
32 changes: 32 additions & 0 deletions server/leafnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10798,3 +10798,35 @@ func TestLeafNodeDaisyChainWithAccountImportExport(t *testing.T) {
acc.mu.RUnlock()
require_Len(t, len(sr.psubs), 0)
}

func TestLeafNodeConfigureWriteDeadline(t *testing.T) {
o1, o2 := DefaultOptions(), DefaultOptions()

o1.LeafNode.WriteDeadline = 5 * time.Second
o1.LeafNode.Host = "127.0.0.1"
o1.LeafNode.Port = -1
s1 := RunServer(o1)
defer s1.Shutdown()

s1URL, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", o1.LeafNode.Port))
o2.Cluster.Name = "somethingelse"
o2.LeafNode.WriteDeadline = 6 * time.Second
o2.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{s1URL}}}
s2 := RunServer(o2)
defer s2.Shutdown()

checkLeafNodeConnected(t, s2)

s1.mu.RLock()
s2.mu.RLock()
defer s1.mu.RUnlock()
defer s2.mu.RUnlock()

s1.forEachRemote(func(r *client) {
require_Equal(t, r.out.wdl, 5*time.Second)
})

s2.forEachRemote(func(r *client) {
require_Equal(t, r.out.wdl, 6*time.Second)
})
}
9 changes: 9 additions & 0 deletions server/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type ClusterOpts struct {
Compression CompressionOpts `json:"-"`
PingInterval time.Duration `json:"-"`
MaxPingsOut int `json:"-"`
WriteDeadline time.Duration `json:"-"`

// Not exported (used in tests)
resolver netResolver
Expand Down Expand Up @@ -125,6 +126,7 @@ type GatewayOpts struct {
ConnectBackoff bool `json:"connect_backoff,omitempty"`
Gateways []*RemoteGatewayOpts `json:"gateways,omitempty"`
RejectUnknown bool `json:"reject_unknown,omitempty"` // config got renamed to reject_unknown_cluster
WriteDeadline time.Duration `json:"-"`

// Not exported, for tests.
resolver netResolver
Expand Down Expand Up @@ -175,6 +177,7 @@ type LeafNodeOpts struct {
Advertise string `json:"-"`
NoAdvertise bool `json:"-"`
ReconnectInterval time.Duration `json:"-"`
WriteDeadline time.Duration `json:"-"`

// Compression options
Compression CompressionOpts `json:"-"`
Expand Down Expand Up @@ -1998,6 +2001,8 @@ func parseCluster(v any, opts *Options, errors *[]error, warnings *[]error) erro
}
case "ping_max":
opts.Cluster.MaxPingsOut = int(mv.(int64))
case "write_deadline":
opts.Cluster.WriteDeadline = parseDuration("write_deadline", tk, mv, errors, warnings)
default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{
Expand Down Expand Up @@ -2186,6 +2191,8 @@ func parseGateway(v any, o *Options, errors *[]error, warnings *[]error) error {
o.Gateway.Gateways = gateways
case "reject_unknown", "reject_unknown_cluster":
o.Gateway.RejectUnknown = mv.(bool)
case "write_deadline":
o.Gateway.WriteDeadline = parseDuration("write_deadline", tk, mv, errors, warnings)
default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{
Expand Down Expand Up @@ -2706,6 +2713,8 @@ func parseLeafNodes(v any, opts *Options, errors *[]error, warnings *[]error) er
}
case "isolate_leafnode_interest", "isolate":
opts.LeafNode.IsolateLeafnodeInterest = mv.(bool)
case "write_deadline":
opts.LeafNode.WriteDeadline = parseDuration("write_deadline", tk, mv, errors, warnings)
default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{
Expand Down
60 changes: 60 additions & 0 deletions server/opts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1242,6 +1242,7 @@ func TestOptionsClone(t *testing.T) {
Cluster: ClusterOpts{
NoAdvertise: true,
ConnectRetries: 2,
WriteDeadline: 3 * time.Second,
},
Gateway: GatewayOpts{
Name: "A",
Expand Down Expand Up @@ -3994,3 +3995,62 @@ func TestNewServerFromConfigVsLoadConfig(t *testing.T) {

checkOptionsEqual(t, opts1, opts2)
}

func TestWriteDeadlineConfigParsing(t *testing.T) {
type testCase struct {
name string
config string
expect func(t *testing.T, opts *Options)
}

for _, tc := range []testCase{
{
name: "LeafNode",
config: `
leafnodes {
write_deadline: 5s
}
`,
expect: func(t *testing.T, opts *Options) {
require_Equal(t, opts.LeafNode.WriteDeadline, 5*time.Second)
},
},
{
name: "Gateway",
config: `
gateway {
write_deadline: 6s
}
`,
expect: func(t *testing.T, opts *Options) {
require_Equal(t, opts.Gateway.WriteDeadline, 6*time.Second)
},
},
{
name: "Cluster",
config: `
cluster {
write_deadline: 7s
}
`,
expect: func(t *testing.T, opts *Options) {
require_Equal(t, opts.Cluster.WriteDeadline, 7*time.Second)
},
},
{
name: "Global",
config: `
write_deadline: 8s
`,
expect: func(t *testing.T, opts *Options) {
require_Equal(t, opts.WriteDeadline, 8*time.Second)
},
},
} {
t.Run(tc.name, func(t *testing.T) {
opts, err := parseConfigTolerantly(t, tc.config)
require_NoError(t, err)
tc.expect(t, opts)
})
}
}
28 changes: 28 additions & 0 deletions server/routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4977,3 +4977,31 @@ func TestRouteImplicitJoinsSeparateGroups(t *testing.T) {
})
}
}

func TestRouteConfigureWriteDeadline(t *testing.T) {
o1, o2 := DefaultOptions(), DefaultOptions()

o1.Cluster.WriteDeadline = 5 * time.Second
s1 := RunServer(o1)
defer s1.Shutdown()

o2.Cluster.WriteDeadline = 6 * time.Second
o2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", o1.Cluster.Port))
s2 := RunServer(o2)
defer s2.Shutdown()

checkClusterFormed(t, s1, s2)

s1.mu.RLock()
s2.mu.RLock()
defer s1.mu.RUnlock()
defer s2.mu.RUnlock()

s1.forEachRoute(func(r *client) {
require_Equal(t, r.out.wdl, 5*time.Second)
})

s2.forEachRoute(func(r *client) {
require_Equal(t, r.out.wdl, 6*time.Second)
})
}
Loading