diff --git a/server/.tmp b/server/.tmp deleted file mode 100644 index 67d91f16b8d..00000000000 Binary files a/server/.tmp and /dev/null differ diff --git a/server/accounts.go b/server/accounts.go index 55bd0774746..56b969e6d5c 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -3794,7 +3794,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim // If JetStream is enabled for this server we will call into configJetStream for the account // regardless of enabled or disabled. It handles both cases. if jsEnabled { - if err := s.configJetStream(a); err != nil { + if err := s.configJetStream(a, nil); err != nil { s.Errorf("Error configuring jetstream for account [%s]: %v", tl, err.Error()) a.mu.Lock() // Absent reload of js server cfg, this is going to be broken until js is disabled @@ -4371,7 +4371,7 @@ func (dr *DirAccResolver) Start(s *Server) error { s.Warnf("DirResolver - Error checking for JetStream support for account %q: %v", pubKey, err) } } else if jsa == nil { - if err = s.configJetStream(acc); err != nil { + if err = s.configJetStream(acc, nil); err != nil { s.Errorf("DirResolver - Error configuring JetStream for account %q: %v", pubKey, err) } } diff --git a/server/ats/ats.go b/server/ats/ats.go index c310ccbd078..50045bfd57f 100644 --- a/server/ats/ats.go +++ b/server/ats/ats.go @@ -77,7 +77,10 @@ func AccessTime() int64 { // Return last updated time. v := utime.Load() if v == 0 { - panic("access time service not running") + // Always register a time, the worst case is a stale time. + // On startup, we can register in parallel and could previously panic. + v = time.Now().UnixNano() + utime.Store(v) } return v } diff --git a/server/ats/ats_test.go b/server/ats/ats_test.go index 0c606408a9a..63451b62fb3 100644 --- a/server/ats/ats_test.go +++ b/server/ats/ats_test.go @@ -19,15 +19,18 @@ import ( "time" ) -func TestNotRunningPanic(t *testing.T) { - defer func() { - if r := recover(); r == nil { - t.Errorf("Expected function to panic, but it did not") - } - }() +func TestNotRunningValue(t *testing.T) { // Set back to zero in case this test gets run multiple times via --count. utime.Store(0) - _ = AccessTime() + at := AccessTime() + if at == 0 { + t.Fatal("Expected non-zero access time") + } + + atn := AccessTime() + if atn != at { + t.Fatal("Did not expect updates to access time") + } } func TestRegisterAndUnregister(t *testing.T) { diff --git a/server/client.go b/server/client.go index af0d022e61d..e9921dbde63 100644 --- a/server/client.go +++ b/server/client.go @@ -234,6 +234,26 @@ const ( pmrMsgImportedFromService ) +type WriteTimeoutPolicy uint8 + +const ( + WriteTimeoutPolicyDefault = iota + WriteTimeoutPolicyClose + WriteTimeoutPolicyRetry +) + +// String returns a human-friendly value. Only used in varz. +func (p WriteTimeoutPolicy) String() string { + switch p { + case WriteTimeoutPolicyClose: + return "close" + case WriteTimeoutPolicyRetry: + return "retry" + default: + return _EMPTY_ + } +} + type client struct { // Here first because of use of atomics, and memory alignment. stats @@ -315,15 +335,16 @@ type pinfo struct { // outbound holds pending data for a socket. type outbound struct { - nb net.Buffers // Pending buffers for send, each has fixed capacity as per nbPool below. - wnb net.Buffers // Working copy of "nb", reused on each flushOutbound call, partial writes may leave entries here for next iteration. - pb int64 // Total pending/queued bytes. - fsp int32 // Flush signals that are pending per producer from readLoop's pcd. 
- sg *sync.Cond // To signal writeLoop that there is data to flush. - wdl time.Duration // Snapshot of write deadline. - mp int64 // Snapshot of max pending for client. - lft time.Duration // Last flush time for Write. - stc chan struct{} // Stall chan we create to slow down producers on overrun, e.g. fan-in. + nb net.Buffers // Pending buffers for send, each has fixed capacity as per nbPool below. + wnb net.Buffers // Working copy of "nb", reused on each flushOutbound call, partial writes may leave entries here for next iteration. + pb int64 // Total pending/queued bytes. + fsp int32 // Flush signals that are pending per producer from readLoop's pcd. + wtp WriteTimeoutPolicy // What do we do on a write timeout? + sg *sync.Cond // To signal writeLoop that there is data to flush. + wdl time.Duration // Snapshot of write deadline. + mp int64 // Snapshot of max pending for client. + lft time.Duration // Last flush time for Write. + stc chan struct{} // Stall chan we create to slow down producers on overrun, e.g. fan-in. cw *s2.Writer } @@ -676,6 +697,32 @@ func (c *client) initClient() { opts := s.getOpts() // Snapshots to avoid mutex access in fast paths. c.out.wdl = opts.WriteDeadline + switch { + case c.kind == ROUTER && opts.Cluster.WriteDeadline > 0: + c.out.wdl = opts.Cluster.WriteDeadline + case c.kind == GATEWAY && opts.Gateway.WriteDeadline > 0: + c.out.wdl = opts.Gateway.WriteDeadline + case c.kind == LEAF && opts.LeafNode.WriteDeadline > 0: + c.out.wdl = opts.LeafNode.WriteDeadline + } + switch c.kind { + case ROUTER: + if c.out.wtp = opts.Cluster.WriteTimeout; c.out.wtp == WriteTimeoutPolicyDefault { + c.out.wtp = WriteTimeoutPolicyRetry + } + case LEAF: + if c.out.wtp = opts.LeafNode.WriteTimeout; c.out.wtp == WriteTimeoutPolicyDefault { + c.out.wtp = WriteTimeoutPolicyRetry + } + case GATEWAY: + if c.out.wtp = opts.Gateway.WriteTimeout; c.out.wtp == WriteTimeoutPolicyDefault { + c.out.wtp = WriteTimeoutPolicyRetry + } + default: + if c.out.wtp = opts.WriteTimeout; c.out.wtp == WriteTimeoutPolicyDefault { + c.out.wtp = WriteTimeoutPolicyClose + } + } c.out.mp = opts.MaxPending // Snapshot max control line since currently can not be changed on reload and we // were checking it on each call to parse. If this changes and we allow MaxControlLine @@ -1827,7 +1874,7 @@ func (c *client) handleWriteTimeout(written, attempted int64, numChunks int) boo scState, c.out.wdl, numChunks, attempted) // We always close CLIENT connections, or when nothing was written at all... 
- if c.kind == CLIENT || written == 0 { + if c.out.wtp == WriteTimeoutPolicyClose || written == 0 { c.markConnAsClosed(SlowConsumerWriteDeadline) return true } else { diff --git a/server/client_test.go b/server/client_test.go index e6eb614bd3e..875b1b43471 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -3368,3 +3368,112 @@ func TestClientRejectsNRGSubjects(t *testing.T) { require_True(t, strings.HasPrefix(err.Error(), "nats: permissions violation")) }) } + +func TestClientConfigureWriteTimeoutPolicy(t *testing.T) { + for name, policy := range map[string]WriteTimeoutPolicy{ + "Default": WriteTimeoutPolicyDefault, + "Retry": WriteTimeoutPolicyRetry, + "Close": WriteTimeoutPolicyClose, + } { + t.Run(name, func(t *testing.T) { + opts := DefaultOptions() + opts.WriteTimeout = policy + s := RunServer(opts) + defer s.Shutdown() + + nc := natsConnect(t, fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)) + defer nc.Close() + + s.mu.RLock() + defer s.mu.RUnlock() + + for _, r := range s.clients { + if policy == WriteTimeoutPolicyDefault { + require_Equal(t, r.out.wtp, WriteTimeoutPolicyClose) + } else { + require_Equal(t, r.out.wtp, policy) + } + } + }) + } +} + +// TestClientFlushOutboundWriteTimeoutPolicy relies on specifically having +// written at least one byte in order to not trip the "written == 0" close +// condition, so just setting an unrealistically low write deadline won't +// work. Instead what we'll do is write the first byte very quickly and then +// slow down, so that we can trip a more honest slow consumer condition. +type writeTimeoutPolicyWriter struct { + net.Conn + deadline time.Time + written int +} + +func (w *writeTimeoutPolicyWriter) SetWriteDeadline(deadline time.Time) error { + w.deadline = deadline + return w.Conn.SetWriteDeadline(deadline) +} + +func (w *writeTimeoutPolicyWriter) Write(b []byte) (int, error) { + if w.written == 0 { + w.written++ + return w.Conn.Write(b[:1]) + } + time.Sleep(time.Until(w.deadline) + 10*time.Millisecond) + return w.Conn.Write(b) +} + +func TestClientFlushOutboundWriteTimeoutPolicy(t *testing.T) { + for name, policy := range map[string]WriteTimeoutPolicy{ + "Retry": WriteTimeoutPolicyRetry, + "Close": WriteTimeoutPolicyClose, + } { + t.Run(name, func(t *testing.T) { + opts := DefaultOptions() + opts.PingInterval = 250 * time.Millisecond + opts.WriteDeadline = 100 * time.Millisecond + opts.WriteTimeout = policy + s := RunServer(opts) + defer s.Shutdown() + + nc1 := natsConnect(t, fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)) + defer nc1.Close() + + _, err := nc1.Subscribe("test", func(_ *nats.Msg) {}) + require_NoError(t, err) + + nc2 := natsConnect(t, fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)) + defer nc2.Close() + + cid, err := nc1.GetClientID() + require_NoError(t, err) + + client := s.getClient(cid) + client.mu.Lock() + client.out.wdl = 100 * time.Millisecond + client.nc = &writeTimeoutPolicyWriter{Conn: client.nc} + client.mu.Unlock() + + require_NoError(t, nc2.Publish("test", make([]byte, 1024*1024))) + + checkFor(t, 5*time.Second, 10*time.Millisecond, func() error { + client.mu.Lock() + defer client.mu.Unlock() + switch { + case !client.flags.isSet(connMarkedClosed): + return fmt.Errorf("connection not closed yet") + case policy == WriteTimeoutPolicyRetry && client.flags.isSet(isSlowConsumer): + // Retry policy should have marked the client as a slow consumer and + // continued to retry flushes. 
+ return nil + case policy == WriteTimeoutPolicyClose && !client.flags.isSet(isSlowConsumer): + // Close policy shouldn't have marked the client as a slow consumer, + // it will just close it instead. + return nil + default: + return fmt.Errorf("client not in correct state yet") + } + }) + }) + } +} diff --git a/server/events_test.go b/server/events_test.go index c7a18e84cef..f8663d95d2b 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -1247,7 +1247,7 @@ func TestAccountReqMonitoring(t *testing.T) { s.EnableJetStream(&JetStreamConfig{StoreDir: t.TempDir()}) unusedAcc, _ := createAccount(s) acc, akp := createAccount(s) - acc.EnableJetStream(nil) + acc.EnableJetStream(nil, nil) subsz := fmt.Sprintf(accDirectReqSubj, acc.Name, "SUBSZ") connz := fmt.Sprintf(accDirectReqSubj, acc.Name, "CONNZ") jsz := fmt.Sprintf(accDirectReqSubj, acc.Name, "JSZ") diff --git a/server/filestore.go b/server/filestore.go index 3ac37f28f4f..d6c5c79835a 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -4302,11 +4302,11 @@ func (mb *msgBlock) skipMsg(seq uint64, now int64) { needsRecord = true mb.dmap.Insert(seq) } - mb.mu.Unlock() - if needsRecord { - mb.writeMsgRecord(emptyRecordLen, seq|ebit, _EMPTY_, nil, nil, now, true) - } else { + mb.writeMsgRecordLocked(emptyRecordLen, seq|ebit, _EMPTY_, nil, nil, now, true, true) + } + mb.mu.Unlock() + if !needsRecord { mb.kickFlusher() } } @@ -4394,10 +4394,9 @@ func (fs *fileStore) SkipMsgs(seq uint64, num uint64) error { mb.dmap.Insert(seq) } } - mb.mu.Unlock() - // Write out our placeholder. - mb.writeMsgRecord(emptyRecordLen, lseq|ebit, _EMPTY_, nil, nil, now, true) + mb.writeMsgRecordLocked(emptyRecordLen, lseq|ebit, _EMPTY_, nil, nil, now, true, true) + mb.mu.Unlock() // Now update FS accounting. // Update fs state. @@ -6806,8 +6805,8 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { // earlier loop if we've ran out of block file to look at, but should // be easily noticed because the seq will be below the last seq from // the index. - if seq > 0 && seq < mbLastSeq { - for dseq := seq; dseq < mbLastSeq; dseq++ { + if seq > 0 && seq+1 <= mbLastSeq { + for dseq := seq + 1; dseq <= mbLastSeq; dseq++ { idx = append(idx, dbit) if dms == 0 { mb.dmap.Insert(dseq) @@ -9541,12 +9540,17 @@ func timestampNormalized(t time.Time) int64 { // writeFullState will proceed to write the full meta state iff not complex and time consuming. // Since this is for quick recovery it is optional and should not block/stall normal operations. func (fs *fileStore) writeFullState() error { - return fs._writeFullState(false) + return fs._writeFullState(false, true) } -// forceWriteFullState will proceed to write the full meta state. This should only be called by stop() +// forceWriteFullState will proceed to write the full meta state. func (fs *fileStore) forceWriteFullState() error { - return fs._writeFullState(true) + return fs._writeFullState(true, true) +} + +// forceWriteFullStateLocked will proceed to write the full meta state. This should only be called by stop() +func (fs *fileStore) forceWriteFullStateLocked() error { + return fs._writeFullState(true, false) } // This will write the full binary state for the stream. @@ -9556,11 +9560,22 @@ func (fs *fileStore) forceWriteFullState() error { // 2. PSIM - Per Subject Index Map - Tracks first and last blocks with subjects present. // 3. MBs - Index, Bytes, First and Last Sequence and Timestamps, and the deleted map (avl.seqset). // 4. Last block index and hash of record inclusive to this stream state. 
-func (fs *fileStore) _writeFullState(force bool) error { +func (fs *fileStore) _writeFullState(force, needLock bool) error { + fsLock := func() { + if needLock { + fs.mu.Lock() + } + } + fsUnlock := func() { + if needLock { + fs.mu.Unlock() + } + } + start := time.Now() - fs.mu.Lock() + fsLock() if fs.closed || fs.dirty == 0 { - fs.mu.Unlock() + fsUnlock() return nil } @@ -9579,7 +9594,7 @@ func (fs *fileStore) _writeFullState(force bool) error { numDeleted = int((fs.state.LastSeq - fs.state.FirstSeq + 1) - fs.state.Msgs) } if numSubjects > numThreshold || numDeleted > numThreshold { - fs.mu.Unlock() + fsUnlock() return errStateTooBig } } @@ -9686,13 +9701,15 @@ func (fs *fileStore) _writeFullState(force bool) error { // Encrypt if needed. if fs.prf != nil { if err := fs.setupAEK(); err != nil { - fs.mu.Unlock() + fsUnlock() return err } nonce := make([]byte, fs.aek.NonceSize(), fs.aek.NonceSize()+len(buf)+fs.aek.Overhead()) if n, err := rand.Read(nonce); err != nil { + fsUnlock() return err } else if n != len(nonce) { + fsUnlock() return fmt.Errorf("not enough nonce bytes read (%d != %d)", n, len(nonce)) } buf = fs.aek.Seal(nonce, nonce, buf, nil) @@ -9709,13 +9726,17 @@ func (fs *fileStore) _writeFullState(force bool) error { statesEqual := trackingStatesEqual(&fs.state, &mstate) // Release lock. - fs.mu.Unlock() + fsUnlock() // Check consistency here. if !statesEqual { fs.warn("Stream state encountered internal inconsistency on write") // Rebuild our fs state from the mb state. - fs.rebuildState(nil) + if needLock { + fs.rebuildState(nil) + } else { + fs.rebuildStateLocked(nil) + } return errCorruptState } @@ -9740,24 +9761,30 @@ func (fs *fileStore) _writeFullState(force bool) error { // Update dirty if successful. if err == nil { - fs.mu.Lock() + fsLock() fs.dirty -= priorDirty - fs.mu.Unlock() + fsUnlock() } - return fs.writeTTLState() + return fs.writeTTLState(needLock) } -func (fs *fileStore) writeTTLState() error { +func (fs *fileStore) writeTTLState(needLock bool) error { + if needLock { + fs.mu.RLock() + } if fs.ttls == nil { + if needLock { + fs.mu.RUnlock() + } return nil } - - fs.mu.RLock() fn := filepath.Join(fs.fcfg.StoreDir, msgDir, ttlStreamStateFile) // Must be lseq+1 to identify up to which sequence the TTLs are valid. buf := fs.ttls.Encode(fs.state.LastSeq + 1) - fs.mu.RUnlock() + if needLock { + fs.mu.RUnlock() + } return fs.writeFileWithOptionalSync(fn, buf, defaultFilePerms) } @@ -9775,18 +9802,10 @@ func (fs *fileStore) stop(delete, writeState bool) error { return ErrStoreClosed } - // Mark as closing. Do before releasing the lock to writeFullState + // Mark as closing. Do before releasing the lock to wait on the state flush loop // so we don't end up with this function running more than once. fs.closing = true - if writeState { - fs.checkAndFlushAllBlocks() - } - fs.closeAllMsgBlocks(false) - - fs.cancelSyncTimer() - fs.cancelAgeChk() - // Release the state flusher loop. if fs.qch != nil { close(fs.qch) @@ -9798,10 +9817,17 @@ func (fs *fileStore) stop(delete, writeState bool) error { fsld := fs.fsld fs.mu.Unlock() <-fsld - // Write full state if needed. If not dirty this is a no-op. - fs.forceWriteFullState() fs.mu.Lock() } + fs.closeAllMsgBlocks(false) + + fs.cancelSyncTimer() + fs.cancelAgeChk() + + if writeState { + // Write full state if needed. If not dirty this is a no-op. + fs.forceWriteFullStateLocked() + } // Mark as closed. Last message block needs to be cleared after // writeFullState has completed. 
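The gateway and leaf node tests that follow exercise the new per-connection-type write deadline and write timeout policy options added in client.go above. A minimal sketch of how the options combine, assuming only the fields visible in this change (values are illustrative; `RunServer` is the test helper used throughout these tests):

```go
// Sketch only: per-connection-type write deadline and timeout policy.
// Field names are taken from this change; values are arbitrary.
opts := DefaultOptions()
opts.WriteDeadline = 10 * time.Second       // deadline for client connections
opts.WriteTimeout = WriteTimeoutPolicyClose // clients already default to close on timeout

// Per-connection-type overrides; routes, gateways and leaf nodes
// default to WriteTimeoutPolicyRetry when left as Default.
opts.Cluster.WriteDeadline = 5 * time.Second
opts.Cluster.WriteTimeout = WriteTimeoutPolicyRetry
opts.Gateway.WriteTimeout = WriteTimeoutPolicyRetry
opts.LeafNode.WriteTimeout = WriteTimeoutPolicyRetry

s := RunServer(opts)
defer s.Shutdown()
```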
diff --git a/server/gateway_test.go b/server/gateway_test.go index ae7aae1ad06..d1009a8a228 100644 --- a/server/gateway_test.go +++ b/server/gateway_test.go @@ -7471,3 +7471,83 @@ func TestGatewayOutboundDetectsStaleConnectionIfNoInfo(t *testing.T) { wg.Wait() s.WaitForShutdown() } + +func TestGatewayConfigureWriteDeadline(t *testing.T) { + o1 := testDefaultOptionsForGateway("B") + o1.Gateway.WriteDeadline = 5 * time.Second + s1 := runGatewayServer(o1) + defer s1.Shutdown() + + o2 := testGatewayOptionsFromToWithServers(t, "A", "B", s1) + s2 := runGatewayServer(o2) + defer s2.Shutdown() + + waitForOutboundGateways(t, s2, 1, time.Second) + waitForInboundGateways(t, s1, 1, time.Second) + waitForOutboundGateways(t, s1, 1, time.Second) + + s1.mu.RLock() + defer s1.mu.RUnlock() + + s1.gateway.RLock() + defer s1.gateway.RUnlock() + + for _, r := range s1.gateway.out { + r.mu.Lock() + wdl := r.out.wdl + r.mu.Unlock() + require_Equal(t, wdl, 5*time.Second) + } + + for _, r := range s1.gateway.in { + r.mu.Lock() + wdl := r.out.wdl + r.mu.Unlock() + require_Equal(t, wdl, 5*time.Second) + } +} + +func TestGatewayConfigureWriteTimeoutPolicy(t *testing.T) { + for name, policy := range map[string]WriteTimeoutPolicy{ + "Default": WriteTimeoutPolicyDefault, + "Retry": WriteTimeoutPolicyRetry, + "Close": WriteTimeoutPolicyClose, + } { + t.Run(name, func(t *testing.T) { + o1 := testDefaultOptionsForGateway("B") + o1.Gateway.WriteTimeout = policy + s1 := runGatewayServer(o1) + defer s1.Shutdown() + + o2 := testGatewayOptionsFromToWithServers(t, "A", "B", s1) + s2 := runGatewayServer(o2) + defer s2.Shutdown() + + waitForOutboundGateways(t, s2, 1, time.Second) + waitForInboundGateways(t, s1, 1, time.Second) + waitForOutboundGateways(t, s1, 1, time.Second) + + s1.mu.RLock() + defer s1.mu.RUnlock() + + s1.gateway.RLock() + defer s1.gateway.RUnlock() + + for _, r := range s1.gateway.out { + if policy == WriteTimeoutPolicyDefault { + require_Equal(t, r.out.wtp, WriteTimeoutPolicyRetry) + } else { + require_Equal(t, r.out.wtp, policy) + } + } + + for _, r := range s1.gateway.in { + if policy == WriteTimeoutPolicyDefault { + require_Equal(t, r.out.wtp, WriteTimeoutPolicyRetry) + } else { + require_Equal(t, r.out.wtp, policy) + } + } + }) + } +} diff --git a/server/jetstream.go b/server/jetstream.go index d676f64022e..fa3c2a64787 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -194,6 +194,11 @@ func (s *Server) EnableJetStream(config *JetStreamConfig) error { } s.Noticef("Starting JetStream") + start := time.Now() + defer func() { + s.Noticef("Took %s to start JetStream", time.Since(start)) + }() + if config == nil || config.MaxMemory <= 0 || config.MaxStore <= 0 { var storeDir, domain, uniqueTag string var maxStore, maxMem int64 @@ -686,6 +691,11 @@ func (s *Server) DisableJetStream() error { } func (s *Server) enableJetStreamAccounts() error { + // Reuse the same task workers across all accounts, so that we don't explode + // with a large number of goroutines on multi-account systems. + tq := parallelTaskQueue(0) + defer close(tq) + // If we have no configured accounts setup then setup imports on global account. 
if s.globalAccountOnly() { gacc := s.GlobalAccount() @@ -694,10 +704,10 @@ func (s *Server) enableJetStreamAccounts() error { gacc.jsLimits = defaultJSAccountTiers } gacc.mu.Unlock() - if err := s.configJetStream(gacc); err != nil { + if err := s.configJetStream(gacc, tq); err != nil { return err } - } else if err := s.configAllJetStreamAccounts(); err != nil { + } else if err := s.configAllJetStreamAccounts(tq); err != nil { return fmt.Errorf("Error enabling jetstream on configured accounts: %v", err) } return nil @@ -761,7 +771,7 @@ func (a *Account) enableJetStreamInfoServiceImportOnly() error { return a.enableAllJetStreamServiceImportsAndMappings() } -func (s *Server) configJetStream(acc *Account) error { +func (s *Server) configJetStream(acc *Account, tq chan<- func()) error { if acc == nil { return nil } @@ -778,7 +788,7 @@ func (s *Server) configJetStream(acc *Account) error { return err } } else { - if err := acc.EnableJetStream(jsLimits); err != nil { + if err := acc.EnableJetStream(jsLimits, tq); err != nil { return err } if s.gateway.enabled { @@ -799,7 +809,7 @@ func (s *Server) configJetStream(acc *Account) error { } // configAllJetStreamAccounts walk all configured accounts and turn on jetstream if requested. -func (s *Server) configAllJetStreamAccounts() error { +func (s *Server) configAllJetStreamAccounts(tq chan<- func()) error { // Check to see if system account has been enabled. We could arrive here via reload and // a non-default system account. s.checkJetStreamExports() @@ -839,7 +849,7 @@ func (s *Server) configAllJetStreamAccounts() error { // Process any jetstream enabled accounts here. These will be accounts we are // already aware of at startup etc. for _, acc := range jsAccounts { - if err := s.configJetStream(acc); err != nil { + if err := s.configJetStream(acc, tq); err != nil { return err } } @@ -852,7 +862,7 @@ func (s *Server) configAllJetStreamAccounts() error { // Only load up ones not already loaded since they are processed above. if _, ok := accounts.Load(accName); !ok { if acc, err := s.lookupAccount(accName); err != nil && acc != nil { - if err := s.configJetStream(acc); err != nil { + if err := s.configJetStream(acc, tq); err != nil { return err } } @@ -1100,7 +1110,7 @@ func (a *Account) assignJetStreamLimits(limits map[string]JetStreamAccountLimits // EnableJetStream will enable JetStream on this account with the defined limits. // This is a helper for JetStreamEnableAccount. -func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) error { +func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits, tq chan<- func()) error { a.mu.RLock() s := a.srv a.mu.RUnlock() @@ -1253,30 +1263,139 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro } } - // Collect consumers, do after all streams. - type ce struct { - mset *stream - odir string - } - var consumers []*ce - - // Collect any interest policy streams to check for - // https://github.com/nats-io/nats-server/issues/3612 - var ipstreams []*stream - // Remember if we should be encrypted and what cipher we think we should use. 
encrypted := s.getOpts().JetStreamKey != _EMPTY_ - plaintext := true sc := s.getOpts().JetStreamCipher + doConsumers := func(mset *stream, odir string) { + ofis, _ := os.ReadDir(odir) + if len(ofis) > 0 { + s.Noticef(" Recovering %d consumers for stream - '%s > %s'", len(ofis), mset.accName(), mset.name()) + } + for _, ofi := range ofis { + metafile := filepath.Join(odir, ofi.Name(), JetStreamMetaFile) + metasum := filepath.Join(odir, ofi.Name(), JetStreamMetaFileSum) + if _, err := os.Stat(metafile); os.IsNotExist(err) { + s.Warnf(" Missing consumer metafile %q", metafile) + continue + } + buf, err := os.ReadFile(metafile) + if err != nil { + s.Warnf(" Error reading consumer metafile %q: %v", metafile, err) + continue + } + if _, err := os.Stat(metasum); os.IsNotExist(err) { + s.Warnf(" Missing consumer checksum for %q", metasum) + continue + } + + // Check if we are encrypted. + if key, err := os.ReadFile(filepath.Join(odir, ofi.Name(), JetStreamMetaFileKey)); err == nil { + s.Debugf(" Consumer metafile is encrypted, reading encrypted keyfile") + // Decode the buffer before proceeding. + ctxName := mset.name() + tsep + ofi.Name() + nbuf, _, err := s.decryptMeta(sc, key, buf, a.Name, ctxName) + if err != nil { + s.Warnf(" Error decrypting our consumer metafile: %v", err) + continue + } + buf = nbuf + } + + var cfg FileConsumerInfo + decoder := json.NewDecoder(bytes.NewReader(buf)) + decoder.DisallowUnknownFields() + strictErr := decoder.Decode(&cfg) + if strictErr != nil { + cfg = FileConsumerInfo{} + if err := json.Unmarshal(buf, &cfg); err != nil { + s.Warnf(" Error unmarshalling consumer metafile %q: %v", metafile, err) + continue + } + } + if supported := supportsRequiredApiLevel(cfg.Metadata); !supported || strictErr != nil { + var offlineReason string + if !supported { + apiLevel := getRequiredApiLevel(cfg.Metadata) + if strictErr != nil { + offlineReason = fmt.Sprintf("unsupported - config error: %s", strings.TrimPrefix(strictErr.Error(), "json: ")) + } else { + offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel) + } + s.Warnf(" Detected unsupported consumer '%s > %s > %s': %s", a.Name, mset.name(), cfg.Name, offlineReason) + } else { + offlineReason = fmt.Sprintf("decoding error: %v", strictErr) + s.Warnf(" Error unmarshalling consumer metafile %q: %v", metafile, strictErr) + } + singleServerMode := !s.JetStreamIsClustered() && s.standAloneMode() + if singleServerMode { + if !mset.closed.Load() { + s.Warnf(" Stopping unsupported stream '%s > %s'", a.Name, mset.name()) + mset.mu.Lock() + mset.offlineReason = fmt.Sprintf("stopped - unsupported consumer %q", cfg.Name) + mset.mu.Unlock() + mset.stop(false, false) + } + + // Fake a consumer, so we can respond to API requests as single-server. + o := &consumer{ + mset: mset, + js: s.getJetStream(), + acc: a, + srv: s, + cfg: cfg.ConsumerConfig, + active: false, + stream: mset.name(), + name: cfg.Name, + dseq: 1, + sseq: 1, + created: time.Now().UTC(), + closed: true, + offlineReason: offlineReason, + } + if !cfg.Created.IsZero() { + o.created = cfg.Created + } + + mset.mu.Lock() + mset.setConsumer(o) + mset.mu.Unlock() + } + continue + } + + isEphemeral := !isDurableConsumer(&cfg.ConsumerConfig) + if isEphemeral { + // This is an ephemeral consumer and this could fail on restart until + // the consumer can reconnect. We will create it as a durable and switch it. 
+ cfg.ConsumerConfig.Durable = ofi.Name() + } + obs, err := mset.addConsumerWithAssignment(&cfg.ConsumerConfig, _EMPTY_, nil, true, ActionCreateOrUpdate, false) + if err != nil { + s.Warnf(" Error adding consumer %q: %v", cfg.Name, err) + continue + } + if isEphemeral { + obs.switchToEphemeral() + } + if !cfg.Created.IsZero() { + obs.setCreatedTime(cfg.Created) + } + if err != nil { + s.Warnf(" Error restoring consumer %q state: %v", cfg.Name, err) + } + } + } + // Now recover the streams. fis, _ := os.ReadDir(sdir) - for _, fi := range fis { + doStream := func(fi os.DirEntry) error { + plaintext := true mdir := filepath.Join(sdir, fi.Name()) // Check for partially deleted streams. They are marked with "." prefix. if strings.HasPrefix(fi.Name(), tsep) { go os.RemoveAll(mdir) - continue + return nil } key := sha256.Sum256([]byte(fi.Name())) hh, err := highwayhash.New64(key[:]) @@ -1287,27 +1406,27 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro metasum := filepath.Join(mdir, JetStreamMetaFileSum) if _, err := os.Stat(metafile); os.IsNotExist(err) { s.Warnf(" Missing stream metafile for %q", metafile) - continue + return nil } buf, err := os.ReadFile(metafile) if err != nil { s.Warnf(" Error reading metafile %q: %v", metafile, err) - continue + return nil } if _, err := os.Stat(metasum); os.IsNotExist(err) { s.Warnf(" Missing stream checksum file %q", metasum) - continue + return nil } sum, err := os.ReadFile(metasum) if err != nil { s.Warnf(" Error reading Stream metafile checksum %q: %v", metasum, err) - continue + return nil } hh.Write(buf) checksum := hex.EncodeToString(hh.Sum(nil)) if checksum != string(sum) { s.Warnf(" Stream metafile %q: checksums do not match %q vs %q", metafile, sum, checksum) - continue + return nil } // Track if we are converting ciphers. @@ -1320,14 +1439,14 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro s.Debugf(" Stream metafile is encrypted, reading encrypted keyfile") if len(keyBuf) < minMetaKeySize { s.Warnf(" Bad stream encryption key length of %d", len(keyBuf)) - continue + return nil } // Decode the buffer before proceeding. 
var nbuf []byte nbuf, convertingCiphers, err = s.decryptMeta(sc, keyBuf, buf, a.Name, fi.Name()) if err != nil { s.Warnf(" Error decrypting our stream metafile: %v", err) - continue + return nil } buf = nbuf plaintext = false @@ -1341,15 +1460,19 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro cfg = FileStreamInfo{} if err := json.Unmarshal(buf, &cfg); err != nil { s.Warnf(" Error unmarshalling stream metafile %q: %v", metafile, err) - continue + return nil } } if supported := supportsRequiredApiLevel(cfg.Metadata); !supported || strictErr != nil { var offlineReason string if !supported { apiLevel := getRequiredApiLevel(cfg.Metadata) - offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel) - s.Warnf(" Detected unsupported stream '%s > %s', delete the stream or upgrade the server to API level %s", a.Name, cfg.StreamConfig.Name, apiLevel) + if strictErr != nil { + offlineReason = fmt.Sprintf("unsupported - config error: %s", strings.TrimPrefix(strictErr.Error(), "json: ")) + } else { + offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel) + } + s.Warnf(" Detected unsupported stream '%s > %s': %s", a.Name, cfg.StreamConfig.Name, offlineReason) } else { offlineReason = fmt.Sprintf("decoding error: %v", strictErr) s.Warnf(" Error unmarshalling stream metafile %q: %v", metafile, strictErr) @@ -1380,13 +1503,16 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro // Now do the consumers. odir := filepath.Join(sdir, fi.Name(), consumerDir) - consumers = append(consumers, &ce{mset, odir}) + doConsumers(mset, odir) } - continue + return nil } if cfg.Template != _EMPTY_ { - if err := jsa.addStreamNameToTemplate(cfg.Template, cfg.Name); err != nil { + jsa.mu.Lock() + err := jsa.addStreamNameToTemplate(cfg.Template, cfg.Name) + jsa.mu.Unlock() + if err != nil { s.Warnf(" Error adding stream %q to template %q: %v", cfg.Name, cfg.Template, err) } } @@ -1411,7 +1537,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro } } if hadSubjErr { - continue + return nil } // The other possible bug is assigning subjects to mirrors, so check for that and patch as well. @@ -1445,7 +1571,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro s.Warnf(" Error replacing meta keyfile for stream %q: %v", cfg.Name, err) } } - continue + return nil } if !cfg.Created.IsZero() { mset.setCreatedTime(cfg.Created) @@ -1455,142 +1581,41 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro s.Noticef(" Restored %s messages for stream '%s > %s' in %v", comma(int64(state.Msgs)), mset.accName(), mset.name(), time.Since(rt).Round(time.Millisecond)) + // Now do the consumers. + odir := filepath.Join(sdir, fi.Name(), consumerDir) + doConsumers(mset, odir) + // Collect to check for dangling messages. // TODO(dlc) - Can be removed eventually. if cfg.StreamConfig.Retention == InterestPolicy { - ipstreams = append(ipstreams, mset) + mset.checkForOrphanMsgs() + mset.checkConsumerReplication() } - // Now do the consumers. 
- odir := filepath.Join(sdir, fi.Name(), consumerDir) - consumers = append(consumers, &ce{mset, odir}) + return nil } - for _, e := range consumers { - ofis, _ := os.ReadDir(e.odir) - if len(ofis) > 0 { - s.Noticef(" Recovering %d consumers for stream - '%s > %s'", len(ofis), e.mset.accName(), e.mset.name()) - } - for _, ofi := range ofis { - metafile := filepath.Join(e.odir, ofi.Name(), JetStreamMetaFile) - metasum := filepath.Join(e.odir, ofi.Name(), JetStreamMetaFileSum) - if _, err := os.Stat(metafile); os.IsNotExist(err) { - s.Warnf(" Missing consumer metafile %q", metafile) - continue - } - buf, err := os.ReadFile(metafile) - if err != nil { - s.Warnf(" Error reading consumer metafile %q: %v", metafile, err) - continue - } - if _, err := os.Stat(metasum); os.IsNotExist(err) { - s.Warnf(" Missing consumer checksum for %q", metasum) - continue - } - - // Check if we are encrypted. - if key, err := os.ReadFile(filepath.Join(e.odir, ofi.Name(), JetStreamMetaFileKey)); err == nil { - s.Debugf(" Consumer metafile is encrypted, reading encrypted keyfile") - // Decode the buffer before proceeding. - ctxName := e.mset.name() + tsep + ofi.Name() - nbuf, _, err := s.decryptMeta(sc, key, buf, a.Name, ctxName) - if err != nil { - s.Warnf(" Error decrypting our consumer metafile: %v", err) - continue - } - buf = nbuf - } - - var cfg FileConsumerInfo - decoder := json.NewDecoder(bytes.NewReader(buf)) - decoder.DisallowUnknownFields() - strictErr := decoder.Decode(&cfg) - if strictErr != nil { - cfg = FileConsumerInfo{} - if err := json.Unmarshal(buf, &cfg); err != nil { - s.Warnf(" Error unmarshalling consumer metafile %q: %v", metafile, err) - continue - } - } - if supported := supportsRequiredApiLevel(cfg.Metadata); !supported || strictErr != nil { - var offlineReason string - if !supported { - apiLevel := getRequiredApiLevel(cfg.Metadata) - offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel) - s.Warnf(" Detected unsupported consumer '%s > %s > %s', delete the consumer or upgrade the server to API level %s", a.Name, e.mset.name(), cfg.Name, apiLevel) - } else { - offlineReason = fmt.Sprintf("decoding error: %v", strictErr) - s.Warnf(" Error unmarshalling consumer metafile %q: %v", metafile, strictErr) - } - singleServerMode := !s.JetStreamIsClustered() && s.standAloneMode() - if singleServerMode { - if !e.mset.closed.Load() { - s.Warnf(" Stopping unsupported stream '%s > %s'", a.Name, e.mset.name()) - e.mset.mu.Lock() - e.mset.offlineReason = "stopped" - e.mset.mu.Unlock() - e.mset.stop(false, false) - } - - // Fake a consumer, so we can respond to API requests as single-server. - o := &consumer{ - mset: e.mset, - js: s.getJetStream(), - acc: a, - srv: s, - cfg: cfg.ConsumerConfig, - active: false, - stream: e.mset.name(), - name: cfg.Name, - dseq: 1, - sseq: 1, - created: time.Now().UTC(), - closed: true, - offlineReason: offlineReason, - } - if !cfg.Created.IsZero() { - o.created = cfg.Created - } - - e.mset.mu.Lock() - e.mset.setConsumer(o) - e.mset.mu.Unlock() - } - continue - } - - isEphemeral := !isDurableConsumer(&cfg.ConsumerConfig) - if isEphemeral { - // This is an ephemeral consumer and this could fail on restart until - // the consumer can reconnect. We will create it as a durable and switch it. 
- cfg.ConsumerConfig.Durable = ofi.Name() - } - obs, err := e.mset.addConsumerWithAssignment(&cfg.ConsumerConfig, _EMPTY_, nil, true, ActionCreateOrUpdate, false) - if err != nil { - s.Warnf(" Error adding consumer %q: %v", cfg.Name, err) - continue - } - if isEphemeral { - obs.switchToEphemeral() - } - if !cfg.Created.IsZero() { - obs.setCreatedTime(cfg.Created) - } - if err != nil { - s.Warnf(" Error restoring consumer %q state: %v", cfg.Name, err) + if tq != nil { + // If a parallelTaskQueue was provided then use that for concurrency. + var wg sync.WaitGroup + wg.Add(len(fis)) + for _, fi := range fis { + tq <- func() { + doStream(fi) + wg.Done() } } + wg.Wait() + } else { + // No parallelTaskQueue provided, do inline as before. + for _, fi := range fis { + doStream(fi) + } } // Make sure to cleanup any old remaining snapshots. os.RemoveAll(filepath.Join(jsa.storeDir, snapsDir)) - // Check interest policy streams for auto cleanup. - for _, mset := range ipstreams { - mset.checkForOrphanMsgs() - mset.checkConsumerReplication() - } - s.Debugf("JetStream state for account %q recovered", a.Name) return nil diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 64c492965b8..1467b7cc508 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -158,9 +158,15 @@ type unsupportedStreamAssignment struct { infoSub *subscription } -func newUnsupportedStreamAssignment(s *Server, sa *streamAssignment) *unsupportedStreamAssignment { +func newUnsupportedStreamAssignment(s *Server, sa *streamAssignment, err error) *unsupportedStreamAssignment { reason := "stopped" - if sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata) { + if err != nil { + if errstr := err.Error(); strings.HasPrefix(errstr, "json:") { + reason = fmt.Sprintf("unsupported - config error: %s", strings.TrimPrefix(err.Error(), "json: ")) + } else { + reason = fmt.Sprintf("stopped - %s", errstr) + } + } else if sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata) { if req := getRequiredApiLevel(sa.Config.Metadata); req != _EMPTY_ { reason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", req, JSApiLevel) } @@ -236,9 +242,15 @@ type unsupportedConsumerAssignment struct { infoSub *subscription } -func newUnsupportedConsumerAssignment(ca *consumerAssignment) *unsupportedConsumerAssignment { +func newUnsupportedConsumerAssignment(ca *consumerAssignment, err error) *unsupportedConsumerAssignment { reason := "stopped" - if ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata) { + if err != nil { + if errstr := err.Error(); strings.HasPrefix(errstr, "json:") { + reason = fmt.Sprintf("unsupported - config error: %s", strings.TrimPrefix(err.Error(), "json: ")) + } else { + reason = fmt.Sprintf("stopped - %s", errstr) + } + } else if ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata) { if req := getRequiredApiLevel(ca.Config.Metadata); req != _EMPTY_ { reason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", getRequiredApiLevel(ca.Config.Metadata), JSApiLevel) } @@ -1204,6 +1216,52 @@ type recoveryUpdates struct { updateConsumers map[string]map[string]*consumerAssignment } +func (ru *recoveryUpdates) removeStream(sa *streamAssignment) { + key := sa.recoveryKey() + ru.removeStreams[key] = sa + delete(ru.addStreams, key) + delete(ru.updateStreams, key) + delete(ru.updateConsumers, key) + delete(ru.removeConsumers, key) +} + +func (ru *recoveryUpdates) addStream(sa *streamAssignment) { + key := 
sa.recoveryKey() + ru.addStreams[key] = sa + delete(ru.removeStreams, key) +} + +func (ru *recoveryUpdates) updateStream(sa *streamAssignment) { + key := sa.recoveryKey() + ru.updateStreams[key] = sa + delete(ru.addStreams, key) + delete(ru.removeStreams, key) +} + +func (ru *recoveryUpdates) removeConsumer(ca *consumerAssignment) { + key := ca.recoveryKey() + skey := ca.streamRecoveryKey() + if _, ok := ru.removeConsumers[skey]; !ok { + ru.removeConsumers[skey] = map[string]*consumerAssignment{} + } + ru.removeConsumers[skey][key] = ca + if consumers, ok := ru.updateConsumers[skey]; ok { + delete(consumers, key) + } +} + +func (ru *recoveryUpdates) addOrUpdateConsumer(ca *consumerAssignment) { + key := ca.recoveryKey() + skey := ca.streamRecoveryKey() + if consumers, ok := ru.removeConsumers[skey]; ok { + delete(consumers, key) + } + if _, ok := ru.updateConsumers[skey]; !ok { + ru.updateConsumers[skey] = map[string]*consumerAssignment{} + } + ru.updateConsumers[skey][key] = ca +} + // Called after recovery of the cluster on startup to check for any orphans. // Streams and consumers are recovered from disk, and the meta layer's mappings // should clean them up, but under crash scenarios there could be orphans. @@ -1698,12 +1756,7 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove for _, sa := range saDel { js.setStreamAssignmentRecovering(sa) if isRecovering { - key := sa.recoveryKey() - ru.removeStreams[key] = sa - delete(ru.addStreams, key) - delete(ru.updateStreams, key) - delete(ru.updateConsumers, key) - delete(ru.removeConsumers, key) + ru.removeStream(sa) } else { js.processStreamRemoval(sa) } @@ -1711,12 +1764,20 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove // Now do add for the streams. Also add in all consumers. for _, sa := range saAdd { js.setStreamAssignmentRecovering(sa) - js.processStreamAssignment(sa) + if isRecovering { + ru.addStream(sa) + } else { + js.processStreamAssignment(sa) + } // We can simply process the consumers. 
for _, ca := range sa.consumers { js.setConsumerAssignmentRecovering(ca) - js.processConsumerAssignment(ca) + if isRecovering { + ru.addOrUpdateConsumer(ca) + } else { + js.processConsumerAssignment(ca) + } } } @@ -1725,10 +1786,7 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove for _, sa := range saChk { js.setStreamAssignmentRecovering(sa) if isRecovering { - key := sa.recoveryKey() - ru.updateStreams[key] = sa - delete(ru.addStreams, key) - delete(ru.removeStreams, key) + ru.updateStream(sa) } else { js.processUpdateStreamAssignment(sa) } @@ -1738,15 +1796,7 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove for _, ca := range caDel { js.setConsumerAssignmentRecovering(ca) if isRecovering { - key := ca.recoveryKey() - skey := ca.streamRecoveryKey() - if _, ok := ru.removeConsumers[skey]; !ok { - ru.removeConsumers[skey] = map[string]*consumerAssignment{} - } - ru.removeConsumers[skey][key] = ca - if consumers, ok := ru.updateConsumers[skey]; ok { - delete(consumers, key) - } + ru.removeConsumer(ca) } else { js.processConsumerRemoval(ca) } @@ -1754,15 +1804,7 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove for _, ca := range caAdd { js.setConsumerAssignmentRecovering(ca) if isRecovering { - key := ca.recoveryKey() - skey := ca.streamRecoveryKey() - if consumers, ok := ru.removeConsumers[skey]; ok { - delete(consumers, key) - } - if _, ok := ru.updateConsumers[skey]; !ok { - ru.updateConsumers[skey] = map[string]*consumerAssignment{} - } - ru.updateConsumers[skey][key] = ca + ru.addOrUpdateConsumer(ca) } else { js.processConsumerAssignment(ca) } @@ -2028,9 +2070,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo } if isRecovering { js.setStreamAssignmentRecovering(sa) - key := sa.recoveryKey() - ru.addStreams[key] = sa - delete(ru.removeStreams, key) + ru.addStream(sa) } else { js.processStreamAssignment(sa) } @@ -2042,12 +2082,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo } if isRecovering { js.setStreamAssignmentRecovering(sa) - key := sa.recoveryKey() - ru.removeStreams[key] = sa - delete(ru.addStreams, key) - delete(ru.updateStreams, key) - delete(ru.updateConsumers, key) - delete(ru.removeConsumers, key) + ru.removeStream(sa) } else { js.processStreamRemoval(sa) } @@ -2059,15 +2094,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo } if isRecovering { js.setConsumerAssignmentRecovering(ca) - key := ca.recoveryKey() - skey := ca.streamRecoveryKey() - if consumers, ok := ru.removeConsumers[skey]; ok { - delete(consumers, key) - } - if _, ok := ru.updateConsumers[skey]; !ok { - ru.updateConsumers[skey] = map[string]*consumerAssignment{} - } - ru.updateConsumers[skey][key] = ca + ru.addOrUpdateConsumer(ca) } else { js.processConsumerAssignment(ca) } @@ -2099,15 +2126,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo } if isRecovering { js.setConsumerAssignmentRecovering(ca) - key := ca.recoveryKey() - skey := ca.streamRecoveryKey() - if _, ok := ru.removeConsumers[skey]; !ok { - ru.removeConsumers[skey] = map[string]*consumerAssignment{} - } - ru.removeConsumers[skey][key] = ca - if consumers, ok := ru.updateConsumers[skey]; ok { - delete(consumers, key) - } + ru.removeConsumer(ca) } else { js.processConsumerRemoval(ca) } @@ -2119,10 +2138,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo } if isRecovering 
{ js.setStreamAssignmentRecovering(sa) - key := sa.recoveryKey() - ru.updateStreams[key] = sa - delete(ru.addStreams, key) - delete(ru.removeStreams, key) + ru.updateStream(sa) } else { js.processUpdateStreamAssignment(sa) } @@ -2581,11 +2597,19 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps return case <-mqch: // Clean signal from shutdown routine so do best effort attempt to snapshot. - doSnapshot() + // Don't snapshot if not shutting down, monitor goroutine could be going away + // on a scale down or a remove for example. + if s.isShuttingDown() { + doSnapshot() + } return case <-qch: // Clean signal from shutdown routine so do best effort attempt to snapshot. - doSnapshot() + // Don't snapshot if not shutting down, Raft node could be going away on a + // scale down or remove for example. + if s.isShuttingDown() { + doSnapshot() + } return case <-aq.ch: var ne, nb uint64 @@ -3663,8 +3687,7 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) { // If unsupported, we can't register any further. if sa.unsupported != nil { sa.unsupported.setupInfoSub(s, sa) - apiLevel := getRequiredApiLevel(sa.Config.Metadata) - s.Warnf("Detected unsupported stream '%s > %s', delete the stream or upgrade the server to API level %s", accName, stream, apiLevel) + s.Warnf("Detected unsupported stream '%s > %s': %s", accName, stream, sa.unsupported.reason) js.mu.Unlock() // Need to stop the stream, we can't keep running with an old config. @@ -3793,8 +3816,7 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) { // If unsupported, we can't register any further. if sa.unsupported != nil { sa.unsupported.setupInfoSub(s, sa) - apiLevel := getRequiredApiLevel(sa.Config.Metadata) - s.Warnf("Detected unsupported stream '%s > %s', delete the stream or upgrade the server to API level %s", accName, stream, apiLevel) + s.Warnf("Detected unsupported stream '%s > %s': %s", accName, stream, sa.unsupported.reason) js.mu.Unlock() // Need to stop the stream, we can't keep running with an old config. @@ -4474,12 +4496,11 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { // If unsupported, we can't register any further. if ca.unsupported != nil { ca.unsupported.setupInfoSub(s, ca) - apiLevel := getRequiredApiLevel(ca.Config.Metadata) - s.Warnf("Detected unsupported consumer '%s > %s > %s', delete the consumer or upgrade the server to API level %s", accName, stream, ca.Name, apiLevel) + s.Warnf("Detected unsupported consumer '%s > %s > %s': %s", accName, stream, ca.Name, ca.unsupported.reason) // Mark stream as unsupported as well if sa.unsupported == nil { - sa.unsupported = newUnsupportedStreamAssignment(s, sa) + sa.unsupported = newUnsupportedStreamAssignment(s, sa, fmt.Errorf("unsupported consumer %q", ca.Name)) } sa.unsupported.setupInfoSub(s, sa) js.mu.Unlock() @@ -5161,11 +5182,19 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { return case <-mqch: // Clean signal from shutdown routine so do best effort attempt to snapshot. - doSnapshot(false) + // Don't snapshot if not shutting down, monitor goroutine could be going away + // on a scale down or a remove for example. + if s.isShuttingDown() { + doSnapshot(false) + } return case <-qch: // Clean signal from shutdown routine so do best effort attempt to snapshot. - doSnapshot(false) + // Don't snapshot if not shutting down, Raft node could be going away on a + // scale down or remove for example. 
+ if s.isShuttingDown() { + doSnapshot(false) + } return case <-aq.ch: ces := aq.pop() @@ -7608,20 +7637,21 @@ func decodeStreamAssignment(s *Server, buf []byte) (*streamAssignment, error) { func decodeStreamAssignmentConfig(s *Server, sa *streamAssignment) error { var unsupported bool var cfg StreamConfig + var err error decoder := json.NewDecoder(bytes.NewReader(sa.ConfigJSON)) decoder.DisallowUnknownFields() - if err := decoder.Decode(&cfg); err != nil { + if err = decoder.Decode(&cfg); err != nil { unsupported = true cfg = StreamConfig{} - if err = json.Unmarshal(sa.ConfigJSON, &cfg); err != nil { - return err + if err2 := json.Unmarshal(sa.ConfigJSON, &cfg); err2 != nil { + return err2 } } sa.Config = &cfg fixCfgMirrorWithDedupWindow(sa.Config) - if unsupported || (sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata)) { - sa.unsupported = newUnsupportedStreamAssignment(s, sa) + if unsupported || err != nil || (sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata)) { + sa.unsupported = newUnsupportedStreamAssignment(s, sa, err) } return nil } @@ -8068,18 +8098,19 @@ func decodeConsumerAssignment(buf []byte) (*consumerAssignment, error) { func decodeConsumerAssignmentConfig(ca *consumerAssignment) error { var unsupported bool var cfg ConsumerConfig + var err error decoder := json.NewDecoder(bytes.NewReader(ca.ConfigJSON)) decoder.DisallowUnknownFields() - if err := decoder.Decode(&cfg); err != nil { + if err = decoder.Decode(&cfg); err != nil { unsupported = true cfg = ConsumerConfig{} - if err = json.Unmarshal(ca.ConfigJSON, &cfg); err != nil { - return err + if err2 := json.Unmarshal(ca.ConfigJSON, &cfg); err2 != nil { + return err2 } } ca.Config = &cfg - if unsupported || (ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata)) { - ca.unsupported = newUnsupportedConsumerAssignment(ca) + if unsupported || err != nil || (ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata)) { + ca.unsupported = newUnsupportedConsumerAssignment(ca, err) } return nil } @@ -8292,6 +8323,13 @@ func (mset *stream) stateSnapshotLocked() []byte { } // Older v1 version with deleted as a sorted []uint64. + // For a stream with millions or billions of interior deletes, this will be huge. + // Now that all server versions 2.10.+ support binary snapshots, we should never fall back. + assert.Unreachable("Legacy JSON stream snapshot used", map[string]any{ + "stream": mset.cfg.Name, + "account": mset.acc.Name, + }) + state := mset.store.State() snap := &streamSnapshot{ Msgs: state.Msgs, @@ -9562,6 +9600,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { } } + start := time.Now() mset.setCatchupPeer(sreq.Peer, last-seq) var spb int @@ -9570,7 +9609,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { sendNextBatchAndContinue := func(qch chan struct{}) bool { // Check if we know we will not enter the loop because we are done. if seq > last { - s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name()) + s.Noticef("Catchup for stream '%s > %s' complete (took %v)", mset.account(), mset.name(), time.Since(start)) // EOF s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil) return false @@ -9639,7 +9678,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { // See if we should use LoadNextMsg instead of walking sequence by sequence if we have an order magnitude more interior deletes. // Only makes sense with delete range capabilities. 
- useLoadNext := drOk && (uint64(state.NumDeleted) > 10*state.Msgs) + useLoadNext := drOk && (uint64(state.NumDeleted) > 2*state.Msgs || state.NumDeleted > 1_000_000) var smv StoreMsg for ; seq <= last && atomic.LoadInt64(&outb) <= maxOutBytes && atomic.LoadInt32(&outm) <= maxOutMsgs && s.gcbBelowMax(); seq++ { @@ -9679,8 +9718,8 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { // The snapshot has a larger last sequence then we have. This could be due to a truncation // when trying to recover after corruption, still not 100% sure. Could be off by 1 too somehow, // but tested a ton of those with no success. - s.Warnf("Catchup for stream '%s > %s' completed, but requested sequence %d was larger than current state: %+v", - mset.account(), mset.name(), seq, state) + s.Warnf("Catchup for stream '%s > %s' completed (took %v), but requested sequence %d was larger than current state: %+v", + mset.account(), mset.name(), time.Since(start), seq, state) // Try our best to redo our invalidated snapshot as well. if n := mset.raftNode(); n != nil { if snap := mset.stateSnapshot(); snap != nil { @@ -9726,7 +9765,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { if drOk && dr.First > 0 { sendDR() } - s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name()) + s.Noticef("Catchup for stream '%s > %s' complete (took %v)", mset.account(), mset.name(), time.Since(start)) // EOF s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil) return false diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index a7ce9fea6f1..c0572e41e51 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -9203,7 +9203,7 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterAssetCreateOrUpdate(t *tes // Stream should also be reported as offline. // Specifically, as "stopped" because it's still supported, but can't run due to the unsupported consumer. - expectStreamInfo("stopped", "DowngradeConsumerTest") + expectStreamInfo("stopped - unsupported consumer \"DowngradeConsumerTest\"", "DowngradeConsumerTest") } // Consumer should be reported as offline, but healthz should report healthy to not block downgrades. 
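The cluster test updates that follow build on the consolidated `recoveryUpdates` helpers added in jetstream_cluster.go above. A minimal sketch of the bookkeeping those helpers centralize, assuming `sa` and `ca` are previously decoded stream/consumer assignments (the construction mirrors `TestJetStreamClusterMetaSnapshotReCreateConsistency` below):

```go
// Sketch only: the helpers keep the recovery maps mutually consistent.
ru := &recoveryUpdates{
	removeStreams:   make(map[string]*streamAssignment),
	removeConsumers: make(map[string]map[string]*consumerAssignment),
	addStreams:      make(map[string]*streamAssignment),
	updateStreams:   make(map[string]*streamAssignment),
	updateConsumers: make(map[string]map[string]*consumerAssignment),
}

ru.addStream(sa)           // records the add, drops any pending remove for the same key
ru.addOrUpdateConsumer(ca) // records the consumer, drops any pending remove for it
ru.removeStream(sa)        // records the remove, drops pending add/update and consumer entries for the stream
```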
diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index a9851482f7f..912f041fe31 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -4744,7 +4744,7 @@ func TestJetStreamClusterSnapshotAndRestoreWithHealthz(t *testing.T) { checkHealth := func() { t.Helper() - checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + checkFor(t, 10*time.Second, 200*time.Millisecond, func() error { for _, s := range c.servers { status := s.healthz(nil) if status.Error != _EMPTY_ { diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 1ac2742d5df..c2f7bef9063 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -4105,6 +4105,88 @@ func TestJetStreamClusterMetaSnapshotMustNotIncludePendingConsumers(t *testing.T } } +func TestJetStreamClusterMetaSnapshotReCreateConsistency(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + scfg := &nats.StreamConfig{Name: "TEST", Replicas: 3} + _, err := js.AddStream(scfg) + require_NoError(t, err) + + ccfg := &nats.ConsumerConfig{Name: "consumer", Replicas: 3} + _, err = js.AddConsumer("TEST", ccfg) + require_NoError(t, err) + + ml := c.leader() + mjs := ml.getJetStream() + mjs.mu.Lock() + sa := mjs.streamAssignment(globalAccountName, "TEST") + ca := mjs.consumerAssignment(globalAccountName, "TEST", "consumer") + + oldStreamGroup := sa.Group.Name + oldConsumerGroup := ca.Group.Name + streamDelete := encodeDeleteStreamAssignment(sa) + + csa := sa.copyGroup() + cca := ca.copyGroup() + csa.Group.Name, csa.Config.Replicas = "new-group", 1 + cca.Group.Name, cca.Config.Replicas = "new-group", 1 + mjs.mu.Unlock() + + // Get the snapshot before removing the stream below so we can recover fresh. + snap, err := mjs.metaSnapshot() + require_NoError(t, err) + require_NoError(t, js.DeleteStream("TEST")) + nc.Close() + + ru := &recoveryUpdates{ + removeStreams: make(map[string]*streamAssignment), + removeConsumers: make(map[string]map[string]*consumerAssignment), + addStreams: make(map[string]*streamAssignment), + updateStreams: make(map[string]*streamAssignment), + updateConsumers: make(map[string]map[string]*consumerAssignment), + } + + // Simulate recovering: + // - snapshot with a stream and consumer + // - normal entry deleting the stream + // - normal entry re-adding the stream and consumer under different configs + // This should result in a consistent state. + mjs.mu.Lock() + mjs.metaRecovering = true + mjs.mu.Unlock() + _, err = mjs.applyMetaEntries([]*Entry{ + newEntry(EntrySnapshot, snap), + newEntry(EntryNormal, streamDelete), + newEntry(EntryNormal, encodeAddStreamAssignment(csa)), + newEntry(EntryNormal, encodeAddConsumerAssignment(cca)), + }, ru) + require_NoError(t, err) + + // Recovery should contain the stream and consumer create. + require_Len(t, len(ru.addStreams), 1) + require_Len(t, len(ru.updateConsumers), 1) + + // Process those updates. + for _, sa = range ru.addStreams { + mjs.processStreamAssignment(sa) + } + for _, cas := range ru.updateConsumers { + for _, ca = range cas { + mjs.processConsumerAssignment(ca) + } + } + + // Should not have created old Raft nodes during recovery. 
+ n1 := ml.lookupRaftNode(oldStreamGroup) + n2 := ml.lookupRaftNode(oldConsumerGroup) + require_True(t, n1 == nil) + require_True(t, n2 == nil) +} + func TestJetStreamClusterConsumerDontSendSnapshotOnLeaderChange(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 5050dc1bf4b..bf181769fd8 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -151,7 +151,7 @@ func TestJetStreamEnableAndDisableAccount(t *testing.T) { } acc, _ := s.LookupOrRegisterAccount("$FOO") - if err := acc.EnableJetStream(nil); err != nil { + if err := acc.EnableJetStream(nil, nil); err != nil { t.Fatalf("Did not expect error on enabling account: %v", err) } if na := s.JetStreamNumAccounts(); na != 1 { @@ -170,7 +170,7 @@ func TestJetStreamEnableAndDisableAccount(t *testing.T) { } // Should get an error for trying to enable a non-registered account. acc = NewAccount("$BAZ") - if err := acc.EnableJetStream(nil); err == nil { + if err := acc.EnableJetStream(nil, nil); err == nil { t.Fatalf("Expected error on enabling account that was not registered") } } @@ -4872,20 +4872,20 @@ func TestJetStreamSystemLimits(t *testing.T) { } } - if err := facc.EnableJetStream(limits(24, 192)); err != nil { + if err := facc.EnableJetStream(limits(24, 192), nil); err != nil { t.Fatalf("Unexpected error: %v", err) } // Use up rest of our resources in memory - if err := bacc.EnableJetStream(limits(1000, 0)); err != nil { + if err := bacc.EnableJetStream(limits(1000, 0), nil); err != nil { t.Fatalf("Unexpected error: %v", err) } // Now ask for more memory. Should error. - if err := zacc.EnableJetStream(limits(1000, 0)); err == nil { + if err := zacc.EnableJetStream(limits(1000, 0), nil); err == nil { t.Fatalf("Expected an error when exhausting memory resource limits") } // Disk too. 
- if err := zacc.EnableJetStream(limits(0, 10000)); err == nil { + if err := zacc.EnableJetStream(limits(0, 10000), nil); err == nil { t.Fatalf("Expected an error when exhausting memory resource limits") } facc.DisableJetStream() @@ -4899,7 +4899,7 @@ func TestJetStreamSystemLimits(t *testing.T) { t.Fatalf("Expected reserved memory and store to be 0, got %v and %v", friendlyBytes(rm), friendlyBytes(rd)) } - if err := facc.EnableJetStream(limits(24, 192)); err != nil { + if err := facc.EnableJetStream(limits(24, 192), nil); err != nil { t.Fatalf("Unexpected error: %v", err) } // Test Adjust @@ -7412,7 +7412,7 @@ func TestJetStreamCanNotEnableOnSystemAccount(t *testing.T) { defer s.Shutdown() sa := s.SystemAccount() - if err := sa.EnableJetStream(nil); err == nil { + if err := sa.EnableJetStream(nil, nil); err == nil { t.Fatalf("Expected an error trying to enable on the system account") } } @@ -20706,7 +20706,7 @@ func TestJetStreamOfflineStreamAndConsumerAfterDowngrade(t *testing.T) { mset, err = s.globalAccount().lookupStream("DowngradeConsumerTest") require_NoError(t, err) require_True(t, mset.closed.Load()) - require_Equal(t, mset.offlineReason, "stopped") + require_Equal(t, mset.offlineReason, "stopped - unsupported consumer \"DowngradeConsumerTest\"") obs := mset.getPublicConsumers() require_Len(t, len(obs), 1) @@ -20885,3 +20885,44 @@ func TestJetStreamReloadMetaCompact(t *testing.T) { require_Equal(t, s.getOpts().JetStreamMetaCompact, 0) } + +// https://github.com/nats-io/nats-server/issues/7511 +func TestJetStreamImplicitRePublishAfterSubjectTransform(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + cfg := &nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"a.>", "c.>"}, + SubjectTransform: &nats.SubjectTransformConfig{Source: "a.>", Destination: "b.>"}, + RePublish: &nats.RePublish{Destination: ">"}, // Implicitly RePublish 'b.>'. + } + // Forms a cycle since the RePublish captures both 'a.>' and 'c.>' + _, err := js.AddStream(cfg) + require_Error(t, err, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for republish destination forms a cycle"))) + + // Doesn't form a cycle as 'a.>' is mapped to 'b.>'. A RePublish for '>' can be translated to 'b.>'. + cfg.Subjects = []string{"a.>"} + _, err = js.AddStream(cfg) + require_NoError(t, err) + + sub, err := nc.SubscribeSync("b.>") + require_NoError(t, err) + defer sub.Drain() + + // The published message should be transformed and RePublished. + _, err = js.Publish("a.hello", nil) + require_NoError(t, err) + msg, err := sub.NextMsg(time.Second) + require_NoError(t, err) + require_Equal(t, msg.Subject, "b.hello") + + // Forms a cycle since the implicit RePublish on 'b.>' is lost. + // The RePublish would now mean publishing to 'c.>' which is a cycle. 
+ cfg.Subjects = []string{"c.>"} + _, err = js.UpdateStream(cfg) + require_Error(t, err, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for republish destination forms a cycle"))) +} diff --git a/server/leafnode_test.go b/server/leafnode_test.go index a90ff3eaf56..008aeca484e 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -10328,3 +10328,62 @@ func TestLeafNodeDaisyChainWithAccountImportExport(t *testing.T) { acc.mu.RUnlock() require_Len(t, len(sr.psubs), 0) } + +func TestLeafNodeConfigureWriteDeadline(t *testing.T) { + o1, o2 := DefaultOptions(), DefaultOptions() + + o1.LeafNode.WriteDeadline = 5 * time.Second + o1.LeafNode.Host = "127.0.0.1" + o1.LeafNode.Port = -1 + s1 := RunServer(o1) + defer s1.Shutdown() + + s1URL, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", o1.LeafNode.Port)) + o2.Cluster.Name = "somethingelse" + o2.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{s1URL}}} + s2 := RunServer(o2) + defer s2.Shutdown() + + checkLeafNodeConnected(t, s2) + + s1.mu.RLock() + defer s1.mu.RUnlock() + + for _, r := range s1.leafs { + require_Equal(t, r.out.wdl, 5*time.Second) + } +} + +func TestLeafNodeConfigureWriteTimeoutPolicy(t *testing.T) { + for name, policy := range map[string]WriteTimeoutPolicy{ + "Default": WriteTimeoutPolicyDefault, + "Retry": WriteTimeoutPolicyRetry, + "Close": WriteTimeoutPolicyClose, + } { + t.Run(name, func(t *testing.T) { + o1 := testDefaultOptionsForGateway("B") + o1.Gateway.WriteTimeout = policy + s1 := runGatewayServer(o1) + defer s1.Shutdown() + + o2 := testGatewayOptionsFromToWithServers(t, "A", "B", s1) + s2 := runGatewayServer(o2) + defer s2.Shutdown() + + waitForOutboundGateways(t, s2, 1, time.Second) + waitForInboundGateways(t, s1, 1, time.Second) + waitForOutboundGateways(t, s1, 1, time.Second) + + s1.mu.RLock() + defer s1.mu.RUnlock() + + for _, r := range s1.leafs { + if policy == WriteTimeoutPolicyDefault { + require_Equal(t, r.out.wtp, WriteTimeoutPolicyRetry) + } else { + require_Equal(t, r.out.wtp, policy) + } + } + }) + } +} diff --git a/server/monitor.go b/server/monitor.go index af3af672187..e6c75207664 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1219,6 +1219,7 @@ type Varz struct { JetStream JetStreamVarz `json:"jetstream,omitempty"` // JetStream is the JetStream state TLSTimeout float64 `json:"tls_timeout"` // TLSTimeout is how long TLS operations have to complete WriteDeadline time.Duration `json:"write_deadline"` // WriteDeadline is the maximum time writes to sockets have to complete + WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors Start time.Time `json:"start"` // Start is time when the server was started Now time.Time `json:"now"` // Now is the current time of the server Uptime string `json:"uptime"` // Uptime is how long the server has been running @@ -1261,15 +1262,17 @@ type JetStreamVarz struct { // ClusterOptsVarz contains monitoring cluster information type ClusterOptsVarz struct { - Name string `json:"name,omitempty"` // Name is the configured cluster name - Host string `json:"addr,omitempty"` // Host is the host the cluster listens on for connections - Port int `json:"cluster_port,omitempty"` // Port is the port the cluster listens on for connections - AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication - URLs []string `json:"urls,omitempty"` // URLs is the list of cluster URLs - TLSTimeout float64 
`json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete - TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections - TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed - PoolSize int `json:"pool_size,omitempty"` // PoolSize is the configured route connection pool size + Name string `json:"name,omitempty"` // Name is the configured cluster name + Host string `json:"addr,omitempty"` // Host is the host the cluster listens on for connections + Port int `json:"cluster_port,omitempty"` // Port is the port the cluster listens on for connections + AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication + URLs []string `json:"urls,omitempty"` // URLs is the list of cluster URLs + TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete + TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections + TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed + PoolSize int `json:"pool_size,omitempty"` // PoolSize is the configured route connection pool size + WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete + WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors } // GatewayOptsVarz contains monitoring gateway information @@ -1285,6 +1288,8 @@ type GatewayOptsVarz struct { ConnectRetries int `json:"connect_retries,omitempty"` // ConnectRetries is how many connection attempts the route will make Gateways []RemoteGatewayOptsVarz `json:"gateways,omitempty"` // Gateways is state of configured gateway remotes RejectUnknown bool `json:"reject_unknown,omitempty"` // RejectUnknown indicates if unknown cluster connections will be rejected + WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete + WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors } // RemoteGatewayOptsVarz contains monitoring remote gateway information @@ -1304,6 +1309,8 @@ type LeafNodeOptsVarz struct { TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed Remotes []RemoteLeafOptsVarz `json:"remotes,omitempty"` // Remotes is state of configured Leafnode remotes TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be performed + WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete + WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors } // DenyRules Contains lists of subjects not allowed to be imported/exported @@ -1554,14 +1561,16 @@ func (s *Server) createVarz(pcpu float64, rss int64) *Varz { HTTPBasePath: opts.HTTPBasePath, HTTPSPort: opts.HTTPSPort, Cluster: ClusterOptsVarz{ - Name: info.Cluster, - Host: c.Host, - Port: c.Port, - AuthTimeout: c.AuthTimeout, - TLSTimeout: c.TLSTimeout, - TLSRequired: clustTlsReq, - TLSVerify: clustTlsReq, - PoolSize: opts.Cluster.PoolSize, + Name: 
info.Cluster, + Host: c.Host, + Port: c.Port, + AuthTimeout: c.AuthTimeout, + TLSTimeout: c.TLSTimeout, + TLSRequired: clustTlsReq, + TLSVerify: clustTlsReq, + PoolSize: opts.Cluster.PoolSize, + WriteDeadline: opts.Cluster.WriteDeadline, + WriteTimeout: opts.Cluster.WriteTimeout.String(), }, Gateway: GatewayOptsVarz{ Name: gw.Name, @@ -1575,6 +1584,8 @@ func (s *Server) createVarz(pcpu float64, rss int64) *Varz { ConnectRetries: gw.ConnectRetries, Gateways: []RemoteGatewayOptsVarz{}, RejectUnknown: gw.RejectUnknown, + WriteDeadline: opts.Gateway.WriteDeadline, + WriteTimeout: opts.Gateway.WriteTimeout.String(), }, LeafNode: LeafNodeOptsVarz{ Host: ln.Host, @@ -1585,6 +1596,8 @@ func (s *Server) createVarz(pcpu float64, rss int64) *Varz { TLSVerify: leafTlsVerify, TLSOCSPPeerVerify: leafTlsOCSPPeerVerify, Remotes: []RemoteLeafOptsVarz{}, + WriteDeadline: opts.LeafNode.WriteDeadline, + WriteTimeout: opts.LeafNode.WriteTimeout.String(), }, MQTT: MQTTOptsVarz{ Host: mqtt.Host, @@ -1702,6 +1715,7 @@ func (s *Server) updateVarzConfigReloadableFields(v *Varz) { v.MaxPending = opts.MaxPending v.TLSTimeout = opts.TLSTimeout v.WriteDeadline = opts.WriteDeadline + v.WriteTimeout = opts.WriteTimeout.String() v.ConfigLoadTime = s.configTime.UTC() v.ConfigDigest = opts.configDigest // Update route URLs if applicable diff --git a/server/monitor_test.go b/server/monitor_test.go index 469ee44da62..84280df7a7c 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -2743,6 +2743,7 @@ func TestMonitorCluster(t *testing.T) { opts.Cluster.TLSConfig != nil, opts.Cluster.TLSConfig != nil, DEFAULT_ROUTE_POOL_SIZE, + 0, _EMPTY_, } varzURL := fmt.Sprintf("http://127.0.0.1:%d/varz", s.MonitorAddr().Port) @@ -2758,7 +2759,7 @@ func TestMonitorCluster(t *testing.T) { // Having this here to make sure that if fields are added in ClusterOptsVarz, // we make sure to update this test (compiler will report an error if we don't) - _ = ClusterOptsVarz{"", "", 0, 0, nil, 2, false, false, 0} + _ = ClusterOptsVarz{"", "", 0, 0, nil, 2, false, false, 0, 0, _EMPTY_} // Alter the fields to make sure that we have a proper deep copy // of what may be stored in the server. Anything we change here @@ -2913,6 +2914,7 @@ func TestMonitorGateway(t *testing.T) { opts.Gateway.ConnectRetries, []RemoteGatewayOptsVarz{{"B", 1, nil}}, opts.Gateway.RejectUnknown, + 0, _EMPTY_, } // Since URLs array is not guaranteed to be always the same order, // we don't add it in the expected GatewayOptsVarz, instead we @@ -2950,7 +2952,7 @@ func TestMonitorGateway(t *testing.T) { // Having this here to make sure that if fields are added in GatewayOptsVarz, // we make sure to update this test (compiler will report an error if we don't) - _ = GatewayOptsVarz{"", "", 0, 0, 0, false, false, "", 0, []RemoteGatewayOptsVarz{{"", 0, nil}}, false} + _ = GatewayOptsVarz{"", "", 0, 0, 0, false, false, "", 0, []RemoteGatewayOptsVarz{{"", 0, nil}}, false, 0, "default"} // Alter the fields to make sure that we have a proper deep copy // of what may be stored in the server.
Anything we change here @@ -3136,6 +3138,7 @@ func TestMonitorLeafNode(t *testing.T) { }, }, false, + 0, _EMPTY_, } varzURL := fmt.Sprintf("http://127.0.0.1:%d/varz", s.MonitorAddr().Port) @@ -3160,7 +3163,7 @@ func TestMonitorLeafNode(t *testing.T) { // Having this here to make sure that if fields are added in ClusterOptsVarz, // we make sure to update this test (compiler will report an error if we don't) - _ = LeafNodeOptsVarz{"", 0, 0, 0, false, false, []RemoteLeafOptsVarz{{"", 0, nil, nil, false}}, false} + _ = LeafNodeOptsVarz{"", 0, 0, 0, false, false, []RemoteLeafOptsVarz{{"", 0, nil, nil, false}}, false, 0, _EMPTY_} // Alter the fields to make sure that we have a proper deep copy // of what may be stored in the server. Anything we change here diff --git a/server/mqtt_test.go b/server/mqtt_test.go index e779096ea07..c08c75e94b3 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -1035,7 +1035,7 @@ func testMQTTEnableJSForAccount(t *testing.T, s *Server, accName string) { MaxStore: 1024 * 1024, }, } - if err := acc.EnableJetStream(limits); err != nil { + if err := acc.EnableJetStream(limits, nil); err != nil { t.Fatalf("Error enabling JS: %v", err) } } diff --git a/server/opts.go b/server/opts.go index 56bd7be7000..acf97d74800 100644 --- a/server/opts.go +++ b/server/opts.go @@ -62,27 +62,29 @@ type PinnedCertSet map[string]struct{} // NOTE: This structure is no longer used for monitoring endpoints // and json tags are deprecated and may be removed in the future. type ClusterOpts struct { - Name string `json:"-"` - Host string `json:"addr,omitempty"` - Port int `json:"cluster_port,omitempty"` - Username string `json:"-"` - Password string `json:"-"` - AuthTimeout float64 `json:"auth_timeout,omitempty"` - Permissions *RoutePermissions `json:"-"` - TLSTimeout float64 `json:"-"` - TLSConfig *tls.Config `json:"-"` - TLSMap bool `json:"-"` - TLSCheckKnownURLs bool `json:"-"` - TLSPinnedCerts PinnedCertSet `json:"-"` - ListenStr string `json:"-"` - Advertise string `json:"-"` - NoAdvertise bool `json:"-"` - ConnectRetries int `json:"-"` - PoolSize int `json:"-"` - PinnedAccounts []string `json:"-"` - Compression CompressionOpts `json:"-"` - PingInterval time.Duration `json:"-"` - MaxPingsOut int `json:"-"` + Name string `json:"-"` + Host string `json:"addr,omitempty"` + Port int `json:"cluster_port,omitempty"` + Username string `json:"-"` + Password string `json:"-"` + AuthTimeout float64 `json:"auth_timeout,omitempty"` + Permissions *RoutePermissions `json:"-"` + TLSTimeout float64 `json:"-"` + TLSConfig *tls.Config `json:"-"` + TLSMap bool `json:"-"` + TLSCheckKnownURLs bool `json:"-"` + TLSPinnedCerts PinnedCertSet `json:"-"` + ListenStr string `json:"-"` + Advertise string `json:"-"` + NoAdvertise bool `json:"-"` + ConnectRetries int `json:"-"` + PoolSize int `json:"-"` + PinnedAccounts []string `json:"-"` + Compression CompressionOpts `json:"-"` + PingInterval time.Duration `json:"-"` + MaxPingsOut int `json:"-"` + WriteDeadline time.Duration `json:"-"` + WriteTimeout WriteTimeoutPolicy `json:"-"` // Not exported (used in tests) resolver netResolver @@ -124,6 +126,8 @@ type GatewayOpts struct { ConnectRetries int `json:"connect_retries,omitempty"` Gateways []*RemoteGatewayOpts `json:"gateways,omitempty"` RejectUnknown bool `json:"reject_unknown,omitempty"` // config got renamed to reject_unknown_cluster + WriteDeadline time.Duration `json:"-"` + WriteTimeout WriteTimeoutPolicy `json:"-"` // Not exported, for tests. 
resolver netResolver @@ -169,10 +173,12 @@ type LeafNodeOpts struct { // to start before falling back to previous behavior of sending the // INFO protocol first. It allows for a mix of newer remote leafnodes // that can require a TLS handshake first, and older that can't. - TLSHandshakeFirstFallback time.Duration `json:"-"` - Advertise string `json:"-"` - NoAdvertise bool `json:"-"` - ReconnectInterval time.Duration `json:"-"` + TLSHandshakeFirstFallback time.Duration `json:"-"` + Advertise string `json:"-"` + NoAdvertise bool `json:"-"` + ReconnectInterval time.Duration `json:"-"` + WriteDeadline time.Duration `json:"-"` + WriteTimeout WriteTimeoutPolicy `json:"-"` // Compression options Compression CompressionOpts `json:"-"` @@ -384,12 +390,13 @@ type Options struct { // to start before falling back to previous behavior of sending the // INFO protocol first. It allows for a mix of newer clients that can // require a TLS handshake first, and older clients that can't. - TLSHandshakeFirstFallback time.Duration `json:"-"` - AllowNonTLS bool `json:"-"` - WriteDeadline time.Duration `json:"-"` - MaxClosedClients int `json:"-"` - LameDuckDuration time.Duration `json:"-"` - LameDuckGracePeriod time.Duration `json:"-"` + TLSHandshakeFirstFallback time.Duration `json:"-"` + AllowNonTLS bool `json:"-"` + WriteDeadline time.Duration `json:"-"` + WriteTimeout WriteTimeoutPolicy `json:"-"` + MaxClosedClients int `json:"-"` + LameDuckDuration time.Duration `json:"-"` + LameDuckGracePeriod time.Duration `json:"-"` // MaxTracedMsgLen is the maximum printable length for traced messages. MaxTracedMsgLen int `json:"-"` @@ -1287,6 +1294,8 @@ func (o *Options) processConfigFileLine(k string, v any, errors *[]error, warnin o.AllowNonTLS = v.(bool) case "write_deadline": o.WriteDeadline = parseDuration("write_deadline", tk, v, errors, warnings) + case "write_timeout": + o.WriteTimeout = parseWriteDeadlinePolicy(tk, v.(string), errors) case "lame_duck_duration": dur, err := time.ParseDuration(v.(string)) if err != nil { @@ -1743,6 +1752,21 @@ func parseDuration(field string, tk token, v any, errors *[]error, warnings *[]e } } +func parseWriteDeadlinePolicy(tk token, v string, errors *[]error) WriteTimeoutPolicy { + switch v { + case "default": + return WriteTimeoutPolicyDefault + case "close": + return WriteTimeoutPolicyClose + case "retry": + return WriteTimeoutPolicyRetry + default: + err := &configErr{tk, "write_timeout must be 'default', 'close' or 'retry'"} + *errors = append(*errors, err) + return WriteTimeoutPolicyDefault + } +} + func trackExplicitVal(pm *map[string]bool, name string, val bool) { m := *pm if m == nil { @@ -1915,6 +1939,10 @@ func parseCluster(v any, opts *Options, errors *[]error, warnings *[]error) erro } case "ping_max": opts.Cluster.MaxPingsOut = int(mv.(int64)) + case "write_deadline": + opts.Cluster.WriteDeadline = parseDuration("write_deadline", tk, mv, errors, warnings) + case "write_timeout": + opts.Cluster.WriteTimeout = parseWriteDeadlinePolicy(tk, mv.(string), errors) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ @@ -2101,6 +2129,10 @@ func parseGateway(v any, o *Options, errors *[]error, warnings *[]error) error { o.Gateway.Gateways = gateways case "reject_unknown", "reject_unknown_cluster": o.Gateway.RejectUnknown = mv.(bool) + case "write_deadline": + o.Gateway.WriteDeadline = parseDuration("write_deadline", tk, mv, errors, warnings) + case "write_timeout": + o.Gateway.WriteTimeout = parseWriteDeadlinePolicy(tk, mv.(string), errors) default: if 
!tk.IsUsedVariable() { err := &unknownConfigFieldErr{ @@ -2586,6 +2618,10 @@ func parseLeafNodes(v any, opts *Options, errors *[]error, warnings *[]error) er *errors = append(*errors, err) continue } + case "write_deadline": + opts.LeafNode.WriteDeadline = parseDuration("write_deadline", tk, mv, errors, warnings) + case "write_timeout": + opts.LeafNode.WriteTimeout = parseWriteDeadlinePolicy(tk, mv.(string), errors) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ diff --git a/server/opts_test.go b/server/opts_test.go index 273936964a1..3fc1b44eaca 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -3746,3 +3746,127 @@ func parseConfigTolerantly(t *testing.T, data string) (*Options, error) { return o, nil } + +func TestWriteDeadlineConfigParsing(t *testing.T) { + type testCase struct { + name string + config string + expect func(t *testing.T, opts *Options) + } + + for _, tc := range []testCase{ + { + name: "LeafNode", + config: ` + leafnodes { + write_deadline: 5s + } + `, + expect: func(t *testing.T, opts *Options) { + require_Equal(t, opts.LeafNode.WriteDeadline, 5*time.Second) + }, + }, + { + name: "Gateway", + config: ` + gateway { + write_deadline: 6s + } + `, + expect: func(t *testing.T, opts *Options) { + require_Equal(t, opts.Gateway.WriteDeadline, 6*time.Second) + }, + }, + { + name: "Cluster", + config: ` + cluster { + write_deadline: 7s + } + `, + expect: func(t *testing.T, opts *Options) { + require_Equal(t, opts.Cluster.WriteDeadline, 7*time.Second) + }, + }, + { + name: "Global", + config: ` + write_deadline: 8s + `, + expect: func(t *testing.T, opts *Options) { + require_Equal(t, opts.WriteDeadline, 8*time.Second) + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + opts, err := parseConfigTolerantly(t, tc.config) + require_NoError(t, err) + tc.expect(t, opts) + }) + } +} + +func TestWriteTimeoutConfigParsing(t *testing.T) { + type testCase struct { + name string + config string + expect func(t *testing.T, opts *Options) + } + + for str, pol := range map[string]WriteTimeoutPolicy{ + "default": WriteTimeoutPolicyDefault, + "retry": WriteTimeoutPolicyRetry, + "close": WriteTimeoutPolicyClose, + } { + for _, tc := range []testCase{ + { + name: "LeafNode", + config: fmt.Sprintf(` + leafnodes { + write_timeout: %s + } + `, str), + expect: func(t *testing.T, opts *Options) { + require_Equal(t, opts.LeafNode.WriteTimeout, pol) + }, + }, + { + name: "Gateway", + config: fmt.Sprintf(` + gateway { + write_timeout: %s + } + `, str), + expect: func(t *testing.T, opts *Options) { + require_Equal(t, opts.Gateway.WriteTimeout, pol) + }, + }, + { + name: "Cluster", + config: fmt.Sprintf(` + cluster { + write_timeout: %s + } + `, str), + expect: func(t *testing.T, opts *Options) { + require_Equal(t, opts.Cluster.WriteTimeout, pol) + }, + }, + { + name: "Global", + config: fmt.Sprintf(` + write_timeout: %s + `, str), + expect: func(t *testing.T, opts *Options) { + require_Equal(t, opts.WriteTimeout, pol) + }, + }, + } { + t.Run(fmt.Sprintf("%s/%s", tc.name, str), func(t *testing.T) { + opts, err := parseConfigTolerantly(t, tc.config) + require_NoError(t, err) + tc.expect(t, opts) + }) + } + } +} diff --git a/server/raft.go b/server/raft.go index 2e466824b8b..dbde8102fcc 100644 --- a/server/raft.go +++ b/server/raft.go @@ -1096,11 +1096,11 @@ func (n *raft) ResumeApply() { func (n *raft) DrainAndReplaySnapshot() bool { n.Lock() defer n.Unlock() - n.warn("Draining and replaying snapshot") snap, err := n.loadLastSnapshot() if err != nil { return false } + 
n.warn("Draining and replaying snapshot") n.pauseApplyLocked() n.apply.drain() n.commit = snap.lastIndex @@ -3032,6 +3032,15 @@ func (n *raft) applyCommit(index uint64) error { committed = append(committed, newEntry(EntrySnapshot, e.Data)) case EntrySnapshot: committed = append(committed, e) + // If we have no snapshot, install the leader's snapshot as our own. + if len(ae.entries) == 1 && n.snapfile == _EMPTY_ && ae.commit > 0 { + n.installSnapshot(&snapshot{ + lastTerm: ae.pterm, + lastIndex: ae.commit, + peerstate: encodePeerState(&peerState{n.peerNames(), n.csz, n.extSt}), + data: e.Data, + }) + } case EntryPeerState: if n.State() != Leader { if ps, err := decodePeerState(e.Data); err == nil { diff --git a/server/raft_test.go b/server/raft_test.go index 48aa265bbbf..045e4575c7e 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -3597,6 +3597,45 @@ func TestNRGReportLeaderAfterNoopEntry(t *testing.T) { require_True(t, n.Leader()) } +func TestNRGSendSnapshotInstallsSnapshot(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + require_Equal(t, n.pindex, 0) + require_Equal(t, n.snapfile, _EMPTY_) + + // Switch to candidate, to become leader. + require_Equal(t, n.term, 0) + n.switchToCandidate() + require_Equal(t, n.term, 1) + + // When switching to leader a NOOP-entry is sent. + n.switchToLeader() + require_Equal(t, n.pindex, 1) + require_Equal(t, n.snapfile, _EMPTY_) + require_NoError(t, n.applyCommit(1)) + require_Equal(t, n.snapfile, _EMPTY_) + + // On scaleup, we send a snapshot. + require_NoError(t, n.SendSnapshot([]byte("snapshot_data"))) + require_Equal(t, n.pindex, 2) + require_Equal(t, n.snapfile, _EMPTY_) + + // When applying the entry, the sent snapshot should be installed. + require_NoError(t, n.applyCommit(2)) + require_NotEqual(t, n.snapfile, _EMPTY_) + + snap, err := n.loadLastSnapshot() + require_NoError(t, err) + require_NotNil(t, snap) + require_Equal(t, snap.lastTerm, 1) + require_Equal(t, snap.lastIndex, 1) + require_Equal(t, string(snap.data), "snapshot_data") + + // Draining and replaying the snapshot should work. + require_True(t, n.DrainAndReplaySnapshot()) +} + // This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before // proposing the next one. 
// The test may fail if: diff --git a/server/reload.go b/server/reload.go index 123aad99adc..4e66223d58e 100644 --- a/server/reload.go +++ b/server/reload.go @@ -1186,7 +1186,7 @@ func imposeOrder(value any) error { slices.Sort(value.AllowedOrigins) case string, bool, uint8, uint16, uint64, int, int32, int64, time.Duration, float64, nil, LeafNodeOpts, ClusterOpts, *tls.Config, PinnedCertSet, *URLAccResolver, *MemAccResolver, *DirAccResolver, *CacheDirAccResolver, Authentication, MQTTOpts, jwt.TagList, - *OCSPConfig, map[string]string, JSLimitOpts, StoreCipher, *OCSPResponseCacheConfig: + *OCSPConfig, map[string]string, JSLimitOpts, StoreCipher, *OCSPResponseCacheConfig, WriteTimeoutPolicy: // explicitly skipped types case *AuthCallout: case JSTpmOpts: diff --git a/server/routes_test.go b/server/routes_test.go index 27eb9683ac9..351152419c1 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -4917,3 +4917,65 @@ func TestRouteImplicitJoinsSeparateGroups(t *testing.T) { }) } } + +func TestRouteConfigureWriteDeadline(t *testing.T) { + o1, o2 := DefaultOptions(), DefaultOptions() + + o1.Cluster.WriteDeadline = 5 * time.Second + s1 := RunServer(o1) + defer s1.Shutdown() + + o2.Cluster.WriteDeadline = 6 * time.Second + o2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", o1.Cluster.Port)) + s2 := RunServer(o2) + defer s2.Shutdown() + + checkClusterFormed(t, s1, s2) + + s1.mu.RLock() + s2.mu.RLock() + defer s1.mu.RUnlock() + defer s2.mu.RUnlock() + + s1.forEachRoute(func(r *client) { + require_Equal(t, r.out.wdl, 5*time.Second) + }) + + s2.forEachRoute(func(r *client) { + require_Equal(t, r.out.wdl, 6*time.Second) + }) +} + +func TestRouteConfigureWriteTimeoutPolicy(t *testing.T) { + for name, policy := range map[string]WriteTimeoutPolicy{ + "Default": WriteTimeoutPolicyDefault, + "Retry": WriteTimeoutPolicyRetry, + "Close": WriteTimeoutPolicyClose, + } { + t.Run(name, func(t *testing.T) { + o1 := testDefaultOptionsForGateway("B") + o1.Gateway.WriteTimeout = policy + s1 := runGatewayServer(o1) + defer s1.Shutdown() + + o2 := testGatewayOptionsFromToWithServers(t, "A", "B", s1) + s2 := runGatewayServer(o2) + defer s2.Shutdown() + + waitForOutboundGateways(t, s2, 1, time.Second) + waitForInboundGateways(t, s1, 1, time.Second) + waitForOutboundGateways(t, s1, 1, time.Second) + + s1.mu.RLock() + defer s1.mu.RUnlock() + + s1.forEachRoute(func(r *client) { + if policy == WriteTimeoutPolicyDefault { + require_Equal(t, r.out.wtp, WriteTimeoutPolicyRetry) + } else { + require_Equal(t, r.out.wtp, policy) + } + }) + }) + } +} diff --git a/server/stream.go b/server/stream.go index da3b6c2538d..3cec5096183 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1765,6 +1765,18 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account, pedantic boo } } + // Check the subject transform if any + if cfg.SubjectTransform != nil { + if cfg.SubjectTransform.Source != _EMPTY_ && !IsValidSubject(cfg.SubjectTransform.Source) { + return StreamConfig{}, NewJSStreamTransformInvalidSourceError(fmt.Errorf("%w %s", ErrBadSubject, cfg.SubjectTransform.Source)) + } + + err := ValidateMapping(cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination) + if err != nil { + return StreamConfig{}, NewJSStreamTransformInvalidDestinationError(err) + } + } + // If we have a republish directive check if we can create a transform here. if cfg.RePublish != nil { // Check to make sure source is a valid subset of the subjects we have. 
@@ -1776,6 +1788,18 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account, pedantic boo } cfg.RePublish.Source = fwcs } + // A RePublish from '>' to '>' could be used, normally this would form a cycle with the stream subjects. + // But if this aligns to a different subject based on the transform, we allow it still. + // The RePublish will be implicit based on the transform, but only if the transform's source + // is the only stream subject. + if cfg.RePublish.Destination == fwcs && cfg.RePublish.Source == fwcs && cfg.SubjectTransform != nil && + len(cfg.Subjects) == 1 && cfg.SubjectTransform.Source == cfg.Subjects[0] { + if pedantic { + return StreamConfig{}, NewJSPedanticError(fmt.Errorf("implicit republish based on subject transform")) + } + // RePublish all messages with the transformed subject. + cfg.RePublish.Source, cfg.RePublish.Destination = cfg.SubjectTransform.Destination, cfg.SubjectTransform.Destination + } var formsCycle bool for _, subj := range cfg.Subjects { if SubjectsCollide(cfg.RePublish.Destination, subj) { @@ -1791,18 +1815,6 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account, pedantic boo } } - // Check the subject transform if any - if cfg.SubjectTransform != nil { - if cfg.SubjectTransform.Source != _EMPTY_ && !IsValidSubject(cfg.SubjectTransform.Source) { - return StreamConfig{}, NewJSStreamTransformInvalidSourceError(fmt.Errorf("%w %s", ErrBadSubject, cfg.SubjectTransform.Source)) - } - - err := ValidateMapping(cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination) - if err != nil { - return StreamConfig{}, NewJSStreamTransformInvalidDestinationError(err) - } - } - // Remove placement if it's an empty object. if cfg.Placement != nil && reflect.DeepEqual(cfg.Placement, &Placement{}) { cfg.Placement = nil diff --git a/server/util.go b/server/util.go index f9fd695c328..cf4bf67490f 100644 --- a/server/util.go +++ b/server/util.go @@ -23,6 +23,7 @@ import ( "net" "net/url" "reflect" + "runtime" "strconv" "strings" "time" @@ -340,3 +341,25 @@ func generateInfoJSON(info *Info) []byte { pcs := [][]byte{[]byte("INFO"), b, []byte(CR_LF)} return bytes.Join(pcs, []byte(" ")) } + +// parallelTaskQueue starts a number of goroutines and returns a channel +// which functions can be sent to for queued parallel execution. The +// goroutines will stop running when the returned channel is closed and +// all queued tasks have completed. The passed in mp limits concurrency, +// or a value <= 0 will default to GOMAXPROCS. +func parallelTaskQueue(mp int) chan<- func() { + if rmp := runtime.GOMAXPROCS(-1); mp <= 0 { + mp = rmp + } else { + mp = max(rmp, mp) + } + tq := make(chan func(), mp) + for range mp { + go func() { + for fn := range tq { + fn() + } + }() + } + return tq +}
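The parallelTaskQueue helper above ships without a caller in this part of the diff, so here is a minimal usage sketch (not part of the change; the exampleParallelTaskQueueUsage name and the sync.WaitGroup bookkeeping are illustrative assumptions). As written, the helper starts GOMAXPROCS worker goroutines, or mp of them if mp is larger, and the workers exit once the returned channel has been closed and drained:

package server

import "sync"

// exampleParallelTaskQueueUsage fans the given tasks out over a parallel
// task queue and blocks until every queued task has completed.
func exampleParallelTaskQueueUsage(tasks []func()) {
	var wg sync.WaitGroup
	// A value <= 0 uses GOMAXPROCS workers; positive values below GOMAXPROCS
	// are rounded up to it by the helper as written above.
	tq := parallelTaskQueue(0)
	for _, task := range tasks {
		wg.Add(1)
		t := task
		tq <- func() {
			defer wg.Done()
			t()
		}
	}
	// Closing the queue lets the workers drain any remaining tasks and exit.
	close(tq)
	wg.Wait()
}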