Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 49 additions & 10 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,26 @@ const (
pmrMsgImportedFromService
)

// WriteTimeoutPolicy determines how the server reacts when flushing to a
// connection exceeds the write deadline (see handleWriteTimeout).
type WriteTimeoutPolicy uint8

const (
	// WriteTimeoutPolicyDefault defers to the kind-specific default:
	// close for clients, retry for routes, leafnodes and gateways.
	WriteTimeoutPolicyDefault WriteTimeoutPolicy = iota
	// WriteTimeoutPolicyClose closes the connection on a write timeout.
	WriteTimeoutPolicyClose
	// WriteTimeoutPolicyRetry marks the connection as a slow consumer
	// and keeps retrying the flush.
	WriteTimeoutPolicyRetry
)

// String returns a human-friendly name for the policy. Only used in varz;
// the default (or any unknown) policy renders as the empty string.
func (p WriteTimeoutPolicy) String() string {
	if p == WriteTimeoutPolicyRetry {
		return "retry"
	}
	if p == WriteTimeoutPolicyClose {
		return "close"
	}
	return _EMPTY_
}

type client struct {
// Here first because of use of atomics, and memory alignment.
stats
Expand Down Expand Up @@ -328,15 +348,16 @@ type pinfo struct {

// outbound holds pending data for a socket.
type outbound struct {
nb net.Buffers // Pending buffers for send, each has fixed capacity as per nbPool below.
wnb net.Buffers // Working copy of "nb", reused on each flushOutbound call, partial writes may leave entries here for next iteration.
pb int64 // Total pending/queued bytes.
fsp int32 // Flush signals that are pending per producer from readLoop's pcd.
sg *sync.Cond // To signal writeLoop that there is data to flush.
wdl time.Duration // Snapshot of write deadline.
mp int64 // Snapshot of max pending for client.
lft time.Duration // Last flush time for Write.
stc chan struct{} // Stall chan we create to slow down producers on overrun, e.g. fan-in.
nb net.Buffers // Pending buffers for send, each has fixed capacity as per nbPool below.
wnb net.Buffers // Working copy of "nb", reused on each flushOutbound call, partial writes may leave entries here for next iteration.
pb int64 // Total pending/queued bytes.
fsp int32 // Flush signals that are pending per producer from readLoop's pcd.
wtp WriteTimeoutPolicy // What do we do on a write timeout?
sg *sync.Cond // To signal writeLoop that there is data to flush.
wdl time.Duration // Snapshot of write deadline.
mp int64 // Snapshot of max pending for client.
lft time.Duration // Last flush time for Write.
stc chan struct{} // Stall chan we create to slow down producers on overrun, e.g. fan-in.
cw *s2.Writer
}

Expand Down Expand Up @@ -698,6 +719,24 @@ func (c *client) initClient() {
case c.kind == LEAF && opts.LeafNode.WriteDeadline > 0:
c.out.wdl = opts.LeafNode.WriteDeadline
}
switch c.kind {
case ROUTER:
if c.out.wtp = opts.Cluster.WriteTimeout; c.out.wtp == WriteTimeoutPolicyDefault {
c.out.wtp = WriteTimeoutPolicyRetry
}
case LEAF:
if c.out.wtp = opts.LeafNode.WriteTimeout; c.out.wtp == WriteTimeoutPolicyDefault {
c.out.wtp = WriteTimeoutPolicyRetry
}
case GATEWAY:
if c.out.wtp = opts.Gateway.WriteTimeout; c.out.wtp == WriteTimeoutPolicyDefault {
c.out.wtp = WriteTimeoutPolicyRetry
}
default:
if c.out.wtp = opts.WriteTimeout; c.out.wtp == WriteTimeoutPolicyDefault {
c.out.wtp = WriteTimeoutPolicyClose
}
}
c.out.mp = opts.MaxPending
// Snapshot max control line since currently can not be changed on reload and we
// were checking it on each call to parse. If this changes and we allow MaxControlLine
Expand Down Expand Up @@ -1849,7 +1888,7 @@ func (c *client) handleWriteTimeout(written, attempted int64, numChunks int) boo
scState, c.out.wdl, numChunks, attempted)

// We always close CLIENT connections, or when nothing was written at all...
if c.kind == CLIENT || written == 0 {
if c.out.wtp == WriteTimeoutPolicyClose || written == 0 {
c.markConnAsClosed(SlowConsumerWriteDeadline)
return true
} else {
Expand Down
109 changes: 109 additions & 0 deletions server/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3686,3 +3686,112 @@ func TestLogConnectionAuthInfo(t *testing.T) {
}
})
}

// TestClientConfigureWriteTimeoutPolicy checks that the server-level
// WriteTimeout option is reflected on client connections, with the
// default resolving to the close policy.
func TestClientConfigureWriteTimeoutPolicy(t *testing.T) {
	policies := map[string]WriteTimeoutPolicy{
		"Default": WriteTimeoutPolicyDefault,
		"Retry":   WriteTimeoutPolicyRetry,
		"Close":   WriteTimeoutPolicyClose,
	}
	for name, policy := range policies {
		t.Run(name, func(t *testing.T) {
			opts := DefaultOptions()
			opts.WriteTimeout = policy
			s := RunServer(opts)
			defer s.Shutdown()

			nc := natsConnect(t, fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
			defer nc.Close()

			// Plain clients fall back to the close policy when left at default.
			want := policy
			if want == WriteTimeoutPolicyDefault {
				want = WriteTimeoutPolicyClose
			}

			s.mu.RLock()
			defer s.mu.RUnlock()
			for _, c := range s.clients {
				require_Equal(t, c.out.wtp, want)
			}
		})
	}
}

// TestClientFlushOutboundWriteTimeoutPolicy relies on specifically having
// written at least one byte in order to not trip the "written == 0" close
// condition, so just setting an unrealistically low write deadline won't
// work. Instead what we'll do is write the first byte very quickly and then
// slow down, so that we can trip a more honest slow consumer condition.
type writeTimeoutPolicyWriter struct {
net.Conn
deadline time.Time
written int
}

func (w *writeTimeoutPolicyWriter) SetWriteDeadline(deadline time.Time) error {
w.deadline = deadline
return w.Conn.SetWriteDeadline(deadline)
}

func (w *writeTimeoutPolicyWriter) Write(b []byte) (int, error) {
if w.written == 0 {
w.written++
return w.Conn.Write(b[:1])
}
time.Sleep(time.Until(w.deadline) + 10*time.Millisecond)
return w.Conn.Write(b)
}

// TestClientFlushOutboundWriteTimeoutPolicy forces a write timeout on a
// subscriber connection and verifies the outcome per policy: retry flags a
// slow consumer while continuing to flush, close just closes the connection.
func TestClientFlushOutboundWriteTimeoutPolicy(t *testing.T) {
	for name, policy := range map[string]WriteTimeoutPolicy{
		"Retry": WriteTimeoutPolicyRetry,
		"Close": WriteTimeoutPolicyClose,
	} {
		t.Run(name, func(t *testing.T) {
			opts := DefaultOptions()
			opts.PingInterval = 250 * time.Millisecond
			opts.WriteDeadline = 100 * time.Millisecond
			opts.WriteTimeout = policy
			s := RunServer(opts)
			defer s.Shutdown()

			subConn := natsConnect(t, fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
			defer subConn.Close()

			_, err := subConn.Subscribe("test", func(_ *nats.Msg) {})
			require_NoError(t, err)

			pubConn := natsConnect(t, fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
			defer pubConn.Close()

			cid, err := subConn.GetClientID()
			require_NoError(t, err)

			// Wrap the subscriber's socket so the first byte goes out fast
			// and every later write stalls past the deadline.
			c := s.getClient(cid)
			c.mu.Lock()
			c.out.wdl = 100 * time.Millisecond
			c.nc = &writeTimeoutPolicyWriter{Conn: c.nc}
			c.mu.Unlock()

			require_NoError(t, pubConn.Publish("test", make([]byte, 1024*1024)))

			checkFor(t, 5*time.Second, 10*time.Millisecond, func() error {
				c.mu.Lock()
				defer c.mu.Unlock()
				if !c.flags.isSet(connMarkedClosed) {
					return fmt.Errorf("connection not closed yet")
				}
				// Retry should have marked the client as a slow consumer and
				// continued retrying flushes; close should have just closed
				// the client without flagging it.
				slow := c.flags.isSet(isSlowConsumer)
				if (policy == WriteTimeoutPolicyRetry) == slow {
					return nil
				}
				return fmt.Errorf("client not in correct state yet")
			})
		})
	}
}
58 changes: 50 additions & 8 deletions server/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7537,7 +7537,6 @@ func TestGatewayConfigureWriteDeadline(t *testing.T) {
defer s1.Shutdown()

o2 := testGatewayOptionsFromToWithServers(t, "A", "B", s1)
o2.Gateway.WriteDeadline = 6 * time.Second
s2 := runGatewayServer(o2)
defer s2.Shutdown()

Expand All @@ -7546,17 +7545,60 @@ func TestGatewayConfigureWriteDeadline(t *testing.T) {
waitForOutboundGateways(t, s1, 1, time.Second)

s1.mu.RLock()
s2.mu.RLock()
defer s1.mu.RUnlock()
defer s2.mu.RUnlock()

s1.forEachRemote(func(r *client) {
require_Equal(t, r.out.wdl, 6*time.Second)
})
for _, r := range s1.gateway.out {
require_Equal(t, r.out.wdl, 5*time.Second)
}

s2.forEachRemote(func(r *client) {
for _, r := range s1.gateway.in {
require_Equal(t, r.out.wdl, 5*time.Second)
})
}
}

// TestGatewayConfigureWriteTimeoutPolicy checks that Gateway.WriteTimeout is
// applied to both inbound and outbound gateway connections, with the default
// resolving to the retry policy.
func TestGatewayConfigureWriteTimeoutPolicy(t *testing.T) {
	policies := map[string]WriteTimeoutPolicy{
		"Default": WriteTimeoutPolicyDefault,
		"Retry":   WriteTimeoutPolicyRetry,
		"Close":   WriteTimeoutPolicyClose,
	}
	for name, policy := range policies {
		t.Run(name, func(t *testing.T) {
			o1 := testDefaultOptionsForGateway("B")
			o1.Gateway.WriteTimeout = policy
			s1 := runGatewayServer(o1)
			defer s1.Shutdown()

			o2 := testGatewayOptionsFromToWithServers(t, "A", "B", s1)
			s2 := runGatewayServer(o2)
			defer s2.Shutdown()

			waitForOutboundGateways(t, s2, 1, time.Second)
			waitForInboundGateways(t, s1, 1, time.Second)
			waitForOutboundGateways(t, s1, 1, time.Second)

			// Gateways fall back to the retry policy when left at default.
			want := policy
			if want == WriteTimeoutPolicyDefault {
				want = WriteTimeoutPolicyRetry
			}

			s1.mu.RLock()
			defer s1.mu.RUnlock()
			s1.gateway.RLock()
			defer s1.gateway.RUnlock()

			for _, c := range s1.gateway.out {
				require_Equal(t, c.out.wtp, want)
			}
			for _, c := range s1.gateway.in {
				require_Equal(t, c.out.wtp, want)
			}
		})
	}
}

func TestGatewayProcessRSubNoBlockingAccountFetch(t *testing.T) {
Expand Down
43 changes: 35 additions & 8 deletions server/leafnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10810,25 +10810,52 @@ func TestLeafNodeConfigureWriteDeadline(t *testing.T) {

s1URL, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", o1.LeafNode.Port))
o2.Cluster.Name = "somethingelse"
o2.LeafNode.WriteDeadline = 6 * time.Second
o2.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{s1URL}}}
s2 := RunServer(o2)
defer s2.Shutdown()

checkLeafNodeConnected(t, s2)

s1.mu.RLock()
s2.mu.RLock()
defer s1.mu.RUnlock()
defer s2.mu.RUnlock()

s1.forEachRemote(func(r *client) {
for _, r := range s1.leafs {
require_Equal(t, r.out.wdl, 5*time.Second)
})
}
}

s2.forEachRemote(func(r *client) {
require_Equal(t, r.out.wdl, 6*time.Second)
})
// TestLeafNodeConfigureWriteTimeoutPolicy verifies that LeafNode.WriteTimeout
// is applied to leafnode connections, with the default resolving to retry.
//
// BUG FIX: the previous version of this test was a copy of the gateway test:
// it configured o1.Gateway.WriteTimeout, started two gateway servers, and
// then ranged over s1.leafs — which was always empty, so the assertions were
// never executed. It now establishes a real leafnode connection and sets the
// leafnode option.
func TestLeafNodeConfigureWriteTimeoutPolicy(t *testing.T) {
	for name, policy := range map[string]WriteTimeoutPolicy{
		"Default": WriteTimeoutPolicyDefault,
		"Retry":   WriteTimeoutPolicyRetry,
		"Close":   WriteTimeoutPolicyClose,
	} {
		t.Run(name, func(t *testing.T) {
			o1 := DefaultOptions()
			o1.LeafNode.Host = "127.0.0.1"
			o1.LeafNode.Port = -1
			o1.LeafNode.WriteTimeout = policy
			s1 := RunServer(o1)
			defer s1.Shutdown()

			o2 := DefaultOptions()
			s1URL, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", o1.LeafNode.Port))
			o2.Cluster.Name = "somethingelse"
			o2.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{s1URL}}}
			s2 := RunServer(o2)
			defer s2.Shutdown()

			checkLeafNodeConnected(t, s2)

			s1.mu.RLock()
			defer s1.mu.RUnlock()

			// Guard against the loop below being vacuous.
			require_True(t, len(s1.leafs) > 0)

			for _, r := range s1.leafs {
				if policy == WriteTimeoutPolicyDefault {
					require_Equal(t, r.out.wtp, WriteTimeoutPolicyRetry)
				} else {
					require_Equal(t, r.out.wtp, policy)
				}
			}
		})
	}
}

// https://github.com/nats-io/nats-server/issues/7441
Expand Down
Loading
Loading