From 0963486b8be206ad440160a9fdf82f2e3864d66d Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Wed, 8 Oct 2025 16:00:31 +0100 Subject: [PATCH] Add `write_deadline` to cluster, leafnode and gateway config Signed-off-by: Neil Twigg --- server/client.go | 8 ++++++ server/gateway_test.go | 29 ++++++++++++++++++++ server/leafnode_test.go | 32 ++++++++++++++++++++++ server/opts.go | 9 +++++++ server/opts_test.go | 60 +++++++++++++++++++++++++++++++++++++++++ server/routes_test.go | 28 +++++++++++++++++++ 6 files changed, 166 insertions(+) diff --git a/server/client.go b/server/client.go index cd1ba3e839c..b7ef2ba60b5 100644 --- a/server/client.go +++ b/server/client.go @@ -690,6 +690,14 @@ func (c *client) initClient() { opts := s.getOpts() // Snapshots to avoid mutex access in fast paths. c.out.wdl = opts.WriteDeadline + switch { + case c.kind == ROUTER && opts.Cluster.WriteDeadline > 0: + c.out.wdl = opts.Cluster.WriteDeadline + case c.kind == GATEWAY && opts.Gateway.WriteDeadline > 0: + c.out.wdl = opts.Gateway.WriteDeadline + case c.kind == LEAF && opts.LeafNode.WriteDeadline > 0: + c.out.wdl = opts.LeafNode.WriteDeadline + } c.out.mp = opts.MaxPending // Snapshot max control line since currently can not be changed on reload and we // were checking it on each call to parse. If this changes and we allow MaxControlLine diff --git a/server/gateway_test.go b/server/gateway_test.go index f4241ea5fb6..c457d115026 100644 --- a/server/gateway_test.go +++ b/server/gateway_test.go @@ -7528,3 +7528,32 @@ func TestGatewayOutboundDetectsStaleConnectionIfNoInfo(t *testing.T) { wg.Wait() s.WaitForShutdown() } + +func TestGatewayConfigureWriteDeadline(t *testing.T) { + o1 := testDefaultOptionsForGateway("B") + o1.Gateway.WriteDeadline = 5 * time.Second + s1 := runGatewayServer(o1) + defer s1.Shutdown() + + o2 := testGatewayOptionsFromToWithServers(t, "A", "B", s1) + o2.Gateway.WriteDeadline = 6 * time.Second + s2 := runGatewayServer(o2) + defer s2.Shutdown() + + waitForOutboundGateways(t, s2, 1, time.Second) + waitForInboundGateways(t, s1, 1, time.Second) + waitForOutboundGateways(t, s1, 1, time.Second) + + s1.mu.RLock() + s2.mu.RLock() + defer s1.mu.RUnlock() + defer s2.mu.RUnlock() + + s1.forEachRemote(func(r *client) { + require_Equal(t, r.out.wdl, 6*time.Second) + }) + + s2.forEachRemote(func(r *client) { + require_Equal(t, r.out.wdl, 5*time.Second) + }) +} diff --git a/server/leafnode_test.go b/server/leafnode_test.go index def398f646c..d0ea7305dd6 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -10798,3 +10798,35 @@ func TestLeafNodeDaisyChainWithAccountImportExport(t *testing.T) { acc.mu.RUnlock() require_Len(t, len(sr.psubs), 0) } + +func TestLeafNodeConfigureWriteDeadline(t *testing.T) { + o1, o2 := DefaultOptions(), DefaultOptions() + + o1.LeafNode.WriteDeadline = 5 * time.Second + o1.LeafNode.Host = "127.0.0.1" + o1.LeafNode.Port = -1 + s1 := RunServer(o1) + defer s1.Shutdown() + + s1URL, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", o1.LeafNode.Port)) + o2.Cluster.Name = "somethingelse" + o2.LeafNode.WriteDeadline = 6 * time.Second + o2.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{s1URL}}} + s2 := RunServer(o2) + defer s2.Shutdown() + + checkLeafNodeConnected(t, s2) + + s1.mu.RLock() + s2.mu.RLock() + defer s1.mu.RUnlock() + defer s2.mu.RUnlock() + + s1.forEachRemote(func(r *client) { + require_Equal(t, r.out.wdl, 5*time.Second) + }) + + s2.forEachRemote(func(r *client) { + require_Equal(t, r.out.wdl, 6*time.Second) + }) +} diff --git a/server/opts.go b/server/opts.go index 15307bbf7f7..fcdfcf966c8 100644 --- a/server/opts.go +++ b/server/opts.go @@ -83,6 +83,7 @@ type ClusterOpts struct { Compression CompressionOpts `json:"-"` PingInterval time.Duration `json:"-"` MaxPingsOut int `json:"-"` + WriteDeadline time.Duration `json:"-"` // Not exported (used in tests) resolver netResolver @@ -125,6 +126,7 @@ type GatewayOpts struct { ConnectBackoff bool `json:"connect_backoff,omitempty"` Gateways []*RemoteGatewayOpts `json:"gateways,omitempty"` RejectUnknown bool `json:"reject_unknown,omitempty"` // config got renamed to reject_unknown_cluster + WriteDeadline time.Duration `json:"-"` // Not exported, for tests. resolver netResolver @@ -175,6 +177,7 @@ type LeafNodeOpts struct { Advertise string `json:"-"` NoAdvertise bool `json:"-"` ReconnectInterval time.Duration `json:"-"` + WriteDeadline time.Duration `json:"-"` // Compression options Compression CompressionOpts `json:"-"` @@ -1998,6 +2001,8 @@ func parseCluster(v any, opts *Options, errors *[]error, warnings *[]error) erro } case "ping_max": opts.Cluster.MaxPingsOut = int(mv.(int64)) + case "write_deadline": + opts.Cluster.WriteDeadline = parseDuration("write_deadline", tk, mv, errors, warnings) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ @@ -2186,6 +2191,8 @@ func parseGateway(v any, o *Options, errors *[]error, warnings *[]error) error { o.Gateway.Gateways = gateways case "reject_unknown", "reject_unknown_cluster": o.Gateway.RejectUnknown = mv.(bool) + case "write_deadline": + o.Gateway.WriteDeadline = parseDuration("write_deadline", tk, mv, errors, warnings) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ @@ -2706,6 +2713,8 @@ func parseLeafNodes(v any, opts *Options, errors *[]error, warnings *[]error) er } case "isolate_leafnode_interest", "isolate": opts.LeafNode.IsolateLeafnodeInterest = mv.(bool) + case "write_deadline": + opts.LeafNode.WriteDeadline = parseDuration("write_deadline", tk, mv, errors, warnings) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ diff --git a/server/opts_test.go b/server/opts_test.go index 2f3077fb826..e885791c8d2 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -1242,6 +1242,7 @@ func TestOptionsClone(t *testing.T) { Cluster: ClusterOpts{ NoAdvertise: true, ConnectRetries: 2, + WriteDeadline: 3 * time.Second, }, Gateway: GatewayOpts{ Name: "A", @@ -3994,3 +3995,62 @@ func TestNewServerFromConfigVsLoadConfig(t *testing.T) { checkOptionsEqual(t, opts1, opts2) } + +func TestWriteDeadlineConfigParsing(t *testing.T) { + type testCase struct { + name string + config string + expect func(t *testing.T, opts *Options) + } + + for _, tc := range []testCase{ + { + name: "LeafNode", + config: ` + leafnodes { + write_deadline: 5s + } + `, + expect: func(t *testing.T, opts *Options) { + require_Equal(t, opts.LeafNode.WriteDeadline, 5*time.Second) + }, + }, + { + name: "Gateway", + config: ` + gateway { + write_deadline: 6s + } + `, + expect: func(t *testing.T, opts *Options) { + require_Equal(t, opts.Gateway.WriteDeadline, 6*time.Second) + }, + }, + { + name: "Cluster", + config: ` + cluster { + write_deadline: 7s + } + `, + expect: func(t *testing.T, opts *Options) { + require_Equal(t, opts.Cluster.WriteDeadline, 7*time.Second) + }, + }, + { + name: "Global", + config: ` + write_deadline: 8s + `, + expect: func(t *testing.T, opts *Options) { + require_Equal(t, opts.WriteDeadline, 8*time.Second) + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + opts, err := parseConfigTolerantly(t, tc.config) + require_NoError(t, err) + tc.expect(t, opts) + }) + } +} diff --git a/server/routes_test.go b/server/routes_test.go index add45703d86..2a394b24f1d 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -4977,3 +4977,31 @@ func TestRouteImplicitJoinsSeparateGroups(t *testing.T) { }) } } + +func TestRouteConfigureWriteDeadline(t *testing.T) { + o1, o2 := DefaultOptions(), DefaultOptions() + + o1.Cluster.WriteDeadline = 5 * time.Second + s1 := RunServer(o1) + defer s1.Shutdown() + + o2.Cluster.WriteDeadline = 6 * time.Second + o2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", o1.Cluster.Port)) + s2 := RunServer(o2) + defer s2.Shutdown() + + checkClusterFormed(t, s1, s2) + + s1.mu.RLock() + s2.mu.RLock() + defer s1.mu.RUnlock() + defer s2.mu.RUnlock() + + s1.forEachRoute(func(r *client) { + require_Equal(t, r.out.wdl, 5*time.Second) + }) + + s2.forEachRoute(func(r *client) { + require_Equal(t, r.out.wdl, 6*time.Second) + }) +}