Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file removed server/.tmp
Binary file not shown.
4 changes: 2 additions & 2 deletions server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -3794,7 +3794,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
// If JetStream is enabled for this server we will call into configJetStream for the account
// regardless of enabled or disabled. It handles both cases.
if jsEnabled {
if err := s.configJetStream(a); err != nil {
if err := s.configJetStream(a, nil); err != nil {
s.Errorf("Error configuring jetstream for account [%s]: %v", tl, err.Error())
a.mu.Lock()
// Absent reload of js server cfg, this is going to be broken until js is disabled
Expand Down Expand Up @@ -4371,7 +4371,7 @@ func (dr *DirAccResolver) Start(s *Server) error {
s.Warnf("DirResolver - Error checking for JetStream support for account %q: %v", pubKey, err)
}
} else if jsa == nil {
if err = s.configJetStream(acc); err != nil {
if err = s.configJetStream(acc, nil); err != nil {
s.Errorf("DirResolver - Error configuring JetStream for account %q: %v", pubKey, err)
}
}
Expand Down
5 changes: 4 additions & 1 deletion server/ats/ats.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@ func AccessTime() int64 {
// Return last updated time.
v := utime.Load()
if v == 0 {
panic("access time service not running")
// Always register a time, the worst case is a stale time.
// On startup, we can register in parallel and could previously panic.
v = time.Now().UnixNano()
utime.Store(v)
}
return v
}
17 changes: 10 additions & 7 deletions server/ats/ats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@ import (
"time"
)

func TestNotRunningPanic(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Errorf("Expected function to panic, but it did not")
}
}()
func TestNotRunningValue(t *testing.T) {
// Set back to zero in case this test gets run multiple times via --count.
utime.Store(0)
_ = AccessTime()
at := AccessTime()
if at == 0 {
t.Fatal("Expected non-zero access time")
}

atn := AccessTime()
if atn != at {
t.Fatal("Did not expect updates to access time")
}
}

func TestRegisterAndUnregister(t *testing.T) {
Expand Down
67 changes: 57 additions & 10 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,26 @@ const (
pmrMsgImportedFromService
)

// WriteTimeoutPolicy determines what the server does with a connection
// whose flush hits the write deadline: close it, or mark it as a slow
// consumer and retry the flush.
type WriteTimeoutPolicy uint8

const (
	// WriteTimeoutPolicyDefault defers to the per-connection-kind default
	// (close for clients, retry for routes/leafs/gateways).
	WriteTimeoutPolicyDefault WriteTimeoutPolicy = iota
	// WriteTimeoutPolicyClose closes the connection on a write timeout.
	WriteTimeoutPolicyClose
	// WriteTimeoutPolicyRetry marks the connection as a slow consumer and
	// keeps retrying the flush.
	WriteTimeoutPolicyRetry
)

// String returns a human-friendly value. Only used in varz.
func (p WriteTimeoutPolicy) String() string {
	switch p {
	case WriteTimeoutPolicyClose:
		return "close"
	case WriteTimeoutPolicyRetry:
		return "retry"
	default:
		return _EMPTY_
	}
}

type client struct {
// Here first because of use of atomics, and memory alignment.
stats
Expand Down Expand Up @@ -315,15 +335,16 @@ type pinfo struct {

// outbound holds pending data for a socket.
type outbound struct {
nb net.Buffers // Pending buffers for send, each has fixed capacity as per nbPool below.
wnb net.Buffers // Working copy of "nb", reused on each flushOutbound call, partial writes may leave entries here for next iteration.
pb int64 // Total pending/queued bytes.
fsp int32 // Flush signals that are pending per producer from readLoop's pcd.
sg *sync.Cond // To signal writeLoop that there is data to flush.
wdl time.Duration // Snapshot of write deadline.
mp int64 // Snapshot of max pending for client.
lft time.Duration // Last flush time for Write.
stc chan struct{} // Stall chan we create to slow down producers on overrun, e.g. fan-in.
nb net.Buffers // Pending buffers for send, each has fixed capacity as per nbPool below.
wnb net.Buffers // Working copy of "nb", reused on each flushOutbound call, partial writes may leave entries here for next iteration.
pb int64 // Total pending/queued bytes.
fsp int32 // Flush signals that are pending per producer from readLoop's pcd.
wtp WriteTimeoutPolicy // What do we do on a write timeout?
sg *sync.Cond // To signal writeLoop that there is data to flush.
wdl time.Duration // Snapshot of write deadline.
mp int64 // Snapshot of max pending for client.
lft time.Duration // Last flush time for Write.
stc chan struct{} // Stall chan we create to slow down producers on overrun, e.g. fan-in.
cw *s2.Writer
}

Expand Down Expand Up @@ -676,6 +697,32 @@ func (c *client) initClient() {
opts := s.getOpts()
// Snapshots to avoid mutex access in fast paths.
c.out.wdl = opts.WriteDeadline
switch {
case c.kind == ROUTER && opts.Cluster.WriteDeadline > 0:
c.out.wdl = opts.Cluster.WriteDeadline
case c.kind == GATEWAY && opts.Gateway.WriteDeadline > 0:
c.out.wdl = opts.Gateway.WriteDeadline
case c.kind == LEAF && opts.LeafNode.WriteDeadline > 0:
c.out.wdl = opts.LeafNode.WriteDeadline
}
switch c.kind {
case ROUTER:
if c.out.wtp = opts.Cluster.WriteTimeout; c.out.wtp == WriteTimeoutPolicyDefault {
c.out.wtp = WriteTimeoutPolicyRetry
}
case LEAF:
if c.out.wtp = opts.LeafNode.WriteTimeout; c.out.wtp == WriteTimeoutPolicyDefault {
c.out.wtp = WriteTimeoutPolicyRetry
}
case GATEWAY:
if c.out.wtp = opts.Gateway.WriteTimeout; c.out.wtp == WriteTimeoutPolicyDefault {
c.out.wtp = WriteTimeoutPolicyRetry
}
default:
if c.out.wtp = opts.WriteTimeout; c.out.wtp == WriteTimeoutPolicyDefault {
c.out.wtp = WriteTimeoutPolicyClose
}
}
c.out.mp = opts.MaxPending
// Snapshot max control line since currently can not be changed on reload and we
// were checking it on each call to parse. If this changes and we allow MaxControlLine
Expand Down Expand Up @@ -1827,7 +1874,7 @@ func (c *client) handleWriteTimeout(written, attempted int64, numChunks int) boo
scState, c.out.wdl, numChunks, attempted)

// We always close CLIENT connections, or when nothing was written at all...
if c.kind == CLIENT || written == 0 {
if c.out.wtp == WriteTimeoutPolicyClose || written == 0 {
c.markConnAsClosed(SlowConsumerWriteDeadline)
return true
} else {
Expand Down
109 changes: 109 additions & 0 deletions server/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3368,3 +3368,112 @@ func TestClientRejectsNRGSubjects(t *testing.T) {
require_True(t, strings.HasPrefix(err.Error(), "nats: permissions violation"))
})
}

// TestClientConfigureWriteTimeoutPolicy verifies that client connections
// pick up the configured write timeout policy, and that the default
// policy resolves to close for plain clients.
func TestClientConfigureWriteTimeoutPolicy(t *testing.T) {
	cases := []struct {
		name   string
		policy WriteTimeoutPolicy
	}{
		{"Default", WriteTimeoutPolicyDefault},
		{"Retry", WriteTimeoutPolicyRetry},
		{"Close", WriteTimeoutPolicyClose},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			opts := DefaultOptions()
			opts.WriteTimeout = tc.policy
			s := RunServer(opts)
			defer s.Shutdown()

			nc := natsConnect(t, fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
			defer nc.Close()

			// initClient resolves the default policy to close for
			// CLIENT connections.
			expected := tc.policy
			if expected == WriteTimeoutPolicyDefault {
				expected = WriteTimeoutPolicyClose
			}

			s.mu.RLock()
			defer s.mu.RUnlock()

			for _, c := range s.clients {
				require_Equal(t, c.out.wtp, expected)
			}
		})
	}
}

// TestClientFlushOutboundWriteTimeoutPolicy relies on specifically having
// written at least one byte in order to not trip the "written == 0" close
// condition, so just setting an unrealistically low write deadline won't
// work. Instead what we'll do is write the first byte very quickly and then
// slow down, so that we can trip a more honest slow consumer condition.
type writeTimeoutPolicyWriter struct {
net.Conn
deadline time.Time
written int
}

func (w *writeTimeoutPolicyWriter) SetWriteDeadline(deadline time.Time) error {
w.deadline = deadline
return w.Conn.SetWriteDeadline(deadline)
}

func (w *writeTimeoutPolicyWriter) Write(b []byte) (int, error) {
if w.written == 0 {
w.written++
return w.Conn.Write(b[:1])
}
time.Sleep(time.Until(w.deadline) + 10*time.Millisecond)
return w.Conn.Write(b)
}

// TestClientFlushOutboundWriteTimeoutPolicy exercises the write timeout
// policies end to end. The subscriber's connection is wrapped so it writes
// one byte quickly and then stalls past the write deadline, tripping a
// genuine write timeout. Retry should flag the client as a slow consumer
// while it keeps retrying flushes; Close should close the connection
// without ever setting the slow consumer flag.
func TestClientFlushOutboundWriteTimeoutPolicy(t *testing.T) {
	cases := []struct {
		name   string
		policy WriteTimeoutPolicy
	}{
		{"Retry", WriteTimeoutPolicyRetry},
		{"Close", WriteTimeoutPolicyClose},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			opts := DefaultOptions()
			opts.PingInterval = 250 * time.Millisecond
			opts.WriteDeadline = 100 * time.Millisecond
			opts.WriteTimeout = tc.policy
			s := RunServer(opts)
			defer s.Shutdown()

			sub := natsConnect(t, fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
			defer sub.Close()

			_, err := sub.Subscribe("test", func(_ *nats.Msg) {})
			require_NoError(t, err)

			pub := natsConnect(t, fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
			defer pub.Close()

			cid, err := sub.GetClientID()
			require_NoError(t, err)

			// Swap in the stalling writer on the subscriber's server-side
			// connection so the next flush hits the write deadline.
			client := s.getClient(cid)
			client.mu.Lock()
			client.out.wdl = 100 * time.Millisecond
			client.nc = &writeTimeoutPolicyWriter{Conn: client.nc}
			client.mu.Unlock()

			require_NoError(t, pub.Publish("test", make([]byte, 1024*1024)))

			checkFor(t, 5*time.Second, 10*time.Millisecond, func() error {
				client.mu.Lock()
				defer client.mu.Unlock()
				if !client.flags.isSet(connMarkedClosed) {
					return fmt.Errorf("connection not closed yet")
				}
				slow := client.flags.isSet(isSlowConsumer)
				// Retry policy should have marked the client as a slow
				// consumer and continued to retry flushes; close policy
				// shouldn't have, it just closes the connection instead.
				if (tc.policy == WriteTimeoutPolicyRetry && slow) ||
					(tc.policy == WriteTimeoutPolicyClose && !slow) {
					return nil
				}
				return fmt.Errorf("client not in correct state yet")
			})
		})
	}
}
2 changes: 1 addition & 1 deletion server/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1247,7 +1247,7 @@ func TestAccountReqMonitoring(t *testing.T) {
s.EnableJetStream(&JetStreamConfig{StoreDir: t.TempDir()})
unusedAcc, _ := createAccount(s)
acc, akp := createAccount(s)
acc.EnableJetStream(nil)
acc.EnableJetStream(nil, nil)
subsz := fmt.Sprintf(accDirectReqSubj, acc.Name, "SUBSZ")
connz := fmt.Sprintf(accDirectReqSubj, acc.Name, "CONNZ")
jsz := fmt.Sprintf(accDirectReqSubj, acc.Name, "JSZ")
Expand Down
Loading
Loading