diff --git a/server/client.go b/server/client.go index af670bd33b1..70a6ec09bff 100644 --- a/server/client.go +++ b/server/client.go @@ -237,6 +237,26 @@ const ( pmrMsgImportedFromService ) +type WriteTimeoutPolicy uint8 + +const ( + WriteTimeoutPolicyDefault WriteTimeoutPolicy = iota + WriteTimeoutPolicyClose + WriteTimeoutPolicyRetry +) + +// String returns a human-friendly value. Only used in varz. +func (p WriteTimeoutPolicy) String() string { + switch p { + case WriteTimeoutPolicyClose: + return "close" + case WriteTimeoutPolicyRetry: + return "retry" + default: + return _EMPTY_ + } +} + type client struct { // Here first because of use of atomics, and memory alignment. stats @@ -328,15 +348,16 @@ type pinfo struct { // outbound holds pending data for a socket. type outbound struct { - nb net.Buffers // Pending buffers for send, each has fixed capacity as per nbPool below. - wnb net.Buffers // Working copy of "nb", reused on each flushOutbound call, partial writes may leave entries here for next iteration. - pb int64 // Total pending/queued bytes. - fsp int32 // Flush signals that are pending per producer from readLoop's pcd. - sg *sync.Cond // To signal writeLoop that there is data to flush. - wdl time.Duration // Snapshot of write deadline. - mp int64 // Snapshot of max pending for client. - lft time.Duration // Last flush time for Write. - stc chan struct{} // Stall chan we create to slow down producers on overrun, e.g. fan-in. + nb net.Buffers // Pending buffers for send, each has fixed capacity as per nbPool below. + wnb net.Buffers // Working copy of "nb", reused on each flushOutbound call, partial writes may leave entries here for next iteration. + pb int64 // Total pending/queued bytes. + fsp int32 // Flush signals that are pending per producer from readLoop's pcd. + wtp WriteTimeoutPolicy // What do we do on a write timeout? + sg *sync.Cond // To signal writeLoop that there is data to flush. + wdl time.Duration // Snapshot of write deadline. 
+ mp int64 // Snapshot of max pending for client. + lft time.Duration // Last flush time for Write. + stc chan struct{} // Stall chan we create to slow down producers on overrun, e.g. fan-in. cw *s2.Writer } @@ -698,6 +719,24 @@ func (c *client) initClient() { case c.kind == LEAF && opts.LeafNode.WriteDeadline > 0: c.out.wdl = opts.LeafNode.WriteDeadline } + switch c.kind { + case ROUTER: + if c.out.wtp = opts.Cluster.WriteTimeout; c.out.wtp == WriteTimeoutPolicyDefault { + c.out.wtp = WriteTimeoutPolicyRetry + } + case LEAF: + if c.out.wtp = opts.LeafNode.WriteTimeout; c.out.wtp == WriteTimeoutPolicyDefault { + c.out.wtp = WriteTimeoutPolicyRetry + } + case GATEWAY: + if c.out.wtp = opts.Gateway.WriteTimeout; c.out.wtp == WriteTimeoutPolicyDefault { + c.out.wtp = WriteTimeoutPolicyRetry + } + default: + if c.out.wtp = opts.WriteTimeout; c.out.wtp == WriteTimeoutPolicyDefault { + c.out.wtp = WriteTimeoutPolicyClose + } + } c.out.mp = opts.MaxPending // Snapshot max control line since currently can not be changed on reload and we // were checking it on each call to parse. If this changes and we allow MaxControlLine @@ -1849,7 +1888,7 @@ func (c *client) handleWriteTimeout(written, attempted int64, numChunks int) boo scState, c.out.wdl, numChunks, attempted) // We always close CLIENT connections, or when nothing was written at all... 
- if c.kind == CLIENT || written == 0 { + if c.out.wtp == WriteTimeoutPolicyClose || written == 0 { c.markConnAsClosed(SlowConsumerWriteDeadline) return true } else { diff --git a/server/client_test.go b/server/client_test.go index 03d0fca2e72..060f3c4b87b 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -3686,3 +3686,112 @@ func TestLogConnectionAuthInfo(t *testing.T) { } }) } + +func TestClientConfigureWriteTimeoutPolicy(t *testing.T) { + for name, policy := range map[string]WriteTimeoutPolicy{ + "Default": WriteTimeoutPolicyDefault, + "Retry": WriteTimeoutPolicyRetry, + "Close": WriteTimeoutPolicyClose, + } { + t.Run(name, func(t *testing.T) { + opts := DefaultOptions() + opts.WriteTimeout = policy + s := RunServer(opts) + defer s.Shutdown() + + nc := natsConnect(t, fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)) + defer nc.Close() + + s.mu.RLock() + defer s.mu.RUnlock() + + for _, r := range s.clients { + if policy == WriteTimeoutPolicyDefault { + require_Equal(t, r.out.wtp, WriteTimeoutPolicyClose) + } else { + require_Equal(t, r.out.wtp, policy) + } + } + }) + } +} + +// TestClientFlushOutboundWriteTimeoutPolicy relies on specifically having +// written at least one byte in order to not trip the "written == 0" close +// condition, so just setting an unrealistically low write deadline won't +// work. Instead what we'll do is write the first byte very quickly and then +// slow down, so that we can trip a more honest slow consumer condition. 
+type writeTimeoutPolicyWriter struct { + net.Conn + deadline time.Time + written int +} + +func (w *writeTimeoutPolicyWriter) SetWriteDeadline(deadline time.Time) error { + w.deadline = deadline + return w.Conn.SetWriteDeadline(deadline) +} + +func (w *writeTimeoutPolicyWriter) Write(b []byte) (int, error) { + if w.written == 0 { + w.written++ + return w.Conn.Write(b[:1]) + } + time.Sleep(time.Until(w.deadline) + 10*time.Millisecond) + return w.Conn.Write(b) +} + +func TestClientFlushOutboundWriteTimeoutPolicy(t *testing.T) { + for name, policy := range map[string]WriteTimeoutPolicy{ + "Retry": WriteTimeoutPolicyRetry, + "Close": WriteTimeoutPolicyClose, + } { + t.Run(name, func(t *testing.T) { + opts := DefaultOptions() + opts.PingInterval = 250 * time.Millisecond + opts.WriteDeadline = 100 * time.Millisecond + opts.WriteTimeout = policy + s := RunServer(opts) + defer s.Shutdown() + + nc1 := natsConnect(t, fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)) + defer nc1.Close() + + _, err := nc1.Subscribe("test", func(_ *nats.Msg) {}) + require_NoError(t, err) + + nc2 := natsConnect(t, fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)) + defer nc2.Close() + + cid, err := nc1.GetClientID() + require_NoError(t, err) + + client := s.getClient(cid) + client.mu.Lock() + client.out.wdl = 100 * time.Millisecond + client.nc = &writeTimeoutPolicyWriter{Conn: client.nc} + client.mu.Unlock() + + require_NoError(t, nc2.Publish("test", make([]byte, 1024*1024))) + + checkFor(t, 5*time.Second, 10*time.Millisecond, func() error { + client.mu.Lock() + defer client.mu.Unlock() + switch { + case !client.flags.isSet(connMarkedClosed): + return fmt.Errorf("connection not closed yet") + case policy == WriteTimeoutPolicyRetry && client.flags.isSet(isSlowConsumer): + // Retry policy should have marked the client as a slow consumer and + // continued to retry flushes. 
+ return nil + case policy == WriteTimeoutPolicyClose && !client.flags.isSet(isSlowConsumer): + // Close policy shouldn't have marked the client as a slow consumer, + // it will just close it instead. + return nil + default: + return fmt.Errorf("client not in correct state yet") + } + }) + }) + } +} diff --git a/server/gateway_test.go b/server/gateway_test.go index de23b97c063..6056892620d 100644 --- a/server/gateway_test.go +++ b/server/gateway_test.go @@ -7537,7 +7537,6 @@ func TestGatewayConfigureWriteDeadline(t *testing.T) { defer s1.Shutdown() o2 := testGatewayOptionsFromToWithServers(t, "A", "B", s1) - o2.Gateway.WriteDeadline = 6 * time.Second s2 := runGatewayServer(o2) defer s2.Shutdown() @@ -7546,17 +7545,60 @@ func TestGatewayConfigureWriteDeadline(t *testing.T) { waitForOutboundGateways(t, s1, 1, time.Second) s1.mu.RLock() - s2.mu.RLock() defer s1.mu.RUnlock() - defer s2.mu.RUnlock() - s1.forEachRemote(func(r *client) { - require_Equal(t, r.out.wdl, 6*time.Second) - }) + for _, r := range s1.gateway.out { + require_Equal(t, r.out.wdl, 5*time.Second) + } - s2.forEachRemote(func(r *client) { + for _, r := range s1.gateway.in { require_Equal(t, r.out.wdl, 5*time.Second) - }) + } +} + +func TestGatewayConfigureWriteTimeoutPolicy(t *testing.T) { + for name, policy := range map[string]WriteTimeoutPolicy{ + "Default": WriteTimeoutPolicyDefault, + "Retry": WriteTimeoutPolicyRetry, + "Close": WriteTimeoutPolicyClose, + } { + t.Run(name, func(t *testing.T) { + o1 := testDefaultOptionsForGateway("B") + o1.Gateway.WriteTimeout = policy + s1 := runGatewayServer(o1) + defer s1.Shutdown() + + o2 := testGatewayOptionsFromToWithServers(t, "A", "B", s1) + s2 := runGatewayServer(o2) + defer s2.Shutdown() + + waitForOutboundGateways(t, s2, 1, time.Second) + waitForInboundGateways(t, s1, 1, time.Second) + waitForOutboundGateways(t, s1, 1, time.Second) + + s1.mu.RLock() + defer s1.mu.RUnlock() + + s1.gateway.RLock() + defer s1.gateway.RUnlock() + + for _, r := range 
s1.gateway.out { + if policy == WriteTimeoutPolicyDefault { + require_Equal(t, r.out.wtp, WriteTimeoutPolicyRetry) + } else { + require_Equal(t, r.out.wtp, policy) + } + } + + for _, r := range s1.gateway.in { + if policy == WriteTimeoutPolicyDefault { + require_Equal(t, r.out.wtp, WriteTimeoutPolicyRetry) + } else { + require_Equal(t, r.out.wtp, policy) + } + } + }) + } } func TestGatewayProcessRSubNoBlockingAccountFetch(t *testing.T) { diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 9d7f94bd227..7883ec76236 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -10810,7 +10810,6 @@ func TestLeafNodeConfigureWriteDeadline(t *testing.T) { s1URL, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", o1.LeafNode.Port)) o2.Cluster.Name = "somethingelse" - o2.LeafNode.WriteDeadline = 6 * time.Second o2.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{s1URL}}} s2 := RunServer(o2) defer s2.Shutdown() @@ -10818,17 +10817,45 @@ func TestLeafNodeConfigureWriteDeadline(t *testing.T) { checkLeafNodeConnected(t, s2) s1.mu.RLock() - s2.mu.RLock() defer s1.mu.RUnlock() - defer s2.mu.RUnlock() - s1.forEachRemote(func(r *client) { + for _, r := range s1.leafs { require_Equal(t, r.out.wdl, 5*time.Second) - }) + } +} - s2.forEachRemote(func(r *client) { - require_Equal(t, r.out.wdl, 6*time.Second) - }) +func TestLeafNodeConfigureWriteTimeoutPolicy(t *testing.T) { + for name, policy := range map[string]WriteTimeoutPolicy{ + "Default": WriteTimeoutPolicyDefault, + "Retry": WriteTimeoutPolicyRetry, + "Close": WriteTimeoutPolicyClose, + } { + t.Run(name, func(t *testing.T) { + o1 := testDefaultOptionsForGateway("B") + o1.Gateway.WriteTimeout = policy + s1 := runGatewayServer(o1) + defer s1.Shutdown() + + o2 := testGatewayOptionsFromToWithServers(t, "A", "B", s1) + s2 := runGatewayServer(o2) + defer s2.Shutdown() + + waitForOutboundGateways(t, s2, 1, time.Second) + waitForInboundGateways(t, s1, 1, time.Second) + waitForOutboundGateways(t, s1, 1, 
time.Second) + + s1.mu.RLock() + defer s1.mu.RUnlock() + + for _, r := range s1.leafs { + if policy == WriteTimeoutPolicyDefault { + require_Equal(t, r.out.wtp, WriteTimeoutPolicyRetry) + } else { + require_Equal(t, r.out.wtp, policy) + } + } + }) + } } // https://github.com/nats-io/nats-server/issues/7441 diff --git a/server/monitor.go b/server/monitor.go index c59a47b81a6..7626ef00739 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1244,6 +1244,7 @@ type Varz struct { JetStream JetStreamVarz `json:"jetstream,omitempty"` // JetStream is the JetStream state TLSTimeout float64 `json:"tls_timeout"` // TLSTimeout is how long TLS operations have to complete WriteDeadline time.Duration `json:"write_deadline"` // WriteDeadline is the maximum time writes to sockets have to complete + WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors Start time.Time `json:"start"` // Start is time when the server was started Now time.Time `json:"now"` // Now is the current time of the server Uptime string `json:"uptime"` // Uptime is how long the server has been running @@ -1290,15 +1291,17 @@ type JetStreamVarz struct { // ClusterOptsVarz contains monitoring cluster information type ClusterOptsVarz struct { - Name string `json:"name,omitempty"` // Name is the configured cluster name - Host string `json:"addr,omitempty"` // Host is the host the cluster listens on for connections - Port int `json:"cluster_port,omitempty"` // Port is the port the cluster listens on for connections - AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication - URLs []string `json:"urls,omitempty"` // URLs is the list of cluster URLs - TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete - TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections - TLSVerify 
bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed - PoolSize int `json:"pool_size,omitempty"` // PoolSize is the configured route connection pool size + Name string `json:"name,omitempty"` // Name is the configured cluster name + Host string `json:"addr,omitempty"` // Host is the host the cluster listens on for connections + Port int `json:"cluster_port,omitempty"` // Port is the port the cluster listens on for connections + AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication + URLs []string `json:"urls,omitempty"` // URLs is the list of cluster URLs + TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete + TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections + TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed + PoolSize int `json:"pool_size,omitempty"` // PoolSize is the configured route connection pool size + WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete + WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors } // GatewayOptsVarz contains monitoring gateway information @@ -1314,6 +1317,8 @@ type GatewayOptsVarz struct { ConnectRetries int `json:"connect_retries,omitempty"` // ConnectRetries is how many connection attempts the route will make Gateways []RemoteGatewayOptsVarz `json:"gateways,omitempty"` // Gateways is state of configured gateway remotes RejectUnknown bool `json:"reject_unknown,omitempty"` // RejectUnknown indicates if unknown cluster connections will be rejected + WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have 
to complete + WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors } // RemoteGatewayOptsVarz contains monitoring remote gateway information @@ -1333,6 +1338,8 @@ type LeafNodeOptsVarz struct { TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed Remotes []RemoteLeafOptsVarz `json:"remotes,omitempty"` // Remotes is state of configured Leafnode remotes TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be performed + WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete + WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors } // DenyRules Contains lists of subjects not allowed to be imported/exported @@ -1601,14 +1608,16 @@ func (s *Server) createVarz(pcpu float64, rss int64) *Varz { HTTPBasePath: opts.HTTPBasePath, HTTPSPort: opts.HTTPSPort, Cluster: ClusterOptsVarz{ - Name: info.Cluster, - Host: c.Host, - Port: c.Port, - AuthTimeout: c.AuthTimeout, - TLSTimeout: c.TLSTimeout, - TLSRequired: clustTlsReq, - TLSVerify: clustTlsReq, - PoolSize: opts.Cluster.PoolSize, + Name: info.Cluster, + Host: c.Host, + Port: c.Port, + AuthTimeout: c.AuthTimeout, + TLSTimeout: c.TLSTimeout, + TLSRequired: clustTlsReq, + TLSVerify: clustTlsReq, + PoolSize: opts.Cluster.PoolSize, + WriteDeadline: opts.Cluster.WriteDeadline, + WriteTimeout: opts.Cluster.WriteTimeout.String(), }, Gateway: GatewayOptsVarz{ Name: gw.Name, @@ -1622,6 +1631,8 @@ func (s *Server) createVarz(pcpu float64, rss int64) *Varz { ConnectRetries: gw.ConnectRetries, Gateways: []RemoteGatewayOptsVarz{}, RejectUnknown: gw.RejectUnknown, + WriteDeadline: opts.Gateway.WriteDeadline, + WriteTimeout: opts.Gateway.WriteTimeout.String(), }, LeafNode: LeafNodeOptsVarz{ Host: ln.Host, @@ 
-1632,6 +1643,8 @@ func (s *Server) createVarz(pcpu float64, rss int64) *Varz { TLSVerify: leafTlsVerify, TLSOCSPPeerVerify: leafTlsOCSPPeerVerify, Remotes: []RemoteLeafOptsVarz{}, + WriteDeadline: opts.LeafNode.WriteDeadline, + WriteTimeout: opts.LeafNode.WriteTimeout.String(), }, MQTT: MQTTOptsVarz{ Host: mqtt.Host, @@ -1748,6 +1761,7 @@ func (s *Server) updateVarzConfigReloadableFields(v *Varz) { v.MaxPending = opts.MaxPending v.TLSTimeout = opts.TLSTimeout v.WriteDeadline = opts.WriteDeadline + v.WriteTimeout = opts.WriteTimeout.String() v.ConfigLoadTime = s.configTime.UTC() v.ConfigDigest = opts.configDigest v.Tags = opts.Tags diff --git a/server/monitor_test.go b/server/monitor_test.go index c799355df98..a5a08a60ab1 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -2744,6 +2744,7 @@ func TestMonitorCluster(t *testing.T) { opts.Cluster.TLSConfig != nil, opts.Cluster.TLSConfig != nil, DEFAULT_ROUTE_POOL_SIZE, + 0, _EMPTY_, } varzURL := fmt.Sprintf("http://127.0.0.1:%d/varz", s.MonitorAddr().Port) @@ -2759,7 +2760,7 @@ func TestMonitorCluster(t *testing.T) { // Having this here to make sure that if fields are added in ClusterOptsVarz, // we make sure to update this test (compiler will report an error if we don't) - _ = ClusterOptsVarz{"", "", 0, 0, nil, 2, false, false, 0} + _ = ClusterOptsVarz{"", "", 0, 0, nil, 2, false, false, 0, 0, _EMPTY_} // Alter the fields to make sure that we have a proper deep copy // of what may be stored in the server. 
Anything we change here @@ -2914,6 +2915,7 @@ func TestMonitorGateway(t *testing.T) { opts.Gateway.ConnectRetries, []RemoteGatewayOptsVarz{{"B", 1, nil}}, opts.Gateway.RejectUnknown, + 0, _EMPTY_, } // Since URLs array is not guaranteed to be always the same order, // we don't add it in the expected GatewayOptsVarz, instead we @@ -2951,7 +2953,7 @@ func TestMonitorGateway(t *testing.T) { // Having this here to make sure that if fields are added in GatewayOptsVarz, // we make sure to update this test (compiler will report an error if we don't) - _ = GatewayOptsVarz{"", "", 0, 0, 0, false, false, "", 0, []RemoteGatewayOptsVarz{{"", 0, nil}}, false} + _ = GatewayOptsVarz{"", "", 0, 0, 0, false, false, "", 0, []RemoteGatewayOptsVarz{{"", 0, nil}}, false, 0, "default"} // Alter the fields to make sure that we have a proper deep copy // of what may be stored in the server. Anything we change here @@ -3137,6 +3139,7 @@ func TestMonitorLeafNode(t *testing.T) { }, }, false, + 0, _EMPTY_, } varzURL := fmt.Sprintf("http://127.0.0.1:%d/varz", s.MonitorAddr().Port) @@ -3161,7 +3164,7 @@ func TestMonitorLeafNode(t *testing.T) { // Having this here to make sure that if fields are added in ClusterOptsVarz, // we make sure to update this test (compiler will report an error if we don't) - _ = LeafNodeOptsVarz{"", 0, 0, 0, false, false, []RemoteLeafOptsVarz{{"", 0, nil, nil, false}}, false} + _ = LeafNodeOptsVarz{"", 0, 0, 0, false, false, []RemoteLeafOptsVarz{{"", 0, nil, nil, false}}, false, 0, _EMPTY_} // Alter the fields to make sure that we have a proper deep copy // of what may be stored in the server. Anything we change here diff --git a/server/opts.go b/server/opts.go index d05b3074954..a3507a4a324 100644 --- a/server/opts.go +++ b/server/opts.go @@ -62,29 +62,30 @@ type PinnedCertSet map[string]struct{} // NOTE: This structure is no longer used for monitoring endpoints // and json tags are deprecated and may be removed in the future. 
type ClusterOpts struct { - Name string `json:"-"` - Host string `json:"addr,omitempty"` - Port int `json:"cluster_port,omitempty"` - Username string `json:"-"` - Password string `json:"-"` - AuthTimeout float64 `json:"auth_timeout,omitempty"` - Permissions *RoutePermissions `json:"-"` - TLSTimeout float64 `json:"-"` - TLSConfig *tls.Config `json:"-"` - TLSMap bool `json:"-"` - TLSCheckKnownURLs bool `json:"-"` - TLSPinnedCerts PinnedCertSet `json:"-"` - ListenStr string `json:"-"` - Advertise string `json:"-"` - NoAdvertise bool `json:"-"` - ConnectRetries int `json:"-"` - ConnectBackoff bool `json:"-"` - PoolSize int `json:"-"` - PinnedAccounts []string `json:"-"` - Compression CompressionOpts `json:"-"` - PingInterval time.Duration `json:"-"` - MaxPingsOut int `json:"-"` - WriteDeadline time.Duration `json:"-"` + Name string `json:"-"` + Host string `json:"addr,omitempty"` + Port int `json:"cluster_port,omitempty"` + Username string `json:"-"` + Password string `json:"-"` + AuthTimeout float64 `json:"auth_timeout,omitempty"` + Permissions *RoutePermissions `json:"-"` + TLSTimeout float64 `json:"-"` + TLSConfig *tls.Config `json:"-"` + TLSMap bool `json:"-"` + TLSCheckKnownURLs bool `json:"-"` + TLSPinnedCerts PinnedCertSet `json:"-"` + ListenStr string `json:"-"` + Advertise string `json:"-"` + NoAdvertise bool `json:"-"` + ConnectRetries int `json:"-"` + ConnectBackoff bool `json:"-"` + PoolSize int `json:"-"` + PinnedAccounts []string `json:"-"` + Compression CompressionOpts `json:"-"` + PingInterval time.Duration `json:"-"` + MaxPingsOut int `json:"-"` + WriteDeadline time.Duration `json:"-"` + WriteTimeout WriteTimeoutPolicy `json:"-"` // Not exported (used in tests) resolver netResolver @@ -128,6 +129,7 @@ type GatewayOpts struct { Gateways []*RemoteGatewayOpts `json:"gateways,omitempty"` RejectUnknown bool `json:"reject_unknown,omitempty"` // config got renamed to reject_unknown_cluster WriteDeadline time.Duration `json:"-"` + WriteTimeout 
WriteTimeoutPolicy `json:"-"` // Not exported, for tests. resolver netResolver @@ -174,11 +176,12 @@ type LeafNodeOpts struct { // to start before falling back to previous behavior of sending the // INFO protocol first. It allows for a mix of newer remote leafnodes // that can require a TLS handshake first, and older that can't. - TLSHandshakeFirstFallback time.Duration `json:"-"` - Advertise string `json:"-"` - NoAdvertise bool `json:"-"` - ReconnectInterval time.Duration `json:"-"` - WriteDeadline time.Duration `json:"-"` + TLSHandshakeFirstFallback time.Duration `json:"-"` + Advertise string `json:"-"` + NoAdvertise bool `json:"-"` + ReconnectInterval time.Duration `json:"-"` + WriteDeadline time.Duration `json:"-"` + WriteTimeout WriteTimeoutPolicy `json:"-"` // Compression options Compression CompressionOpts `json:"-"` @@ -424,12 +427,13 @@ type Options struct { // to start before falling back to previous behavior of sending the // INFO protocol first. It allows for a mix of newer clients that can // require a TLS handshake first, and older clients that can't. - TLSHandshakeFirstFallback time.Duration `json:"-"` - AllowNonTLS bool `json:"-"` - WriteDeadline time.Duration `json:"-"` - MaxClosedClients int `json:"-"` - LameDuckDuration time.Duration `json:"-"` - LameDuckGracePeriod time.Duration `json:"-"` + TLSHandshakeFirstFallback time.Duration `json:"-"` + AllowNonTLS bool `json:"-"` + WriteDeadline time.Duration `json:"-"` + WriteTimeout WriteTimeoutPolicy `json:"-"` + MaxClosedClients int `json:"-"` + LameDuckDuration time.Duration `json:"-"` + LameDuckGracePeriod time.Duration `json:"-"` // MaxTracedMsgLen is the maximum printable length for traced messages. 
MaxTracedMsgLen int `json:"-"` @@ -1348,6 +1352,8 @@ func (o *Options) processConfigFileLine(k string, v any, errors *[]error, warnin o.AllowNonTLS = v.(bool) case "write_deadline": o.WriteDeadline = parseDuration("write_deadline", tk, v, errors, warnings) + case "write_timeout": + o.WriteTimeout = parseWriteDeadlinePolicy(tk, v.(string), errors) case "lame_duck_duration": dur, err := time.ParseDuration(v.(string)) if err != nil { @@ -1829,6 +1835,21 @@ func parseDuration(field string, tk token, v any, errors *[]error, warnings *[]e } } +func parseWriteDeadlinePolicy(tk token, v string, errors *[]error) WriteTimeoutPolicy { + switch v { + case "default": + return WriteTimeoutPolicyDefault + case "close": + return WriteTimeoutPolicyClose + case "retry": + return WriteTimeoutPolicyRetry + default: + err := &configErr{tk, "write_timeout must be 'default', 'close' or 'retry'"} + *errors = append(*errors, err) + return WriteTimeoutPolicyDefault + } +} + func trackExplicitVal(pm *map[string]bool, name string, val bool) { m := *pm if m == nil { @@ -2005,6 +2026,8 @@ func parseCluster(v any, opts *Options, errors *[]error, warnings *[]error) erro opts.Cluster.MaxPingsOut = int(mv.(int64)) case "write_deadline": opts.Cluster.WriteDeadline = parseDuration("write_deadline", tk, mv, errors, warnings) + case "write_timeout": + opts.Cluster.WriteTimeout = parseWriteDeadlinePolicy(tk, mv.(string), errors) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ @@ -2195,6 +2218,8 @@ func parseGateway(v any, o *Options, errors *[]error, warnings *[]error) error { o.Gateway.RejectUnknown = mv.(bool) case "write_deadline": o.Gateway.WriteDeadline = parseDuration("write_deadline", tk, mv, errors, warnings) + case "write_timeout": + o.Gateway.WriteTimeout = parseWriteDeadlinePolicy(tk, mv.(string), errors) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ @@ -2726,6 +2751,8 @@ func parseLeafNodes(v any, opts *Options, errors *[]error, warnings *[]error) er 
opts.LeafNode.IsolateLeafnodeInterest = mv.(bool) case "write_deadline": opts.LeafNode.WriteDeadline = parseDuration("write_deadline", tk, mv, errors, warnings) + case "write_timeout": + opts.LeafNode.WriteTimeout = parseWriteDeadlinePolicy(tk, mv.(string), errors) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ diff --git a/server/opts_test.go b/server/opts_test.go index e885791c8d2..1b5071de758 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -4054,3 +4054,68 @@ func TestWriteDeadlineConfigParsing(t *testing.T) { }) } } + +func TestWriteTimeoutConfigParsing(t *testing.T) { + type testCase struct { + name string + config string + expect func(t *testing.T, opts *Options) + } + + for str, pol := range map[string]WriteTimeoutPolicy{ + "default": WriteTimeoutPolicyDefault, + "retry": WriteTimeoutPolicyRetry, + "close": WriteTimeoutPolicyClose, + } { + for _, tc := range []testCase{ + { + name: "LeafNode", + config: fmt.Sprintf(` + leafnodes { + write_timeout: %s + } + `, str), + expect: func(t *testing.T, opts *Options) { + require_Equal(t, opts.LeafNode.WriteTimeout, pol) + }, + }, + { + name: "Gateway", + config: fmt.Sprintf(` + gateway { + write_timeout: %s + } + `, str), + expect: func(t *testing.T, opts *Options) { + require_Equal(t, opts.Gateway.WriteTimeout, pol) + }, + }, + { + name: "Cluster", + config: fmt.Sprintf(` + cluster { + write_timeout: %s + } + `, str), + expect: func(t *testing.T, opts *Options) { + require_Equal(t, opts.Cluster.WriteTimeout, pol) + }, + }, + { + name: "Global", + config: fmt.Sprintf(` + write_timeout: %s + `, str), + expect: func(t *testing.T, opts *Options) { + require_Equal(t, opts.WriteTimeout, pol) + }, + }, + } { + t.Run(fmt.Sprintf("%s/%s", tc.name, str), func(t *testing.T) { + opts, err := parseConfigTolerantly(t, tc.config) + require_NoError(t, err) + tc.expect(t, opts) + }) + } + } +} diff --git a/server/reload.go b/server/reload.go index d3530a550ad..f8af878f206 100644 --- 
a/server/reload.go +++ b/server/reload.go @@ -1259,7 +1259,7 @@ func imposeOrder(value any) error { slices.Sort(value.AllowedOrigins) case string, bool, uint8, uint16, uint64, int, int32, int64, time.Duration, float64, nil, LeafNodeOpts, ClusterOpts, *tls.Config, PinnedCertSet, *URLAccResolver, *MemAccResolver, *DirAccResolver, *CacheDirAccResolver, Authentication, MQTTOpts, jwt.TagList, - *OCSPConfig, map[string]string, JSLimitOpts, StoreCipher, *OCSPResponseCacheConfig, *ProxiesConfig: + *OCSPConfig, map[string]string, JSLimitOpts, StoreCipher, *OCSPResponseCacheConfig, *ProxiesConfig, WriteTimeoutPolicy: // explicitly skipped types case *AuthCallout: case JSTpmOpts: diff --git a/server/routes_test.go b/server/routes_test.go index 2a394b24f1d..93866700752 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -5005,3 +5005,37 @@ func TestRouteConfigureWriteDeadline(t *testing.T) { require_Equal(t, r.out.wdl, 6*time.Second) }) } + +func TestRouteConfigureWriteTimeoutPolicy(t *testing.T) { + for name, policy := range map[string]WriteTimeoutPolicy{ + "Default": WriteTimeoutPolicyDefault, + "Retry": WriteTimeoutPolicyRetry, + "Close": WriteTimeoutPolicyClose, + } { + t.Run(name, func(t *testing.T) { + o1 := DefaultOptions() + o1.Cluster.WriteTimeout = policy + s1 := RunServer(o1) + defer s1.Shutdown() + + o2 := DefaultOptions() + o2.Cluster.WriteTimeout = policy + o2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", s1.ClusterAddr().Port)) + s2 := RunServer(o2) + defer s2.Shutdown() + + checkClusterFormed(t, s1, s2) + + s1.mu.RLock() + defer s1.mu.RUnlock() + + s1.forEachRoute(func(r *client) { + if policy == WriteTimeoutPolicyDefault { + require_Equal(t, r.out.wtp, WriteTimeoutPolicyRetry) + } else { + require_Equal(t, r.out.wtp, policy) + } + }) + }) + } +}