diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index b1cdc3fa503..c8e8b280358 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -33,13 +33,13 @@ jobs: - name: Install cosign # Use commit hash here to avoid a re-tagging attack, as this is a third-party action - # Commit d7543c93d881b35a8faa02e8e3605f69b7a1ce62 = tag v3.10.0 - uses: sigstore/cosign-installer@d7543c93d881b35a8faa02e8e3605f69b7a1ce62 + # Commit faadad0cce49287aee09b3a48701e75088a2c6ad = tag v4.0.0 + uses: sigstore/cosign-installer@faadad0cce49287aee09b3a48701e75088a2c6ad - name: Install syft # Use commit hash here to avoid a re-tagging attack, as this is a third-party action - # Commit f8bdd1d8ac5e901a77a92f111440fdb1b593736b = tag v0.20.6 - uses: anchore/sbom-action/download-syft@f8bdd1d8ac5e901a77a92f111440fdb1b593736b + # Commit 8e94d75ddd33f69f691467e42275782e4bfefe84 = tag v0.20.9 + uses: anchore/sbom-action/download-syft@8e94d75ddd33f69f691467e42275782e4bfefe84 with: syft-version: "v1.27.1" diff --git a/go.mod b/go.mod index 7f127aca4c2..1dd04c5cf90 100644 --- a/go.mod +++ b/go.mod @@ -7,10 +7,10 @@ toolchain go1.24.9 require ( github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op github.com/google/go-tpm v0.9.6 - github.com/klauspost/compress v1.18.0 + github.com/klauspost/compress v1.18.1 github.com/minio/highwayhash v1.0.3 github.com/nats-io/jwt/v2 v2.8.0 - github.com/nats-io/nats.go v1.46.1 + github.com/nats-io/nats.go v1.47.0 github.com/nats-io/nkeys v0.4.11 github.com/nats-io/nuid v1.0.1 go.uber.org/automaxprocs v1.6.0 diff --git a/go.sum b/go.sum index 49aa2382556..b351d963d39 100644 --- a/go.sum +++ b/go.sum @@ -4,14 +4,14 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/go-tpm v0.9.6 h1:Ku42PT4LmjDu1H5C5ISWLlpI1mj+Zq7sPGKoRw2XROA= github.com/google/go-tpm v0.9.6/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= -github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= -github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= +github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g= github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA= -github.com/nats-io/nats.go v1.46.1 h1:bqQ2ZcxVd2lpYI97xYASeRTY3I5boe/IVmuUDPitHfo= -github.com/nats-io/nats.go v1.46.1/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= +github.com/nats-io/nats.go v1.47.0 h1:YQdADw6J/UfGUd2Oy6tn4Hq6YHxCaJrVKayxxFqYrgM= +github.com/nats-io/nats.go v1.47.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/server/accounts.go b/server/accounts.go index a9df6e0365a..548f6943ec4 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -3794,7 +3794,7 @@ func (s *Server) 
updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim // If JetStream is enabled for this server we will call into configJetStream for the account // regardless of enabled or disabled. It handles both cases. if jsEnabled { - if err := s.configJetStream(a); err != nil { + if err := s.configJetStream(a, nil); err != nil { s.Errorf("Error configuring jetstream for account [%s]: %v", tl, err.Error()) a.mu.Lock() // Absent reload of js server cfg, this is going to be broken until js is disabled @@ -4371,7 +4371,7 @@ func (dr *DirAccResolver) Start(s *Server) error { s.Warnf("DirResolver - Error checking for JetStream support for account %q: %v", pubKey, err) } } else if jsa == nil { - if err = s.configJetStream(acc); err != nil { + if err = s.configJetStream(acc, nil); err != nil { s.Errorf("DirResolver - Error configuring JetStream for account %q: %v", pubKey, err) } } diff --git a/server/auth.go b/server/auth.go index 2d735c0e2f6..ede297441b4 100644 --- a/server/auth.go +++ b/server/auth.go @@ -1123,7 +1123,8 @@ func (s *Server) processClientOrLeafAuthentication(c *client, opts *Options) (au return ok } - if c.kind == CLIENT { + // Check for the use of simple auth. + if c.kind == CLIENT || c.kind == LEAF { if proxyRequired = opts.ProxyRequired; proxyRequired && !trustedProxy { return setProxyAuthError(ErrAuthProxyRequired) } diff --git a/server/client.go b/server/client.go index b7ef2ba60b5..af670bd33b1 100644 --- a/server/client.go +++ b/server/client.go @@ -4345,7 +4345,7 @@ func (c *client) setupResponseServiceImport(acc *Account, si *serviceImport, tra // Will remove a header if present. func removeHeaderIfPresent(hdr []byte, key string) []byte { - start := bytes.Index(hdr, []byte(key+":")) + start := getHeaderKeyIndex(key, hdr) // key can't be first and we want to check that it is preceded by a '\n' if start < 1 || hdr[start-1] != '\n' { return hdr @@ -4463,22 +4463,13 @@ func sliceHeader(key string, hdr []byte) []byte { if len(hdr) == 0 { return nil } - index := bytes.Index(hdr, stringToBytes(key+":")) - hdrLen := len(hdr) - // Check that we have enough characters, this will handle the -1 case of the key not - // being found and will also handle not having enough characters for trailing CRLF. - if index < 2 { - return nil - } - // There should be a terminating CRLF. - if index >= hdrLen-1 || hdr[index-1] != '\n' || hdr[index-2] != '\r' { + index := getHeaderKeyIndex(key, hdr) + if index == -1 { return nil } - // The key should be immediately followed by a : separator. + // Skip over the key and the : separator. index += len(key) + 1 - if index >= hdrLen || hdr[index-1] != ':' { - return nil - } + hdrLen := len(hdr) // Skip over whitespace before the value. for index < hdrLen && hdr[index] == ' ' { index++ @@ -4494,11 +4485,49 @@ func sliceHeader(key string, hdr []byte) []byte { return hdr[start:index:index] } +// getHeaderKeyIndex returns an index into the header slice for the given key. +// Returns -1 if not found. +func getHeaderKeyIndex(key string, hdr []byte) int { + if len(hdr) == 0 { + return -1 + } + bkey := stringToBytes(key) + keyLen, hdrLen := len(key), len(hdr) + var offset int + for { + index := bytes.Index(hdr[offset:], bkey) + // Check that we have enough characters, this will handle the -1 case of the key not + // being found and will also handle not having enough characters for trailing CRLF. + if index < 2 { + return -1 + } + index += offset + // There should be a terminating CRLF. 
+ if index >= hdrLen-1 || hdr[index-1] != '\n' || hdr[index-2] != '\r' { + offset = index + keyLen + continue + } + // The key should be immediately followed by a : separator. + if index+keyLen >= hdrLen { + return -1 + } + if hdr[index+keyLen] != ':' { + offset = index + keyLen + continue + } + return index + } +} + func setHeader(key, val string, hdr []byte) []byte { - prefix := []byte(key + ": ") - start := bytes.Index(hdr, prefix) + start := getHeaderKeyIndex(key, hdr) if start >= 0 { - valStart := start + len(prefix) + valStart := start + len(key) + 1 + // Preserve single whitespace if used. + hdrLen := len(hdr) + if valStart < hdrLen && hdr[valStart] == ' ' { + valStart++ + } valEnd := bytes.Index(hdr[valStart:], []byte("\r")) if valEnd < 0 { return hdr // malformed headers diff --git a/server/client_test.go b/server/client_test.go index 88b6baf0063..03d0fca2e72 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -3195,7 +3195,7 @@ func TestSliceHeader(t *testing.T) { require_True(t, bytes.Equal(sliced, copied)) } -func TestSliceHeaderOrdering(t *testing.T) { +func TestSliceHeaderOrderingPrefix(t *testing.T) { hdr := []byte("NATS/1.0\r\n\r\n") // These headers share the same prefix, the longer subject @@ -3215,6 +3215,105 @@ func TestSliceHeaderOrdering(t *testing.T) { require_True(t, bytes.Equal(sliced, copied)) } +func TestSliceHeaderOrderingSuffix(t *testing.T) { + hdr := []byte("NATS/1.0\r\n\r\n") + + // These headers share the same suffix, the longer subject + // must not invalidate the existence of the shorter one. + hdr = genHeader(hdr, "Previous-Nats-Msg-Id", "user") + hdr = genHeader(hdr, "Nats-Msg-Id", "control") + + sliced := sliceHeader("Nats-Msg-Id", hdr) + copied := getHeader("Nats-Msg-Id", hdr) + + require_NotNil(t, sliced) + require_NotNil(t, copied) + require_True(t, bytes.Equal(sliced, copied)) + require_Equal(t, string(copied), "control") +} + +func TestRemoveHeaderIfPresentOrderingPrefix(t *testing.T) { + hdr := []byte("NATS/1.0\r\n\r\n") + + // These headers share the same prefix, the longer subject + // must not invalidate the existence of the shorter one. + hdr = genHeader(hdr, JSExpectedLastSubjSeqSubj, "foo") + hdr = genHeader(hdr, JSExpectedLastSubjSeq, "24") + + hdr = removeHeaderIfPresent(hdr, JSExpectedLastSubjSeq) + ehdr := genHeader(nil, JSExpectedLastSubjSeqSubj, "foo") + require_True(t, bytes.Equal(hdr, ehdr)) +} + +func TestRemoveHeaderIfPresentOrderingSuffix(t *testing.T) { + hdr := []byte("NATS/1.0\r\n\r\n") + + // These headers share the same suffix, the longer subject + // must not invalidate the existence of the shorter one. + hdr = genHeader(hdr, "Previous-Nats-Msg-Id", "user") + hdr = genHeader(hdr, "Nats-Msg-Id", "control") + + hdr = removeHeaderIfPresent(hdr, "Nats-Msg-Id") + ehdr := genHeader(nil, "Previous-Nats-Msg-Id", "user") + require_True(t, bytes.Equal(hdr, ehdr)) +} + +func TestSetHeaderOrderingPrefix(t *testing.T) { + for _, space := range []bool{true, false} { + title := "Normal" + if !space { + title = "Trimmed" + } + t.Run(title, func(t *testing.T) { + hdr := []byte("NATS/1.0\r\n\r\n") + + // These headers share the same prefix, the longer subject + // must not invalidate the existence of the shorter one. 
+ hdr = genHeader(hdr, JSExpectedLastSubjSeqSubj, "foo") + hdr = genHeader(hdr, JSExpectedLastSubjSeq, "24") + if !space { + hdr = bytes.ReplaceAll(hdr, []byte(" "), nil) + } + + hdr = setHeader(JSExpectedLastSubjSeq, "12", hdr) + ehdr := genHeader(nil, JSExpectedLastSubjSeqSubj, "foo") + ehdr = genHeader(ehdr, JSExpectedLastSubjSeq, "12") + if !space { + ehdr = bytes.ReplaceAll(ehdr, []byte(" "), nil) + } + require_True(t, bytes.Equal(hdr, ehdr)) + }) + } +} + +func TestSetHeaderOrderingSuffix(t *testing.T) { + for _, space := range []bool{true, false} { + title := "Normal" + if !space { + title = "Trimmed" + } + t.Run(title, func(t *testing.T) { + hdr := []byte("NATS/1.0\r\n\r\n") + + // These headers share the same suffix, the longer subject + // must not invalidate the existence of the shorter one. + hdr = genHeader(hdr, "Previous-Nats-Msg-Id", "user") + hdr = genHeader(hdr, "Nats-Msg-Id", "control") + if !space { + hdr = bytes.ReplaceAll(hdr, []byte(" "), nil) + } + + hdr = setHeader("Nats-Msg-Id", "other", hdr) + ehdr := genHeader(nil, "Previous-Nats-Msg-Id", "user") + ehdr = genHeader(ehdr, "Nats-Msg-Id", "other") + if !space { + ehdr = bytes.ReplaceAll(ehdr, []byte(" "), nil) + } + require_True(t, bytes.Equal(hdr, ehdr)) + }) + } +} + func TestInProcessAllowedConnectionType(t *testing.T) { tmpl := ` listen: "127.0.0.1:-1" diff --git a/server/consumer.go b/server/consumer.go index 080ec7a1fdc..226e5a3177e 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -4519,7 +4519,8 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) { var pre *waitingRequest for wr := wq.head; wr != nil; { // Check expiration. - if (eos && wr.noWait && wr.d > 0) || (!wr.expires.IsZero() && now.After(wr.expires)) { + expires := !wr.expires.IsZero() && now.After(wr.expires) + if (eos && wr.noWait) || expires { rdWait := o.replicateDeliveries() if rdWait { // Check if we need to send the timeout after pending replicated deliveries, or can do so immediately. @@ -4528,13 +4529,26 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) { } else { wd.pn, wd.pb = wr.n, wr.b } + // If we still need to wait for replicated deliveries, remove from waiting list. + if rdWait { + wr = remove(pre, wr) + continue + } } - if !rdWait { + // Normally it's a timeout. + if expires { hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b) o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) + wr = remove(pre, wr) + continue + } else if wr.expires.IsZero() || wr.d > 0 { + // But if we're NoWait without expiry, we've reached the end of the stream, and we've not delivered any messages. + // Return no messages instead, which is the same as if we'd rejected the pull request initially. + hdr := fmt.Appendf(nil, "NATS/1.0 404 No Messages\r\n\r\n") + o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) + wr = remove(pre, wr) + continue } - wr = remove(pre, wr) - continue } // Now check interest. 
interest := wr.acc.sl.HasInterest(wr.interest) diff --git a/server/events.go b/server/events.go index ff2ee46367e..8bcb3a713fa 100644 --- a/server/events.go +++ b/server/events.go @@ -1735,18 +1735,18 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su node := getHash(si.Name) accountNRG := si.AccountNRG() oldInfo, _ := s.nodeToInfo.Swap(node, nodeInfo{ - si.Name, - si.Version, - si.Cluster, - si.Domain, - si.ID, - si.Tags, - cfg, - stats, - false, - si.JetStreamEnabled(), - si.BinaryStreamSnapshot(), - accountNRG, + name: si.Name, + version: si.Version, + cluster: si.Cluster, + domain: si.Domain, + id: si.ID, + tags: si.Tags, + cfg: cfg, + stats: stats, + offline: false, + js: si.JetStreamEnabled(), + binarySnapshots: si.BinaryStreamSnapshot(), + accountNRG: accountNRG, }) if oldInfo == nil || accountNRG != oldInfo.(nodeInfo).accountNRG { // One of the servers we received statsz from changed its mind about @@ -1789,18 +1789,18 @@ func (s *Server) processNewServer(si *ServerInfo) { // Only update if non-existent if _, ok := s.nodeToInfo.Load(node); !ok { s.nodeToInfo.Store(node, nodeInfo{ - si.Name, - si.Version, - si.Cluster, - si.Domain, - si.ID, - si.Tags, - nil, - nil, - false, - si.JetStreamEnabled(), - si.BinaryStreamSnapshot(), - si.AccountNRG(), + name: si.Name, + version: si.Version, + cluster: si.Cluster, + domain: si.Domain, + id: si.ID, + tags: si.Tags, + cfg: nil, + stats: nil, + offline: false, + js: si.JetStreamEnabled(), + binarySnapshots: si.BinaryStreamSnapshot(), + accountNRG: si.AccountNRG(), }) } } diff --git a/server/events_test.go b/server/events_test.go index 8d4531e6bd8..68d2b83c55a 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -1238,7 +1238,7 @@ func TestAccountReqMonitoring(t *testing.T) { s.EnableJetStream(&JetStreamConfig{StoreDir: t.TempDir()}) unusedAcc, _ := createAccount(s) acc, akp := createAccount(s) - acc.EnableJetStream(nil) + acc.EnableJetStream(nil, nil) subsz := fmt.Sprintf(accDirectReqSubj, acc.Name, "SUBSZ") connz := fmt.Sprintf(accDirectReqSubj, acc.Name, "CONNZ") jsz := fmt.Sprintf(accDirectReqSubj, acc.Name, "JSZ") diff --git a/server/filestore.go b/server/filestore.go index 2ac494f853a..3b99dcaad72 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -2705,7 +2705,7 @@ func (mb *msgBlock) firstMatchingMulti(sl *gsl.SimpleSublist, start uint64, sm * if err != nil { continue } - expireOk := seq == lseq && mb.llseq == seq + expireOk := seq == lseq && mb.llseq != llseq && mb.llseq == seq updateLLTS = false // cacheLookup already updated it. if sl.HasInterest(fsm.subj) { return fsm, expireOk, nil @@ -2839,7 +2839,7 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor continue } updateLLTS = false // cacheLookup already updated it. - expireOk := seq == lseq && mb.llseq == seq + expireOk := seq == lseq && mb.llseq != llseq && mb.llseq == seq if isAll { return fsm, expireOk, nil } @@ -4756,6 +4756,17 @@ func (fs *fileStore) enforceMsgLimit() { return } for nmsgs := fs.state.Msgs; nmsgs > uint64(fs.cfg.MaxMsgs); nmsgs = fs.state.Msgs { + // If the first block can be removed fully, purge it entirely without needing to walk sequences. 
+ if len(fs.blks) > 0 { + fmb := fs.blks[0] + fmb.mu.RLock() + msgs := fmb.msgs + fmb.mu.RUnlock() + if nmsgs-msgs > uint64(fs.cfg.MaxMsgs) { + fs.purgeMsgBlock(fmb) + continue + } + } if removed, err := fs.deleteFirstMsg(); err != nil || !removed { fs.rebuildFirst() return @@ -4773,6 +4784,17 @@ func (fs *fileStore) enforceBytesLimit() { return } for bs := fs.state.Bytes; bs > uint64(fs.cfg.MaxBytes); bs = fs.state.Bytes { + // If the first block can be removed fully, purge it entirely without needing to walk sequences. + if len(fs.blks) > 0 { + fmb := fs.blks[0] + fmb.mu.RLock() + bytes := fmb.bytes + fmb.mu.RUnlock() + if bs-bytes > uint64(fs.cfg.MaxBytes) { + fs.purgeMsgBlock(fmb) + continue + } + } if removed, err := fs.deleteFirstMsg(); err != nil || !removed { fs.rebuildFirst() return @@ -9347,6 +9369,25 @@ func (fs *fileStore) forceRemoveMsgBlock(mb *msgBlock) { fs.removeMsgBlockFromList(mb) } +// Purges and removes the msgBlock from the store. +// Lock should be held. +func (fs *fileStore) purgeMsgBlock(mb *msgBlock) { + mb.mu.Lock() + // Update top level accounting. + msgs, bytes := mb.msgs, mb.bytes + if msgs > fs.state.Msgs { + msgs = fs.state.Msgs + } + if bytes > fs.state.Bytes { + bytes = fs.state.Bytes + } + fs.state.Msgs -= msgs + fs.state.Bytes -= bytes + fs.removeMsgBlock(mb) + mb.mu.Unlock() + fs.selectNextFirst() +} + // Called by purge to simply get rid of the cache and close our fds. // Lock should not be held. func (mb *msgBlock) dirtyClose() { @@ -9872,12 +9913,17 @@ func timestampNormalized(t time.Time) int64 { // writeFullState will proceed to write the full meta state iff not complex and time consuming. // Since this is for quick recovery it is optional and should not block/stall normal operations. func (fs *fileStore) writeFullState() error { - return fs._writeFullState(false) + return fs._writeFullState(false, true) } -// forceWriteFullState will proceed to write the full meta state. This should only be called by stop() +// forceWriteFullState will proceed to write the full meta state. func (fs *fileStore) forceWriteFullState() error { - return fs._writeFullState(true) + return fs._writeFullState(true, true) +} + +// forceWriteFullStateLocked will proceed to write the full meta state. This should only be called by stop() +func (fs *fileStore) forceWriteFullStateLocked() error { + return fs._writeFullState(true, false) } // This will write the full binary state for the stream. @@ -9887,11 +9933,22 @@ func (fs *fileStore) forceWriteFullState() error { // 2. PSIM - Per Subject Index Map - Tracks first and last blocks with subjects present. // 3. MBs - Index, Bytes, First and Last Sequence and Timestamps, and the deleted map (avl.seqset). // 4. Last block index and hash of record inclusive to this stream state. -func (fs *fileStore) _writeFullState(force bool) error { +func (fs *fileStore) _writeFullState(force, needLock bool) error { + fsLock := func() { + if needLock { + fs.mu.Lock() + } + } + fsUnlock := func() { + if needLock { + fs.mu.Unlock() + } + } + start := time.Now() - fs.mu.Lock() + fsLock() if fs.closed || fs.dirty == 0 { - fs.mu.Unlock() + fsUnlock() return nil } @@ -9910,7 +9967,7 @@ func (fs *fileStore) _writeFullState(force bool) error { numDeleted = int((fs.state.LastSeq - fs.state.FirstSeq + 1) - fs.state.Msgs) } if numSubjects > numThreshold || numDeleted > numThreshold { - fs.mu.Unlock() + fsUnlock() return errStateTooBig } } @@ -10018,13 +10075,15 @@ func (fs *fileStore) _writeFullState(force bool) error { // Encrypt if needed. 
if fs.prf != nil { if err := fs.setupAEK(); err != nil { - fs.mu.Unlock() + fsUnlock() return err } nonce := make([]byte, fs.aek.NonceSize(), fs.aek.NonceSize()+len(buf)+fs.aek.Overhead()) if n, err := rand.Read(nonce); err != nil { + fsUnlock() return err } else if n != len(nonce) { + fsUnlock() return fmt.Errorf("not enough nonce bytes read (%d != %d)", n, len(nonce)) } buf = fs.aek.Seal(nonce, nonce, buf, nil) @@ -10041,13 +10100,17 @@ func (fs *fileStore) _writeFullState(force bool) error { statesEqual := trackingStatesEqual(&fs.state, &mstate) // Release lock. - fs.mu.Unlock() + fsUnlock() // Check consistency here. if !statesEqual { fs.warn("Stream state encountered internal inconsistency on write") // Rebuild our fs state from the mb state. - fs.rebuildState(nil) + if needLock { + fs.rebuildState(nil) + } else { + fs.rebuildStateLocked(nil) + } return errCorruptState } @@ -10072,14 +10135,14 @@ func (fs *fileStore) _writeFullState(force bool) error { // Update dirty if successful. if err == nil { - fs.mu.Lock() + fsLock() fs.dirty -= priorDirty - fs.mu.Unlock() + fsUnlock() } // Attempt to write both files, an error in one should not prevent the other from being written. - ttlErr := fs.writeTTLState() - schedErr := fs.writeMsgSchedulingState() + ttlErr := fs.writeTTLState(needLock) + schedErr := fs.writeMsgSchedulingState(needLock) if ttlErr != nil { return ttlErr } else if schedErr != nil { @@ -10088,30 +10151,42 @@ func (fs *fileStore) _writeFullState(force bool) error { return nil } -func (fs *fileStore) writeTTLState() error { - fs.mu.RLock() +func (fs *fileStore) writeTTLState(needLock bool) error { + if needLock { + fs.mu.RLock() + } if fs.ttls == nil { - fs.mu.RUnlock() + if needLock { + fs.mu.RUnlock() + } return nil } fn := filepath.Join(fs.fcfg.StoreDir, msgDir, ttlStreamStateFile) // Must be lseq+1 to identify up to which sequence the TTLs are valid. buf := fs.ttls.Encode(fs.state.LastSeq + 1) - fs.mu.RUnlock() + if needLock { + fs.mu.RUnlock() + } return fs.writeFileWithOptionalSync(fn, buf, defaultFilePerms) } -func (fs *fileStore) writeMsgSchedulingState() error { - fs.mu.RLock() +func (fs *fileStore) writeMsgSchedulingState(needLock bool) error { + if needLock { + fs.mu.RLock() + } if fs.scheduling == nil { - fs.mu.RUnlock() + if needLock { + fs.mu.RUnlock() + } return nil } fn := filepath.Join(fs.fcfg.StoreDir, msgDir, msgSchedulingStreamStateFile) // Must be lseq+1 to identify up to which sequence the schedules are valid. buf := fs.scheduling.encode(fs.state.LastSeq + 1) - fs.mu.RUnlock() + if needLock { + fs.mu.RUnlock() + } return fs.writeFileWithOptionalSync(fn, buf, defaultFilePerms) } @@ -10129,18 +10204,10 @@ func (fs *fileStore) stop(delete, writeState bool) error { return ErrStoreClosed } - // Mark as closing. Do before releasing the lock to writeFullState + // Mark as closing. Do before releasing the lock to wait on the state flush loop // so we don't end up with this function running more than once. fs.closing = true - if writeState { - fs.checkAndFlushLastBlock() - } - fs.closeAllMsgBlocks(false) - - fs.cancelSyncTimer() - fs.cancelAgeChk() - // Release the state flusher loop. if fs.qch != nil { close(fs.qch) @@ -10152,9 +10219,18 @@ func (fs *fileStore) stop(delete, writeState bool) error { fsld := fs.fsld fs.mu.Unlock() <-fsld - // Write full state if needed. If not dirty this is a no-op. 
- fs.forceWriteFullState() fs.mu.Lock() + + fs.checkAndFlushLastBlock() + } + fs.closeAllMsgBlocks(false) + + fs.cancelSyncTimer() + fs.cancelAgeChk() + + if writeState { + // Write full state if needed. If not dirty this is a no-op. + fs.forceWriteFullStateLocked() } // Mark as closed. Last message block needs to be cleared after diff --git a/server/filestore_test.go b/server/filestore_test.go index 2d9c14e087f..bf6675174cc 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -10963,3 +10963,44 @@ func TestFileStoreEraseMsgErr(t *testing.T) { fs.EraseMsg(2) }) } + +func TestFileStorePurgeMsgBlock(t *testing.T) { + testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { + fcfg.BlockSize = 10 * 33 + cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage} + created := time.Now() + fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + for range 20 { + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + } + + fs.mu.RLock() + blks := len(fs.blks) + fs.mu.RUnlock() + require_Equal(t, blks, 2) + + state := fs.State() + require_Equal(t, state.FirstSeq, 1) + require_Equal(t, state.LastSeq, 20) + require_Equal(t, state.Msgs, 20) + require_Equal(t, state.Bytes, 20*33) + + // Purging the block should both remove the block and do the accounting. + fmb := fs.getFirstBlock() + fs.mu.Lock() + fs.purgeMsgBlock(fmb) + blks = len(fs.blks) + fs.mu.Unlock() + + require_Equal(t, blks, 1) + state = fs.State() + require_Equal(t, state.FirstSeq, 11) + require_Equal(t, state.LastSeq, 20) + require_Equal(t, state.Msgs, 10) + require_Equal(t, state.Bytes, 10*33) + }) +} diff --git a/server/gateway.go b/server/gateway.go index 085f0e18f54..f4982cc2ce6 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -2561,11 +2561,18 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr return false } + // Copy off original pa in case it changes. + pa := c.pa + mt, _ := c.isMsgTraceEnabled() if mt != nil { - pa := c.pa + // We are going to replace "pa" with our copy of c.pa, but to restore + // to the original copy of c.pa, we need to save it again. + cpa := c.pa msg = mt.setOriginAccountHeaderIfNeeded(c, acc, msg) - defer func() { c.pa = pa }() + defer func() { c.pa = cpa }() + // Update pa with our current c.pa state. + pa = c.pa } var ( @@ -2579,6 +2586,7 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr didDeliver bool prodIsMQTT = c.isMqtt() dlvMsgs int64 + dlvExtraSz int64 ) // Get a subscription from the pool @@ -2676,8 +2684,11 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr } } + // Assume original message + dmsg := msg if mt != nil { - msg = mt.setHopHeader(c, msg) + // If trace is enabled, we need to set the hop header per gateway. + dmsg = mt.setHopHeader(c, dmsg) } // Setup the message header. @@ -2727,16 +2738,22 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr sub.nm, sub.max = 0, 0 sub.client = gwc sub.subject = subject - if c.deliverMsg(prodIsMQTT, sub, acc, subject, mreply, mh, msg, false) { + if c.deliverMsg(prodIsMQTT, sub, acc, subject, mreply, mh, dmsg, false) { // We don't count internal deliveries so count only if sub.icb is nil if sub.icb == nil { dlvMsgs++ + dlvExtraSz += int64(len(dmsg) - len(msg)) } didDeliver = true } + + // If we set the header reset the origin pub args. 
+ if mt != nil { + c.pa = pa + } } if dlvMsgs > 0 { - totalBytes := dlvMsgs * int64(len(msg)) + totalBytes := dlvMsgs*int64(len(msg)) + dlvExtraSz // For non MQTT producers, remove the CR_LF * number of messages if !prodIsMQTT { totalBytes -= dlvMsgs * int64(LEN_CR_LF) diff --git a/server/gateway_test.go b/server/gateway_test.go index c457d115026..de23b97c063 100644 --- a/server/gateway_test.go +++ b/server/gateway_test.go @@ -33,6 +33,7 @@ import ( "github.com/nats-io/nats-server/v2/logger" "github.com/nats-io/nats.go" + "github.com/nats-io/nkeys" "golang.org/x/crypto/ocsp" . "github.com/nats-io/nats-server/v2/internal/ocsp" @@ -7557,3 +7558,59 @@ func TestGatewayConfigureWriteDeadline(t *testing.T) { require_Equal(t, r.out.wdl, 5*time.Second) }) } + +func TestGatewayProcessRSubNoBlockingAccountFetch(t *testing.T) { + createAccountPubKey := func() string { + kp, err := nkeys.CreateAccount() + require_NoError(t, err) + pubkey, err := kp.PublicKey() + require_NoError(t, err) + return pubkey + } + sysPub := createAccountPubKey() + accPub := createAccountPubKey() + dir := t.TempDir() + conf := createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:-1 + server_name: srv + operator: %s + system_account: %s + resolver: { + type: cache + dir: '%s' + timeout: "2s" + } + gateway: { + name: "clust-B" + listen: 127.0.0.1:-1 + } + `, ojwt, sysPub, dir))) + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() + + // Set up a mock gateway client. + c := s.createInternalAccountClient() + c.mu.Lock() + c.gw = &gateway{} + c.gw.outsim = &sync.Map{} + c.nc = &net.IPConn{} + c.mu.Unlock() + + // Receiving a R+ should not be blocking, since we're in the gateway's readLoop. + start := time.Now() + require_NoError(t, c.processGatewayRSub(fmt.Appendf(nil, "%s subj queue 0", accPub))) + c.mu.Lock() + subs := len(c.subs) + c.mu.Unlock() + require_Len(t, subs, 1) + require_LessThan(t, time.Since(start), 100*time.Millisecond) + + // Receiving a R- should not be blocking, since we're in the gateway's readLoop. + start = time.Now() + require_NoError(t, c.processGatewayRUnsub(fmt.Appendf(nil, "%s subj queue", accPub))) + c.mu.Lock() + subs = len(c.subs) + c.mu.Unlock() + require_Len(t, subs, 0) + require_LessThan(t, time.Since(start), 100*time.Millisecond) +} diff --git a/server/jetstream.go b/server/jetstream.go index 0aa2e527aaf..840e56635ae 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -194,6 +194,11 @@ func (s *Server) EnableJetStream(config *JetStreamConfig) error { } s.Noticef("Starting JetStream") + start := time.Now() + defer func() { + s.Noticef("Took %s to start JetStream", time.Since(start)) + }() + if config == nil || config.MaxMemory <= 0 || config.MaxStore <= 0 { var storeDir, domain, uniqueTag string var maxStore, maxMem int64 @@ -686,6 +691,11 @@ func (s *Server) DisableJetStream() error { } func (s *Server) enableJetStreamAccounts() error { + // Reuse the same task workers across all accounts, so that we don't explode + // with a large number of goroutines on multi-account systems. + tq := parallelTaskQueue(0) + defer close(tq) + // If we have no configured accounts setup then setup imports on global account. 
if s.globalAccountOnly() { gacc := s.GlobalAccount() @@ -694,10 +704,10 @@ func (s *Server) enableJetStreamAccounts() error { gacc.jsLimits = defaultJSAccountTiers } gacc.mu.Unlock() - if err := s.configJetStream(gacc); err != nil { + if err := s.configJetStream(gacc, tq); err != nil { return err } - } else if err := s.configAllJetStreamAccounts(); err != nil { + } else if err := s.configAllJetStreamAccounts(tq); err != nil { return fmt.Errorf("Error enabling jetstream on configured accounts: %v", err) } return nil @@ -761,7 +771,7 @@ func (a *Account) enableJetStreamInfoServiceImportOnly() error { return a.enableAllJetStreamServiceImportsAndMappings() } -func (s *Server) configJetStream(acc *Account) error { +func (s *Server) configJetStream(acc *Account, tq chan<- func()) error { if acc == nil { return nil } @@ -778,7 +788,7 @@ func (s *Server) configJetStream(acc *Account) error { return err } } else { - if err := acc.EnableJetStream(jsLimits); err != nil { + if err := acc.EnableJetStream(jsLimits, tq); err != nil { return err } if s.gateway.enabled { @@ -799,7 +809,7 @@ func (s *Server) configJetStream(acc *Account) error { } // configAllJetStreamAccounts walk all configured accounts and turn on jetstream if requested. -func (s *Server) configAllJetStreamAccounts() error { +func (s *Server) configAllJetStreamAccounts(tq chan<- func()) error { // Check to see if system account has been enabled. We could arrive here via reload and // a non-default system account. s.checkJetStreamExports() @@ -839,7 +849,7 @@ func (s *Server) configAllJetStreamAccounts() error { // Process any jetstream enabled accounts here. These will be accounts we are // already aware of at startup etc. for _, acc := range jsAccounts { - if err := s.configJetStream(acc); err != nil { + if err := s.configJetStream(acc, tq); err != nil { return err } } @@ -852,7 +862,7 @@ func (s *Server) configAllJetStreamAccounts() error { // Only load up ones not already loaded since they are processed above. if _, ok := accounts.Load(accName); !ok { if acc, err := s.lookupAccount(accName); err != nil && acc != nil { - if err := s.configJetStream(acc); err != nil { + if err := s.configJetStream(acc, tq); err != nil { return err } } @@ -1100,7 +1110,7 @@ func (a *Account) assignJetStreamLimits(limits map[string]JetStreamAccountLimits // EnableJetStream will enable JetStream on this account with the defined limits. // This is a helper for JetStreamEnableAccount. -func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) error { +func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits, tq chan<- func()) error { a.mu.RLock() s := a.srv a.mu.RUnlock() @@ -1253,30 +1263,139 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro } } - // Collect consumers, do after all streams. - type ce struct { - mset *stream - odir string - } - var consumers []*ce - - // Collect any interest policy streams to check for - // https://github.com/nats-io/nats-server/issues/3612 - var ipstreams []*stream - // Remember if we should be encrypted and what cipher we think we should use. 
encrypted := s.getOpts().JetStreamKey != _EMPTY_ - plaintext := true sc := s.getOpts().JetStreamCipher + doConsumers := func(mset *stream, odir string) { + ofis, _ := os.ReadDir(odir) + if len(ofis) > 0 { + s.Noticef(" Recovering %d consumers for stream - '%s > %s'", len(ofis), mset.accName(), mset.name()) + } + for _, ofi := range ofis { + metafile := filepath.Join(odir, ofi.Name(), JetStreamMetaFile) + metasum := filepath.Join(odir, ofi.Name(), JetStreamMetaFileSum) + if _, err := os.Stat(metafile); os.IsNotExist(err) { + s.Warnf(" Missing consumer metafile %q", metafile) + continue + } + buf, err := os.ReadFile(metafile) + if err != nil { + s.Warnf(" Error reading consumer metafile %q: %v", metafile, err) + continue + } + if _, err := os.Stat(metasum); os.IsNotExist(err) { + s.Warnf(" Missing consumer checksum for %q", metasum) + continue + } + + // Check if we are encrypted. + if key, err := os.ReadFile(filepath.Join(odir, ofi.Name(), JetStreamMetaFileKey)); err == nil { + s.Debugf(" Consumer metafile is encrypted, reading encrypted keyfile") + // Decode the buffer before proceeding. + ctxName := mset.name() + tsep + ofi.Name() + nbuf, _, err := s.decryptMeta(sc, key, buf, a.Name, ctxName) + if err != nil { + s.Warnf(" Error decrypting our consumer metafile: %v", err) + continue + } + buf = nbuf + } + + var cfg FileConsumerInfo + decoder := json.NewDecoder(bytes.NewReader(buf)) + decoder.DisallowUnknownFields() + strictErr := decoder.Decode(&cfg) + if strictErr != nil { + cfg = FileConsumerInfo{} + if err := json.Unmarshal(buf, &cfg); err != nil { + s.Warnf(" Error unmarshalling consumer metafile %q: %v", metafile, err) + continue + } + } + if supported := supportsRequiredApiLevel(cfg.Metadata); !supported || strictErr != nil { + var offlineReason string + if !supported { + apiLevel := getRequiredApiLevel(cfg.Metadata) + if strictErr != nil { + offlineReason = fmt.Sprintf("unsupported - config error: %s", strings.TrimPrefix(strictErr.Error(), "json: ")) + } else { + offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel) + } + s.Warnf(" Detected unsupported consumer '%s > %s > %s': %s", a.Name, mset.name(), cfg.Name, offlineReason) + } else { + offlineReason = fmt.Sprintf("decoding error: %v", strictErr) + s.Warnf(" Error unmarshalling consumer metafile %q: %v", metafile, strictErr) + } + singleServerMode := !s.JetStreamIsClustered() && s.standAloneMode() + if singleServerMode { + if !mset.closed.Load() { + s.Warnf(" Stopping unsupported stream '%s > %s'", a.Name, mset.name()) + mset.mu.Lock() + mset.offlineReason = fmt.Sprintf("stopped - unsupported consumer %q", cfg.Name) + mset.mu.Unlock() + mset.stop(false, false) + } + + // Fake a consumer, so we can respond to API requests as single-server. + o := &consumer{ + mset: mset, + js: s.getJetStream(), + acc: a, + srv: s, + cfg: cfg.ConsumerConfig, + active: false, + stream: mset.name(), + name: cfg.Name, + dseq: 1, + sseq: 1, + created: time.Now().UTC(), + closed: true, + offlineReason: offlineReason, + } + if !cfg.Created.IsZero() { + o.created = cfg.Created + } + + mset.mu.Lock() + mset.setConsumer(o) + mset.mu.Unlock() + } + continue + } + + isEphemeral := !isDurableConsumer(&cfg.ConsumerConfig) + if isEphemeral { + // This is an ephemeral consumer and this could fail on restart until + // the consumer can reconnect. We will create it as a durable and switch it. 
+ cfg.ConsumerConfig.Durable = ofi.Name() + } + obs, err := mset.addConsumerWithAssignment(&cfg.ConsumerConfig, _EMPTY_, nil, true, ActionCreateOrUpdate, false) + if err != nil { + s.Warnf(" Error adding consumer %q: %v", cfg.Name, err) + continue + } + if isEphemeral { + obs.switchToEphemeral() + } + if !cfg.Created.IsZero() { + obs.setCreatedTime(cfg.Created) + } + if err != nil { + s.Warnf(" Error restoring consumer %q state: %v", cfg.Name, err) + } + } + } + // Now recover the streams. fis, _ := os.ReadDir(sdir) - for _, fi := range fis { + doStream := func(fi os.DirEntry) error { + plaintext := true mdir := filepath.Join(sdir, fi.Name()) // Check for partially deleted streams. They are marked with "." prefix. if strings.HasPrefix(fi.Name(), tsep) { go os.RemoveAll(mdir) - continue + return nil } key := sha256.Sum256([]byte(fi.Name())) hh, err := highwayhash.New64(key[:]) @@ -1287,27 +1406,27 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro metasum := filepath.Join(mdir, JetStreamMetaFileSum) if _, err := os.Stat(metafile); os.IsNotExist(err) { s.Warnf(" Missing stream metafile for %q", metafile) - continue + return nil } buf, err := os.ReadFile(metafile) if err != nil { s.Warnf(" Error reading metafile %q: %v", metafile, err) - continue + return nil } if _, err := os.Stat(metasum); os.IsNotExist(err) { s.Warnf(" Missing stream checksum file %q", metasum) - continue + return nil } sum, err := os.ReadFile(metasum) if err != nil { s.Warnf(" Error reading Stream metafile checksum %q: %v", metasum, err) - continue + return nil } hh.Write(buf) checksum := hex.EncodeToString(hh.Sum(nil)) if checksum != string(sum) { s.Warnf(" Stream metafile %q: checksums do not match %q vs %q", metafile, sum, checksum) - continue + return nil } // Track if we are converting ciphers. @@ -1320,14 +1439,14 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro s.Debugf(" Stream metafile is encrypted, reading encrypted keyfile") if len(keyBuf) < minMetaKeySize { s.Warnf(" Bad stream encryption key length of %d", len(keyBuf)) - continue + return nil } // Decode the buffer before proceeding. var nbuf []byte nbuf, convertingCiphers, err = s.decryptMeta(sc, keyBuf, buf, a.Name, fi.Name()) if err != nil { s.Warnf(" Error decrypting our stream metafile: %v", err) - continue + return nil } buf = nbuf plaintext = false @@ -1341,7 +1460,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro cfg = FileStreamInfo{} if err := json.Unmarshal(buf, &cfg); err != nil { s.Warnf(" Error unmarshalling stream metafile %q: %v", metafile, err) - continue + return nil } } if supported := supportsRequiredApiLevel(cfg.Metadata); !supported || strictErr != nil { @@ -1384,13 +1503,16 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro // Now do the consumers. 
odir := filepath.Join(sdir, fi.Name(), consumerDir) - consumers = append(consumers, &ce{mset, odir}) + doConsumers(mset, odir) } - continue + return nil } if cfg.Template != _EMPTY_ { - if err := jsa.addStreamNameToTemplate(cfg.Template, cfg.Name); err != nil { + jsa.mu.Lock() + err := jsa.addStreamNameToTemplate(cfg.Template, cfg.Name) + jsa.mu.Unlock() + if err != nil { s.Warnf(" Error adding stream %q to template %q: %v", cfg.Name, cfg.Template, err) } } @@ -1415,7 +1537,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro } } if hadSubjErr { - continue + return nil } // The other possible bug is assigning subjects to mirrors, so check for that and patch as well. @@ -1449,7 +1571,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro s.Warnf(" Error replacing meta keyfile for stream %q: %v", cfg.Name, err) } } - continue + return nil } if !cfg.Created.IsZero() { mset.setCreatedTime(cfg.Created) @@ -1514,146 +1636,41 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro s.Noticef(" Restored %s messages for stream '%s > %s' in %v", comma(int64(state.Msgs)), mset.accName(), mset.name(), time.Since(rt).Round(time.Millisecond)) + // Now do the consumers. + odir := filepath.Join(sdir, fi.Name(), consumerDir) + doConsumers(mset, odir) + // Collect to check for dangling messages. // TODO(dlc) - Can be removed eventually. if cfg.StreamConfig.Retention == InterestPolicy { - ipstreams = append(ipstreams, mset) + mset.checkForOrphanMsgs() + mset.checkConsumerReplication() } - // Now do the consumers. - odir := filepath.Join(sdir, fi.Name(), consumerDir) - consumers = append(consumers, &ce{mset, odir}) + return nil } - for _, e := range consumers { - ofis, _ := os.ReadDir(e.odir) - if len(ofis) > 0 { - s.Noticef(" Recovering %d consumers for stream - '%s > %s'", len(ofis), e.mset.accName(), e.mset.name()) - } - for _, ofi := range ofis { - metafile := filepath.Join(e.odir, ofi.Name(), JetStreamMetaFile) - metasum := filepath.Join(e.odir, ofi.Name(), JetStreamMetaFileSum) - if _, err := os.Stat(metafile); os.IsNotExist(err) { - s.Warnf(" Missing consumer metafile %q", metafile) - continue - } - buf, err := os.ReadFile(metafile) - if err != nil { - s.Warnf(" Error reading consumer metafile %q: %v", metafile, err) - continue - } - if _, err := os.Stat(metasum); os.IsNotExist(err) { - s.Warnf(" Missing consumer checksum for %q", metasum) - continue - } - - // Check if we are encrypted. - if key, err := os.ReadFile(filepath.Join(e.odir, ofi.Name(), JetStreamMetaFileKey)); err == nil { - s.Debugf(" Consumer metafile is encrypted, reading encrypted keyfile") - // Decode the buffer before proceeding. 
- ctxName := e.mset.name() + tsep + ofi.Name() - nbuf, _, err := s.decryptMeta(sc, key, buf, a.Name, ctxName) - if err != nil { - s.Warnf(" Error decrypting our consumer metafile: %v", err) - continue - } - buf = nbuf - } - - var cfg FileConsumerInfo - decoder := json.NewDecoder(bytes.NewReader(buf)) - decoder.DisallowUnknownFields() - strictErr := decoder.Decode(&cfg) - if strictErr != nil { - cfg = FileConsumerInfo{} - if err := json.Unmarshal(buf, &cfg); err != nil { - s.Warnf(" Error unmarshalling consumer metafile %q: %v", metafile, err) - continue - } - } - if supported := supportsRequiredApiLevel(cfg.Metadata); !supported || strictErr != nil { - var offlineReason string - if !supported { - apiLevel := getRequiredApiLevel(cfg.Metadata) - if strictErr != nil { - offlineReason = fmt.Sprintf("unsupported - config error: %s", strings.TrimPrefix(strictErr.Error(), "json: ")) - } else { - offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel) - } - s.Warnf(" Detected unsupported consumer '%s > %s > %s': %s", a.Name, e.mset.name(), cfg.Name, offlineReason) - } else { - offlineReason = fmt.Sprintf("decoding error: %v", strictErr) - s.Warnf(" Error unmarshalling consumer metafile %q: %v", metafile, strictErr) - } - singleServerMode := !s.JetStreamIsClustered() && s.standAloneMode() - if singleServerMode { - if !e.mset.closed.Load() { - s.Warnf(" Stopping unsupported stream '%s > %s'", a.Name, e.mset.name()) - e.mset.mu.Lock() - e.mset.offlineReason = fmt.Sprintf("stopped - unsupported consumer %q", cfg.Name) - e.mset.mu.Unlock() - e.mset.stop(false, false) - } - - // Fake a consumer, so we can respond to API requests as single-server. - o := &consumer{ - mset: e.mset, - js: s.getJetStream(), - acc: a, - srv: s, - cfg: cfg.ConsumerConfig, - active: false, - stream: e.mset.name(), - name: cfg.Name, - dseq: 1, - sseq: 1, - created: time.Now().UTC(), - closed: true, - offlineReason: offlineReason, - } - if !cfg.Created.IsZero() { - o.created = cfg.Created - } - - e.mset.mu.Lock() - e.mset.setConsumer(o) - e.mset.mu.Unlock() - } - continue - } - - isEphemeral := !isDurableConsumer(&cfg.ConsumerConfig) - if isEphemeral { - // This is an ephemeral consumer and this could fail on restart until - // the consumer can reconnect. We will create it as a durable and switch it. - cfg.ConsumerConfig.Durable = ofi.Name() - } - obs, err := e.mset.addConsumerWithAssignment(&cfg.ConsumerConfig, _EMPTY_, nil, true, ActionCreateOrUpdate, false) - if err != nil { - s.Warnf(" Error adding consumer %q: %v", cfg.Name, err) - continue - } - if isEphemeral { - obs.switchToEphemeral() - } - if !cfg.Created.IsZero() { - obs.setCreatedTime(cfg.Created) - } - if err != nil { - s.Warnf(" Error restoring consumer %q state: %v", cfg.Name, err) + if tq != nil { + // If a parallelTaskQueue was provided then use that for concurrency. + var wg sync.WaitGroup + wg.Add(len(fis)) + for _, fi := range fis { + tq <- func() { + doStream(fi) + wg.Done() } } + wg.Wait() + } else { + // No parallelTaskQueue provided, do inline as before. + for _, fi := range fis { + doStream(fi) + } } // Make sure to cleanup any old remaining snapshots. os.RemoveAll(filepath.Join(jsa.storeDir, snapsDir)) - // Check interest policy streams for auto cleanup. 
- for _, mset := range ipstreams { - mset.checkForOrphanMsgs() - mset.checkConsumerReplication() - } - s.Debugf("JetStream state for account %q recovered", a.Name) return nil diff --git a/server/jetstream_benchmark_test.go b/server/jetstream_benchmark_test.go index bdd263da39d..c9dd7097e0d 100644 --- a/server/jetstream_benchmark_test.go +++ b/server/jetstream_benchmark_test.go @@ -19,8 +19,13 @@ import ( "encoding/json" "fmt" "math" + "math/bits" "math/rand" + "os" + "path/filepath" + "runtime" "strconv" + "strings" "sync" "sync/atomic" "testing" @@ -2184,6 +2189,61 @@ func BenchmarkJetStreamPublishConcurrent(b *testing.B) { } } +func BenchmarkJetStreamParallelStartup(b *testing.B) { + omp := runtime.GOMAXPROCS(-1) + streams, msgs, cardinality := omp, 100_000, 10_000 + + _, s, shutdown, nc, js := startJSClusterAndConnect(b, 1) + defer shutdown() + jsc := *s.JetStreamConfig() + sd := strings.TrimSuffix(jsc.StoreDir, "/jetstream") + + b.Logf("Building %d streams with %d messages, %d subjects...", streams, msgs, cardinality) + start := time.Now() + for i := range streams { + jsStreamCreate(b, nc, &StreamConfig{ + Name: fmt.Sprintf("stream_%d", i), + Subjects: []string{fmt.Sprintf("%d.>", i)}, + Storage: FileStorage, + }) + for n := range msgs { + subj := fmt.Sprintf("%d.%d", i, n%cardinality) + _, err := js.Publish(subj, nil) + require_NoError(b, err) + } + } + b.Logf("Streams built in %s", time.Since(start)) + + bench := func(b *testing.B) { + s.shutdownJetStream() + jsc.StoreDir = sd + require_NoError(b, filepath.Walk(jsc.StoreDir, func(path string, info os.FileInfo, err error) error { + require_NoError(b, err) + if info.Mode().IsRegular() && info.Name() == "index.db" { + return os.Truncate(path, 0) + } + return nil + })) + b.ResetTimer() + s.EnableJetStream(&jsc) + } + + // Try to step down GOMAXPROCS in common CPU core counts. 
+ mp := 1 << (bits.Len(uint(omp)) - 1) + if omp > mp { + b.Run(fmt.Sprintf("GOMAXPROCS=%d", omp), func(b *testing.B) { + bench(b) + }) + } + for ; mp >= 1; mp >>= 1 { + b.Run(fmt.Sprintf("GOMAXPROCS=%d", mp), func(b *testing.B) { + runtime.GOMAXPROCS(mp) + defer runtime.GOMAXPROCS(omp) + bench(b) + }) + } +} + // Helper function to stand up a JS-enabled single server or cluster func startJSClusterAndConnect(b *testing.B, clusterSize int) (c *cluster, s *Server, shutdown func(), nc *nats.Conn, js nats.JetStreamContext) { b.Helper() diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 1cef7348847..a40e9590cd1 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -896,6 +896,9 @@ func (js *jetStream) setupMetaGroup() error { } if cfg.Observer { s.Noticef("Turning JetStream metadata controller Observer Mode on") + s.Noticef("In cases where the JetStream domain is not intended to be extended through a SYS account leaf node connection") + s.Noticef("and waiting for leader election until first contact is not acceptable,") + s.Noticef(`manually disable Observer Mode by setting the JetStream Option "extension_hint: %s"`, jsNoExtend) } } else { s.Noticef("JetStream cluster recovering state") @@ -909,7 +912,7 @@ func (js *jetStream) setupMetaGroup() error { cfg.Observer = false case extUndetermined: s.Noticef("Turning JetStream metadata controller Observer Mode on - no previous contact") - s.Noticef("In cases where JetStream will not be extended") + s.Noticef("In cases where the JetStream domain is not intended to be extended through a SYS account leaf node connection") s.Noticef("and waiting for leader election until first contact is not acceptable,") s.Noticef(`manually disable Observer Mode by setting the JetStream Option "extension_hint: %s"`, jsNoExtend) } @@ -1341,13 +1344,17 @@ func (js *jetStream) monitorCluster() { js.setMetaRecovering() // Snapshotting function. - doSnapshot := func() { + doSnapshot := func(force bool) { // Suppress during recovery. if js.isMetaRecovering() { return } - // For the meta layer we want to snapshot when asked if we need one or have any entries that we can compact. - if ne, _ := n.Size(); ne > 0 || n.NeedSnapshot() { + // Look up what the threshold is for compaction. Re-reading from config here as it is reloadable. + js.srv.optsMu.RLock() + thresh := js.srv.opts.JetStreamMetaCompact + js.srv.optsMu.RUnlock() + // For the meta layer we want to snapshot when over the above threshold (which could be 0 by default). + if ne, _ := n.Size(); force || ne > thresh || n.NeedSnapshot() { snap, err := js.metaSnapshot() if err != nil { s.Warnf("Error generating JetStream cluster snapshot: %v", err) @@ -1376,15 +1383,15 @@ func (js *jetStream) monitorCluster() { select { case <-s.quitCh: // Server shutting down, but we might receive this before qch, so try to snapshot. - doSnapshot() + doSnapshot(false) return case <-rqch: // Clean signal from shutdown routine so do best effort attempt to snapshot meta layer. - doSnapshot() + doSnapshot(false) return case <-qch: // Clean signal from shutdown routine so do best effort attempt to snapshot meta layer. - doSnapshot() + doSnapshot(false) // Return the signal back since shutdown will be waiting. close(qch) return @@ -1420,6 +1427,8 @@ func (js *jetStream) monitorCluster() { // Clear. ru = nil s.Debugf("Recovered JetStream cluster metadata") + // Snapshot now so we start with freshly compacted log. 
+ doSnapshot(true) oc = time.AfterFunc(30*time.Second, js.checkForOrphans) // Do a health check here as well. go checkHealth() @@ -1432,9 +1441,9 @@ func (js *jetStream) monitorCluster() { _, nb = n.Applied(ce.Index) } if js.hasPeerEntries(ce.Entries) || (didSnap && !isLeader) { - doSnapshot() + doSnapshot(true) } else if nb > compactSizeMin && time.Since(lastSnapTime) > minSnapDelta { - doSnapshot() + doSnapshot(false) } } else { s.Warnf("Error applying JetStream cluster entries: %v", err) @@ -1450,11 +1459,11 @@ func (js *jetStream) monitorCluster() { s.sendInternalMsgLocked(serverStatsPingReqSubj, _EMPTY_, nil, nil) // Install a snapshot as we become leader. js.checkClusterSize() - doSnapshot() + doSnapshot(false) } case <-t.C: - doSnapshot() + doSnapshot(false) // Periodically check the cluster size. if n.Leader() { js.checkClusterSize() @@ -2614,11 +2623,19 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps return case <-mqch: // Clean signal from shutdown routine so do best effort attempt to snapshot. - doSnapshot() + // Don't snapshot if not shutting down, monitor goroutine could be going away + // on a scale down or a remove for example. + if s.isShuttingDown() { + doSnapshot() + } return case <-qch: // Clean signal from shutdown routine so do best effort attempt to snapshot. - doSnapshot() + // Don't snapshot if not shutting down, Raft node could be going away on a + // scale down or remove for example. + if s.isShuttingDown() { + doSnapshot() + } return case <-aq.ch: var ne, nb uint64 @@ -5512,11 +5529,19 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { return case <-mqch: // Clean signal from shutdown routine so do best effort attempt to snapshot. - doSnapshot(false) + // Don't snapshot if not shutting down, monitor goroutine could be going away + // on a scale down or a remove for example. + if s.isShuttingDown() { + doSnapshot(false) + } return case <-qch: // Clean signal from shutdown routine so do best effort attempt to snapshot. - doSnapshot(false) + // Don't snapshot if not shutting down, Raft node could be going away on a + // scale down or remove for example. + if s.isShuttingDown() { + doSnapshot(false) + } return case <-aq.ch: ces := aq.pop() @@ -8756,6 +8781,13 @@ func (mset *stream) stateSnapshotLocked() []byte { } // Older v1 version with deleted as a sorted []uint64. + // For a stream with millions or billions of interior deletes, this will be huge. + // Now that all server versions 2.10.+ support binary snapshots, we should never fall back. + assert.Unreachable("Legacy JSON stream snapshot used", map[string]any{ + "stream": mset.cfg.Name, + "account": mset.acc.Name, + }) + state := mset.store.State() snap := &streamSnapshot{ Msgs: state.Msgs, diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index e480db7080c..80257d0f163 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -9234,13 +9234,11 @@ func TestJetStreamClusterAsyncFlushFileStoreFlushOnSnapshot(t *testing.T) { // Confirm above write is pending. require_Equal(t, lmb.pendingWriteSize(), 33) - // Stop stream monitor routine, which will install a snapshot on shutdown. - mset.mu.Lock() - if mset.mqch != nil { - close(mset.mqch) - mset.mqch = nil - } - mset.mu.Unlock() + // Make the upper layer snapshot by sending leader change signal. + // It doesn't matter that we're already leader, it still gets handled. 
+ // Previously this used the mqch, but that now only snapshots on shutdown. + n.leadc <- true + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { n.Lock() snap, err := n.loadLastSnapshot() @@ -10447,6 +10445,122 @@ func TestJetStreamClusterJszRaftLeaderReporting(t *testing.T) { } } +func TestJetStreamClusterNoInterestDesyncOnConsumerCreate(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + Retention: nats.InterestPolicy, + }) + require_NoError(t, err) + + // Pick a random server that will not know about the new consumer being created. + // If servers determine "no interest" individually, these servers will desync. + rs := c.randomNonLeader() + sjs := rs.getJetStream() + meta := sjs.getMetaGroup() + require_NoError(t, meta.PauseApply()) + + sub, err := js.PullSubscribe(_EMPTY_, "DURABLE", nats.BindStream("TEST")) + require_NoError(t, err) + defer sub.Drain() + + checkConsumersAssigned := func(expected int) { + t.Helper() + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + var count int + for _, s := range c.servers { + _, _, jsa := s.globalAccount().getJetStreamFromAccount() + if jsa.consumerAssigned("TEST", "DURABLE") { + count++ + } + } + if count != expected { + return fmt.Errorf("expected %d, got %d", expected, count) + } + return nil + }) + } + // Confirm only two servers know about the consumer. + checkConsumersAssigned(2) + c.waitOnConsumerLeader(globalAccountName, "TEST", "DURABLE") + + // Publish a single message. All servers will receive this, but only two will store it. + _, err = js.Publish("foo", nil) + require_NoError(t, err) + checkLastSeq := func(lseq uint64) { + t.Helper() + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + for _, s := range c.servers { + mset, err := s.globalAccount().lookupStream("TEST") + if err != nil { + return err + } + if seq := mset.lastSeq(); seq != lseq { + return fmt.Errorf("expected %d, got %d", lseq, seq) + } + } + return nil + }) + } + checkLastSeq(1) + + // Resume the meta layer such that the consumer gets created on the remaining server. + meta.ResumeApply() + checkConsumersAssigned(3) + + // All servers will now store another published message. + _, err = js.Publish("foo", nil) + require_NoError(t, err) + checkLastSeq(2) + + // Make sure the consumer leader is on the same server that didn't store the first message. + cl := c.consumerLeader(globalAccountName, "TEST", "DURABLE") + if cl != rs { + mset, err := cl.globalAccount().lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("DURABLE") + require_NotNil(t, o) + n := o.raftNode() + require_NoError(t, n.StepDown(rs.NodeName())) + c.waitOnConsumerLeader(globalAccountName, "TEST", "DURABLE") + cl = c.consumerLeader(globalAccountName, "TEST", "DURABLE") + require_Equal(t, cl, rs) + } + + // Since the consumer leader is the same as the server that didn't store the first message, + // it can only receive and ack the second message. 
+ msgs, err := sub.Fetch(1, nats.MaxWait(time.Second)) + require_NoError(t, err) + require_Len(t, len(msgs), 1) + metadata, err := msgs[0].Metadata() + require_NoError(t, err) + require_Equal(t, metadata.Sequence.Stream, 2) + require_Equal(t, metadata.NumPending, 0) + require_NoError(t, msgs[0].AckSync()) + + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + // The servers will eventually be synced up again, but this relies on the interest state being checked. + for _, s := range c.servers { + if s == rs { + continue + } + mset, err := s.globalAccount().lookupStream("TEST") + if err != nil { + return err + } + mset.checkInterestState() + } + return checkState(t, c, globalAccountName, "TEST") + }) +} + // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster__test.go, with being the highest value. diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 516ae2459f5..250bacb7aab 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -7078,3 +7078,56 @@ func TestJetStreamClusterAccountMaxConnectionsReconnect(t *testing.T) { return nil }) } + +func TestJetStreamClusterMetaCompactThreshold(t *testing.T) { + for _, thres := range []uint64{0, 5, 10} { + t.Run(fmt.Sprintf("%d", thres), func(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R1TEST", 3) + defer c.shutdown() + for _, s := range c.servers { + s.optsMu.Lock() + s.opts.JetStreamMetaCompact = thres + s.optsMu.Unlock() + } + + nc, _ := jsClientConnect(t, c.servers[0]) + defer nc.Close() + + leader := c.leader() + _, cc := leader.getJetStreamCluster() + rg := cc.meta.(*raft) + + for i := range uint64(15) { + rg.RLock() + papplied := rg.papplied + rg.RUnlock() + + jsStreamCreate(t, nc, &StreamConfig{ + Name: fmt.Sprintf("test_%d", i), + Subjects: []string{fmt.Sprintf("test.%d", i)}, + Storage: MemoryStorage, + }) + + // Kicking the leader change channel is the easiest way to + // trick monitorCluster() into calling doSnapshot(). + entries, _ := cc.meta.Size() + cc.meta.(*raft).leadc <- true + + // Should we have compacted on this iteration? 
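The tests above (and several below) converge on expected state by polling with checkFor(t, timeout, interval, fn). The following self-contained sketch shows that polling pattern using only the standard library; checkUntil is a hypothetical stand-in for the repo's test helper.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// checkUntil calls fn at the given interval until it returns nil or the
// timeout elapses, in which case the last error is returned.
func checkUntil(timeout, interval time.Duration, fn func() error) error {
	deadline := time.Now().Add(timeout)
	for {
		err := fn()
		if err == nil {
			return nil
		}
		if time.Now().After(deadline) {
			return err
		}
		time.Sleep(interval)
	}
}

func main() {
	start := time.Now()
	err := checkUntil(2*time.Second, 50*time.Millisecond, func() error {
		if time.Since(start) < 200*time.Millisecond {
			return errors.New("not converged yet")
		}
		return nil
	})
	fmt.Println("converged:", err == nil)
}
```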
+ if entries > thres { + checkFor(t, time.Second, 5*time.Millisecond, func() error { + rg.RLock() + npapplied := rg.papplied + rg.RUnlock() + if npapplied <= papplied { + return fmt.Errorf("haven't snapshotted yet (%d <= %d)", npapplied, papplied) + } + return nil + }) + entries, _ = cc.meta.Size() + require_Equal(t, entries, 0) + } + } + }) + } +} diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index cb7883d277d..9cc79db264d 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -10341,3 +10341,86 @@ func TestJetStreamConsumerMaxDeliverUnderflow(t *testing.T) { o.mu.RUnlock() require_Equal(t, maxdc, 0) } + +// https://github.com/nats-io/nats-server/issues/7457 +func TestJetStreamConsumerNoWaitNoMessagesOnEos(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}}) + require_NoError(t, err) + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"}) + require_NoError(t, err) + + sub, err := nc.SubscribeSync("reply") + require_NoError(t, err) + defer sub.Drain() + require_NoError(t, nc.Flush()) + + mset, err := s.globalAccount().lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("CONSUMER") + require_NotNil(t, o) + + // Fiddle with the pending count such that the NoWait request will go through, + // and the "404 No Messages" will be sent when hitting the end of the stream. + o.mu.Lock() + o.npc++ + o.mu.Unlock() + + req := &JSApiConsumerGetNextRequest{NoWait: true} + jreq, err := json.Marshal(req) + require_NoError(t, err) + o.processNextMsgRequest("reply", jreq) + + msg, err := sub.NextMsg(time.Second) + require_NoError(t, err) + require_Equal(t, msg.Header.Get("Status"), "404") + require_Equal(t, msg.Header.Get("Description"), "No Messages") +} + +// https://github.com/nats-io/nats-server/issues/5373 +func TestJetStreamConsumerNoWaitNoMessagesOnEosWithDeliveredMsgs(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}}) + require_NoError(t, err) + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"}) + require_NoError(t, err) + + _, err = js.Publish("foo", []byte("msg")) + require_NoError(t, err) + + sub, err := nc.SubscribeSync("reply") + require_NoError(t, err) + defer sub.Drain() + require_NoError(t, nc.Flush()) + + mset, err := s.globalAccount().lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("CONSUMER") + require_NotNil(t, o) + + req := &JSApiConsumerGetNextRequest{NoWait: true, Batch: 2} + jreq, err := json.Marshal(req) + require_NoError(t, err) + o.processNextMsgRequest("reply", jreq) + + msg, err := sub.NextMsg(time.Second) + require_NoError(t, err) + require_Equal(t, msg.Subject, "foo") + require_Equal(t, string(msg.Data), "msg") + + // We requested two messages but the stream only contained 1. 
+ msg, err = sub.NextMsg(time.Second) + require_NoError(t, err) + require_Equal(t, msg.Header.Get("Status"), "404") + require_Equal(t, msg.Header.Get("Description"), "No Messages") +} diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index bb626ec242b..332db4b5fd6 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -4411,7 +4411,7 @@ func TestJetStreamSuperClusterMixedModeSwitchToInterestOnlyOperatorConfig(t *tes if gw.Name == opts.Gateway.Name { continue } - checkGWInterestOnlyMode(t, s, gw.Name, apub) + checkGWInterestOnlyModeOrNotPresent(t, s, gw.Name, apub, true) } } s = sc.serverByName(si.Cluster.Leader) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 6762f4fcb86..d5d47fb55b2 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -152,7 +152,7 @@ func TestJetStreamEnableAndDisableAccount(t *testing.T) { } acc, _ := s.LookupOrRegisterAccount("$FOO") - if err := acc.EnableJetStream(nil); err != nil { + if err := acc.EnableJetStream(nil, nil); err != nil { t.Fatalf("Did not expect error on enabling account: %v", err) } if na := s.JetStreamNumAccounts(); na != 1 { @@ -171,7 +171,7 @@ func TestJetStreamEnableAndDisableAccount(t *testing.T) { } // Should get an error for trying to enable a non-registered account. acc = NewAccount("$BAZ") - if err := acc.EnableJetStream(nil); err == nil { + if err := acc.EnableJetStream(nil, nil); err == nil { t.Fatalf("Expected error on enabling account that was not registered") } } @@ -4869,20 +4869,20 @@ func TestJetStreamSystemLimits(t *testing.T) { } } - if err := facc.EnableJetStream(limits(24, 192)); err != nil { + if err := facc.EnableJetStream(limits(24, 192), nil); err != nil { t.Fatalf("Unexpected error: %v", err) } // Use up rest of our resources in memory - if err := bacc.EnableJetStream(limits(1000, 0)); err != nil { + if err := bacc.EnableJetStream(limits(1000, 0), nil); err != nil { t.Fatalf("Unexpected error: %v", err) } // Now ask for more memory. Should error. - if err := zacc.EnableJetStream(limits(1000, 0)); err == nil { + if err := zacc.EnableJetStream(limits(1000, 0), nil); err == nil { t.Fatalf("Expected an error when exhausting memory resource limits") } // Disk too. 
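The two NoWait tests above drive processNextMsgRequest directly; from a client the same behaviour can be observed by issuing the next-message request yourself. A hedged client-side sketch follows, assuming a locally running server with JetStream enabled and the TEST stream plus pull consumer CONSUMER from the tests already created: with NoWait set and nothing left to deliver, the reply is the status-only message carrying the 404 / No Messages headers checked above.

```go
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	// A NoWait next-message request: instead of holding the request open,
	// the server answers immediately; at end of stream that answer is the
	// "404 No Messages" status message.
	req := []byte(`{"batch":1,"no_wait":true}`)
	msg, err := nc.Request("$JS.API.CONSUMER.MSG.NEXT.TEST.CONSUMER", req, time.Second)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(msg.Header.Get("Status"), msg.Header.Get("Description"))
}
```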
- if err := zacc.EnableJetStream(limits(0, 10000)); err == nil { + if err := zacc.EnableJetStream(limits(0, 10000), nil); err == nil { t.Fatalf("Expected an error when exhausting memory resource limits") } facc.DisableJetStream() @@ -4896,7 +4896,7 @@ func TestJetStreamSystemLimits(t *testing.T) { t.Fatalf("Expected reserved memory and store to be 0, got %v and %v", friendlyBytes(rm), friendlyBytes(rd)) } - if err := facc.EnableJetStream(limits(24, 192)); err != nil { + if err := facc.EnableJetStream(limits(24, 192), nil); err != nil { t.Fatalf("Unexpected error: %v", err) } // Test Adjust @@ -7409,7 +7409,7 @@ func TestJetStreamCanNotEnableOnSystemAccount(t *testing.T) { defer s.Shutdown() sa := s.SystemAccount() - if err := sa.EnableJetStream(nil); err == nil { + if err := sa.EnableJetStream(nil, nil); err == nil { t.Fatalf("Expected an error trying to enable on the system account") } } @@ -22254,3 +22254,106 @@ func TestJetStreamScheduledMessageNotDeactivated(t *testing.T) { }) } } + +func TestJetStreamDirectGetBatchParallelWriteDeadlock(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Storage: nats.FileStorage, + AllowDirect: true, + }) + require_NoError(t, err) + + mset, err := s.globalAccount().lookupStream("TEST") + require_NoError(t, err) + for range 2 { + require_NoError(t, mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, 0, 0, nil, false, true)) + } + + // We'll lock the message blocks such that we can't read, but NumPending should still function. + fs := mset.store.(*fileStore) + fs.lockAllMsgBlocks() + total, validThrough := fs.NumPending(1, _EMPTY_, false) + require_Equal(t, total, 2) + require_Equal(t, validThrough, 2) + + // We'll now run things in the following order: + // - do a read through Direct Batch Get, which is blocked by the message blocks being locked + // - do a write in parallel, which is blocked by the read to complete + // - unlock the message blocks while both read and write goroutines are active + // If there's no deadlock the read and write will complete, and 3 messages will end up in the stream. + var wg, read sync.WaitGroup + read.Add(1) + wg.Add(1) + go func() { + read.Done() + mset.getDirectRequest(&JSApiMsgGetRequest{Seq: 1, Batch: 2}, _EMPTY_) + }() + go func() { + // Make sure we enter getDirectRequest first. + read.Wait() + <-time.After(100 * time.Millisecond) + wg.Done() + mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, 0, 0, nil, false, true) + }() + go func() { + // Run some time after we've entered processJetStreamMsg above. 
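The parallel write deadlock test sequences three goroutines so the read enters first, the write second, and the unlock last while both are still active. A simplified, self-contained version of that sequencing is sketched here, with a plain channel standing in for the locked filestore message blocks.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var readStarted, writeStarted sync.WaitGroup
	readStarted.Add(1)
	writeStarted.Add(1)

	unlocked := make(chan struct{})
	done := make(chan struct{}, 2)

	go func() { // reader
		readStarted.Done()
		<-unlocked
		fmt.Println("reader finished")
		done <- struct{}{}
	}()
	go func() { // writer, enters after the reader is in flight
		readStarted.Wait()
		time.Sleep(100 * time.Millisecond)
		writeStarted.Done()
		<-unlocked
		fmt.Println("writer finished")
		done <- struct{}{}
	}()
	go func() { // release the resource while reader and writer are both active
		writeStarted.Wait()
		time.Sleep(100 * time.Millisecond)
		close(unlocked)
	}()

	<-done
	<-done
	fmt.Println("no deadlock")
}
```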
+ wg.Wait() + <-time.After(100 * time.Millisecond) + fs.unlockAllMsgBlocks() + }() + read.Wait() + checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + if msgs := mset.state().Msgs; msgs != 3 { + return fmt.Errorf("expected 3 msgs, got %d", msgs) + } + return nil + }) +} + +func TestJetStreamReloadMetaCompact(t *testing.T) { + storeDir := t.TempDir() + + conf := createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:-1 + jetstream: { + max_mem_store: 2MB + max_file_store: 8MB + store_dir: '%s' + } + `, storeDir))) + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() + + require_Equal(t, s.getOpts().JetStreamMetaCompact, 0) + + reloadUpdateConfig(t, s, conf, fmt.Sprintf(` + listen: 127.0.0.1:-1 + jetstream: { + max_mem_store: 2MB + max_file_store: 8MB + store_dir: '%s' + meta_compact: 100 + } + `, storeDir)) + + require_Equal(t, s.getOpts().JetStreamMetaCompact, 100) + + reloadUpdateConfig(t, s, conf, fmt.Sprintf(` + listen: 127.0.0.1:-1 + jetstream: { + max_mem_store: 2MB + max_file_store: 8MB + store_dir: '%s' + meta_compact: 0 + } + `, storeDir)) + + require_Equal(t, s.getOpts().JetStreamMetaCompact, 0) +} diff --git a/server/leafnode.go b/server/leafnode.go index 0049823d5cb..5e233f7d99d 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -1049,17 +1049,22 @@ func (c *client) sendLeafConnect(clusterName string, headers bool) error { // In addition, and this is to allow auth callout, set user/password or // token if applicable. if userInfo := c.leaf.remote.curURL.User; userInfo != nil { - // For backward compatibility, if only username is provided, set both - // Token and User, not just Token. cinfo.User = userInfo.Username() var ok bool cinfo.Pass, ok = userInfo.Password() + // For backward compatibility, if only username is provided, set both + // Token and User, not just Token. if !ok { cinfo.Token = cinfo.User } } else if c.leaf.remote.username != _EMPTY_ { cinfo.User = c.leaf.remote.username cinfo.Pass = c.leaf.remote.password + // For backward compatibility, if only username is provided, set both + // Token and User, not just Token. + if cinfo.Pass == _EMPTY_ { + cinfo.Token = cinfo.User + } } b, err := json.Marshal(cinfo) if err != nil { @@ -2421,7 +2426,8 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) { // updateInterestForAccountOnGateway called from gateway code when processing RS+ and RS-. func (s *Server) updateInterestForAccountOnGateway(accName string, sub *subscription, delta int32) { - acc, err := s.LookupAccount(accName) + // Since we're in the gateway's readLoop, and we would otherwise block, don't allow fetching. 
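For reference, the reload test above exercises the new jetstream meta_compact setting; in the threshold test earlier it is compared against the entry count reported by the meta group's Size(), so it appears to act as an entry-count threshold for compacting the metadata Raft log, with 0 keeping the default behaviour. A sketch of the configuration block, with illustrative listen address and store_dir:

```go
package main

import "fmt"

func main() {
	const conf = `
listen: 127.0.0.1:-1
jetstream: {
    max_mem_store: 2MB
    max_file_store: 8MB
    store_dir: '/path/to/store'
    meta_compact: 100
}
`
	fmt.Print(conf)
}
```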
+ acc, err := s.lookupOrFetchAccount(accName, false) if acc == nil || err != nil { s.Debugf("No or bad account for %q, failed to update interest from gateway", accName) return diff --git a/server/leafnode_test.go b/server/leafnode_test.go index d0ea7305dd6..9d7f94bd227 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -10830,3 +10830,37 @@ func TestLeafNodeConfigureWriteDeadline(t *testing.T) { require_Equal(t, r.out.wdl, 6*time.Second) }) } + +// https://github.com/nats-io/nats-server/issues/7441 +func TestLeafNodesBasicTokenAuth(t *testing.T) { + hubConf := createConfFile(t, []byte(` + server_name: "HUB" + listen: "127.0.0.1:-1" + authorization { + token: secret + } + leafnodes { + listen: "127.0.0.1:-1" + } + `)) + hub, ohub := RunServerWithConfig(hubConf) + defer hub.Shutdown() + + port := ohub.LeafNode.Port + leafTmpl := ` + server_name: "LEAF" + listen: "127.0.0.1:-1" + leafnodes { + remotes: [ + { url: "nats://secret@localhost:%d" } + ] + } + ` + leafConf := createConfFile(t, fmt.Appendf(nil, leafTmpl, port)) + leaf, _ := RunServerWithConfig(leafConf) + defer leaf.Shutdown() + + // Verify that we have only 1 leaf + checkLeafNodeConnected(t, hub) + checkLeafNodeConnected(t, leaf) +} diff --git a/server/monitor.go b/server/monitor.go index 83a239d5300..c59a47b81a6 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1501,7 +1501,8 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) { LeafNodes %s Gateways %s Raft Groups %s - Health Probe %s + Health Probe %s + Expvar %s Help `, @@ -1518,6 +1519,7 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) { s.basePath(GatewayzPath), GatewayzPath, s.basePath(RaftzPath), RaftzPath, s.basePath(HealthzPath), HealthzPath, + s.basePath(ExpvarzPath), ExpvarzPath, ) } diff --git a/server/mqtt_test.go b/server/mqtt_test.go index e779096ea07..c08c75e94b3 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -1035,7 +1035,7 @@ func testMQTTEnableJSForAccount(t *testing.T, s *Server, accName string) { MaxStore: 1024 * 1024, }, } - if err := acc.EnableJetStream(limits); err != nil { + if err := acc.EnableJetStream(limits, nil); err != nil { t.Fatalf("Error enabling JS: %v", err) } } diff --git a/server/msgtrace_test.go b/server/msgtrace_test.go index 57b72186f94..769a5e2cd4c 100644 --- a/server/msgtrace_test.go +++ b/server/msgtrace_test.go @@ -1496,17 +1496,33 @@ func TestMsgTraceWithGateways(t *testing.T) { s1 := runGatewayServer(o1) defer s1.Shutdown() - waitForOutboundGateways(t, s1, 1, time.Second) - waitForInboundGateways(t, s2, 1, time.Second) - waitForOutboundGateways(t, s2, 1, time.Second) + o3 := testGatewayOptionsFromToWithServers(t, "C", "B", s2) + o3.NoSystemAccount = false + s3 := runGatewayServer(o3) + defer s3.Shutdown() + + waitForOutboundGateways(t, s1, 2, time.Second) + waitForInboundGateways(t, s1, 2, time.Second) + waitForInboundGateways(t, s2, 2, time.Second) + waitForOutboundGateways(t, s2, 2, time.Second) + waitForInboundGateways(t, s3, 2, time.Second) + waitForOutboundGateways(t, s3, 2, time.Second) nc2 := natsConnect(t, s2.ClientURL(), nats.Name("sub2")) defer nc2.Close() - sub2 := natsQueueSubSync(t, nc2, "foo.*", "my_queue") + sub2 := natsQueueSubSync(t, nc2, "foo.*", "my_queue_2") + + nc22 := natsConnect(t, s2.ClientURL(), nats.Name("sub22")) + defer nc22.Close() + sub22 := natsQueueSubSync(t, nc22, "*.*", "my_queue_22") - nc3 := natsConnect(t, s2.ClientURL(), nats.Name("sub3")) + nc3 := natsConnect(t, s3.ClientURL(), nats.Name("sub3")) defer nc3.Close() - 
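The sendLeafConnect change above restores the backward-compatibility rule for both URL-embedded and separately configured credentials: a username with no password is also sent as a token, which is what lets the token-only hub in TestLeafNodesBasicTokenAuth accept the remote. Below is a small sketch of that rule applied to a remote URL; connectAuth is an illustrative struct, not the server's CONNECT payload.

```go
package main

import (
	"fmt"
	"net/url"
)

type connectAuth struct {
	User, Pass, Token string
}

// authFromURL applies the backward-compatibility rule: a username with no
// password is sent both as User and as Token.
func authFromURL(raw string) (connectAuth, error) {
	var ci connectAuth
	u, err := url.Parse(raw)
	if err != nil {
		return ci, err
	}
	if ui := u.User; ui != nil {
		ci.User = ui.Username()
		var ok bool
		ci.Pass, ok = ui.Password()
		if !ok {
			// Username only: also send it as a token for token-auth hubs.
			ci.Token = ci.User
		}
	}
	return ci, nil
}

func main() {
	a, _ := authFromURL("nats://secret@localhost:4222")
	fmt.Printf("%+v\n", a) // {User:secret Pass: Token:secret}
	b, _ := authFromURL("nats://user:pass@localhost:4222")
	fmt.Printf("%+v\n", b) // {User:user Pass:pass Token:}
}
```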
sub3 := natsQueueSubSync(t, nc3, "*.*", "my_queue_2") + sub3 := natsQueueSubSync(t, nc3, "foo.*", "my_queue_3") + + nc32 := natsConnect(t, s3.ClientURL(), nats.Name("sub32")) + defer nc32.Close() + sub32 := natsQueueSubSync(t, nc32, "*.*", "my_queue_32") nc1 := natsConnect(t, s1.ClientURL(), nats.Name("sub1")) defer nc1.Close() @@ -1540,17 +1556,18 @@ func TestMsgTraceWithGateways(t *testing.T) { checkAppMsg := func(sub *nats.Subscription, expected bool) { if expected { appMsg := natsNexMsg(t, sub, time.Second) - require_Equal[string](t, string(appMsg.Data), "hello!") + require_Equal(t, string(appMsg.Data), "hello!") } // Check that no (more) messages are received. if msg, err := sub.NextMsg(100 * time.Millisecond); err != nats.ErrTimeout { t.Fatalf("Did not expect application message, got %s", msg.Data) } } - for _, sub := range []*nats.Subscription{sub1, sub2, sub3} { + for _, sub := range []*nats.Subscription{sub1, sub2, sub22, sub3, sub32} { checkAppMsg(sub, test.deliverMsg) } + var previousHop string check := func() { traceMsg := natsNexMsg(t, traceSub, time.Second) var e MsgTraceEvent @@ -1560,58 +1577,81 @@ func TestMsgTraceWithGateways(t *testing.T) { require_True(t, ingress != nil) switch ingress.Kind { case CLIENT: - require_Equal[string](t, e.Server.Name, s1.Name()) - require_Equal[string](t, ingress.Account, globalAccountName) - require_Equal[string](t, ingress.Subject, "foo.bar") + require_Equal(t, e.Server.Name, s1.Name()) + require_Equal(t, ingress.Account, globalAccountName) + require_Equal(t, ingress.Subject, "foo.bar") egress := e.Egresses() - require_Equal[int](t, len(egress), 2) + require_Equal(t, len(egress), 3) for _, eg := range egress { switch eg.Kind { case CLIENT: - require_Equal[string](t, eg.Name, "sub1") - require_Equal[string](t, eg.Subscription, "*.bar") - require_Equal[string](t, eg.Queue, _EMPTY_) + require_Equal(t, eg.Name, "sub1") + require_Equal(t, eg.Subscription, "*.bar") + require_Equal(t, eg.Queue, _EMPTY_) case GATEWAY: - require_Equal[string](t, eg.Name, s2.Name()) - require_Equal[string](t, eg.Error, _EMPTY_) - require_Equal[string](t, eg.Subscription, _EMPTY_) - require_Equal[string](t, eg.Queue, _EMPTY_) + if eg.Name != s2.Name() && eg.Name != s3.Name() { + t.Fatalf("Expected name to be %q or %q, got %q", s2.Name(), s3.Name(), eg.Name) + } + require_Equal(t, eg.Error, _EMPTY_) + require_Equal(t, eg.Subscription, _EMPTY_) + require_Equal(t, eg.Queue, _EMPTY_) default: t.Fatalf("Unexpected egress: %+v", eg) } } case GATEWAY: - require_Equal[string](t, e.Server.Name, s2.Name()) - require_Equal[string](t, ingress.Account, globalAccountName) - require_Equal[string](t, ingress.Subject, "foo.bar") + require_True(t, e.Request.Header != nil) + require_Len(t, len(e.Request.Header[MsgTraceHop]), 1) + hop := e.Request.Header[MsgTraceHop][0] + require_True(t, hop == "1" || hop == "2") + if previousHop == _EMPTY_ { + previousHop = hop + } else if hop == previousHop { + t.Fatalf("Expected different hop value, got the same %q", hop) + } + var sub2Name, queue2Name, sub3Name, queue3Name string + switch e.Server.Name { + case s2.Name(): + require_Equal(t, e.Server.Cluster, "B") + sub2Name, sub3Name = "sub2", "sub22" + queue2Name, queue3Name = "my_queue_2", "my_queue_22" + case s3.Name(): + require_Equal(t, e.Server.Cluster, "C") + sub2Name, sub3Name = "sub3", "sub32" + queue2Name, queue3Name = "my_queue_3", "my_queue_32" + default: + t.Fatalf("Unexpected server name %q", e.Server.Name) + } + require_Equal(t, ingress.Account, globalAccountName) + require_Equal(t, 
ingress.Subject, "foo.bar") egress := e.Egresses() - require_Equal[int](t, len(egress), 2) + require_Equal(t, len(egress), 2) var gotSub2, gotSub3 int for _, eg := range egress { require_True(t, eg.Kind == CLIENT) switch eg.Name { - case "sub2": - require_Equal[string](t, eg.Subscription, "foo.*") - require_Equal[string](t, eg.Queue, "my_queue") + case sub2Name: + require_Equal(t, eg.Subscription, "foo.*") + require_Equal(t, eg.Queue, queue2Name) gotSub2++ - case "sub3": - require_Equal[string](t, eg.Subscription, "*.*") - require_Equal[string](t, eg.Queue, "my_queue_2") + case sub3Name: + require_Equal(t, eg.Subscription, "*.*") + require_Equal(t, eg.Queue, queue3Name) gotSub3++ default: t.Fatalf("Unexpected egress name: %+v", eg) } } - require_Equal[int](t, gotSub2, 1) - require_Equal[int](t, gotSub3, 1) - + require_Equal(t, gotSub2, 1) + require_Equal(t, gotSub3, 1) default: t.Fatalf("Unexpected ingress: %+v", ingress) } } - // We should get 2 events - check() - check() + // We should get 3 events + for range 3 { + check() + } // Make sure we are not receiving more traces if tm, err := traceSub.NextMsg(250 * time.Millisecond); err == nil { t.Fatalf("Should not have received trace message: %s", tm.Data) diff --git a/server/opts.go b/server/opts.go index f7aed1081e5..d05b3074954 100644 --- a/server/opts.go +++ b/server/opts.go @@ -383,6 +383,7 @@ type Options struct { JetStreamTpm JSTpmOpts JetStreamMaxCatchup int64 JetStreamRequestQueueLimit int64 + JetStreamMetaCompact uint64 StreamMaxBufferedMsgs int `json:"-"` StreamMaxBufferedSize int64 `json:"-"` StoreDir string `json:"-"` @@ -2603,6 +2604,12 @@ func parseJetStream(v any, opts *Options, errors *[]error, warnings *[]error) er return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)} } opts.JetStreamRequestQueueLimit = lim + case "meta_compact": + thres, ok := mv.(int64) + if !ok || thres < 0 { + return &configErr{tk, fmt.Sprintf("Expected an absolute size for %q, got %v", mk, mv)} + } + opts.JetStreamMetaCompact = uint64(thres) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ @@ -2889,6 +2896,7 @@ func parseRemoteLeafNodes(v any, errors *[]error, warnings *[]error) ([]*RemoteL continue } remote := &RemoteLeafOpts{} + var proxyToken token for k, v := range rm { tk, v = unwrapValue(v, <) switch strings.ToLower(k) { @@ -3022,7 +3030,7 @@ func parseRemoteLeafNodes(v any, errors *[]error, warnings *[]error) ([]*RemoteL continue } // Capture the token for the "proxy" field itself, before the map iteration - proxyToken := tk + proxyToken = tk for pk, pv := range proxyMap { tk, pv = unwrapValue(pv, <) switch strings.ToLower(pk) { @@ -3047,16 +3055,6 @@ func parseRemoteLeafNodes(v any, errors *[]error, warnings *[]error) ([]*RemoteL } } } - // Use the saved proxy token for validation errors, not the last field token - if warns, err := validateLeafNodeProxyOptions(remote); err != nil { - *errors = append(*errors, &configErr{proxyToken, err.Error()}) - continue - } else { - // Add any warnings about proxy configuration - for _, warn := range warns { - *warnings = append(*warnings, &configErr{proxyToken, warn}) - } - } default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ @@ -3070,6 +3068,16 @@ func parseRemoteLeafNodes(v any, errors *[]error, warnings *[]error) ([]*RemoteL } } } + // Use the saved proxy token for validation errors, not the last field token + if warns, err := validateLeafNodeProxyOptions(remote); err != nil { + *errors = append(*errors, &configErr{proxyToken, 
err.Error()}) + continue + } else { + // Add any warnings about proxy configuration + for _, warn := range warns { + *warnings = append(*warnings, &configErr{proxyToken, warn}) + } + } remotes = append(remotes, remote) } return remotes, nil diff --git a/server/raft.go b/server/raft.go index 7ea1ee54c85..163d3e6e061 100644 --- a/server/raft.go +++ b/server/raft.go @@ -2652,9 +2652,6 @@ func (n *raft) runAsLeader() { n.unsubscribe(rpsub) n.Unlock() }() - - // To send out our initial peer state. - n.sendPeerState() n.Unlock() hb := time.NewTicker(hbInterval) @@ -4627,37 +4624,19 @@ func (n *raft) switchToLeader() { } n.Lock() + defer n.Unlock() n.debug("Switching to leader") - // Check if we have items pending as we are taking over. - sendHB := n.pindex > n.commit - n.lxfer = false n.updateLeader(n.id) - leadChange := n.switchState(Leader) - - if leadChange { - // Wait for messages to be applied if we've stored more, otherwise signal immediately. - // It's important to wait signaling we're leader if we're not up-to-date yet, as that - // would mean we're in a consistent state compared with the previous leader. - if n.pindex > n.applied { - n.aflr = n.pindex - } else { - // We know we have applied all entries in our log and can signal immediately. - // For sanity reset applied floor back down to 0, so we aren't able to signal twice. - n.aflr = 0 - if !n.leaderState.Swap(true) { - // Only update timestamp if leader state actually changed. - nowts := time.Now().UTC() - n.leaderSince.Store(&nowts) - } - n.updateLeadChange(true) - } - } - n.Unlock() + n.switchState(Leader) - if sendHB { - n.sendHeartbeat() - } + // To send out our initial peer state. + // In our implementation this is equivalent to sending a NOOP-entry upon becoming leader. + // Wait for this message (and potentially more) to be applied. + // It's important to wait signaling we're leader if we're not up-to-date yet, as that + // would mean we're in a consistent state compared with the previous leader. + n.sendPeerState() + n.aflr = n.pindex } diff --git a/server/raft_test.go b/server/raft_test.go index e202237c9b3..2ae33c73b6f 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -1570,11 +1570,11 @@ func TestNRGSnapshotAndTruncateToApplied(t *testing.T) { aeMsg2 := encode(t, &appendEntry{leader: nats1, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) // Timeline, we temporarily became leader - aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 2, pterm: 1, pindex: 2, entries: nil}) - aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 2, pterm: 1, pindex: 2, entries: entries}) + aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 3, pterm: 1, pindex: 3, entries: nil}) + aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 3, pterm: 1, pindex: 3, entries: entries}) // Timeline, old leader is back. - aeHeartbeat2 := encode(t, &appendEntry{leader: nats1, term: 3, commit: 2, pterm: 1, pindex: 2, entries: nil}) + aeHeartbeat2 := encode(t, &appendEntry{leader: nats1, term: 3, commit: 3, pterm: 1, pindex: 3, entries: nil}) // Simply receive first message. n.processAppendEntry(aeMsg1, n.aesub) @@ -1604,10 +1604,10 @@ func TestNRGSnapshotAndTruncateToApplied(t *testing.T) { reply: _EMPTY_, success: true, }) - require_Equal(t, n.commit, 2) + require_Equal(t, n.commit, 3) // Simulate upper layer calling down to apply. - n.Applied(2) + n.Applied(3) // Install snapshot and check it exists. 
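The switchToLeader rewrite above makes the initial peer state double as a NOOP-style entry and defers reporting leadership until everything up to that entry has been applied (the aflr applied floor). The following is a schematic stand-in for that sequencing, not the server's Raft implementation.

```go
package main

import "fmt"

// node sketches the leadership change: switching to leader appends one entry
// (the peer-state/NOOP) and records an applied floor; the node only reports
// itself as leader once everything up to that floor has been applied.
type node struct {
	pindex, applied, aflr uint64
	leader                bool
}

func (n *node) switchToLeader() {
	n.pindex++        // the entry that establishes leadership
	n.aflr = n.pindex // wait for it (and anything before it) to apply
}

func (n *node) appliedUpTo(index uint64) {
	n.applied = index
	if n.aflr != 0 && n.applied >= n.aflr {
		n.aflr = 0
		n.leader = true
	}
}

func main() {
	n := &node{}
	n.switchToLeader()
	fmt.Println("leader right after switch:", n.leader) // false
	n.appliedUpTo(1)
	fmt.Println("leader after entry applied:", n.leader) // true
}
```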
err = n.InstallSnapshot(nil) @@ -1621,9 +1621,9 @@ func TestNRGSnapshotAndTruncateToApplied(t *testing.T) { // Store a third message, it stays uncommitted. require_NoError(t, n.storeToWAL(aeMsg3)) - require_Equal(t, n.commit, 2) + require_Equal(t, n.commit, 3) require_Equal(t, n.wal.State().Msgs, 1) - entry, err = n.loadEntry(3) + entry, err = n.loadEntry(4) require_NoError(t, err) require_Equal(t, entry.leader, nats0) @@ -1631,8 +1631,8 @@ func TestNRGSnapshotAndTruncateToApplied(t *testing.T) { n.stepdown(noLeader) n.processAppendEntry(aeHeartbeat2, n.aesub) require_Equal(t, n.wal.State().Msgs, 0) - require_Equal(t, n.commit, 2) - require_Equal(t, n.applied, 2) + require_Equal(t, n.commit, 3) + require_Equal(t, n.applied, 3) } func TestNRGIgnoreDoubleSnapshot(t *testing.T) { @@ -2212,7 +2212,7 @@ func TestNRGHealthCheckWaitForPendingCommitsWhenPaused(t *testing.T) { require_True(t, n.Healthy()) } -func TestNRGHeartbeatCanEstablishQuorumAfterLeaderChange(t *testing.T) { +func TestNRGAppendEntryCanEstablishQuorumAfterLeaderChange(t *testing.T) { n, cleanup := initSingleMemRaftNode(t) defer cleanup() @@ -2224,7 +2224,7 @@ func TestNRGHeartbeatCanEstablishQuorumAfterLeaderChange(t *testing.T) { // Timeline aeMsg := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) - aeHeartbeatResponse := &appendEntryResponse{term: 1, index: 1, peer: nats0, success: true} + aeHeartbeatResponse := &appendEntryResponse{term: 1, index: 2, peer: nats0, success: true} // Process first message. n.processAppendEntry(aeMsg, n.aesub) @@ -2234,16 +2234,16 @@ func TestNRGHeartbeatCanEstablishQuorumAfterLeaderChange(t *testing.T) { // Simulate becoming leader, and not knowing if the stored entry has quorum and can be committed. // Switching to leader should send a heartbeat. n.switchToLeader() - require_Equal(t, n.aflr, 1) + require_Equal(t, n.aflr, 2) require_Equal(t, n.commit, 0) // We simulate receiving the successful heartbeat response here. It should move the commit up. n.processAppendEntryResponse(aeHeartbeatResponse) - require_Equal(t, n.commit, 1) - require_Equal(t, n.aflr, 1) + require_Equal(t, n.commit, 2) + require_Equal(t, n.aflr, 2) // Once the entry is applied, it should reset the applied floor. - n.Applied(1) + n.Applied(2) require_Equal(t, n.aflr, 0) } @@ -2251,10 +2251,6 @@ func TestNRGQuorumAccounting(t *testing.T) { n, cleanup := initSingleMemRaftNode(t) defer cleanup() - // Create a sample entry, the content doesn't matter, just that it's stored. - esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) - entries := []*Entry{newEntry(EntryNormal, esm)} - nats1 := "yrzKKRBu" // "nats-1" nats2 := "cnrtt3eg" // "nats-2" @@ -2267,10 +2263,8 @@ func TestNRGQuorumAccounting(t *testing.T) { require_Equal(t, n.csz, 5) require_Equal(t, n.qn, 3) - // Switch this node to leader, and send an entry. + // Switch this node to leader which sends an entry. n.switchToLeader() - require_Equal(t, n.pindex, 0) - n.sendAppendEntry(entries) require_Equal(t, n.pindex, 1) // The first response MUST NOT indicate quorum has been reached. @@ -2286,10 +2280,6 @@ func TestNRGRevalidateQuorumAfterLeaderChange(t *testing.T) { n, cleanup := initSingleMemRaftNode(t) defer cleanup() - // Create a sample entry, the content doesn't matter, just that it's stored. 
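The quorum-accounting tests above expect a quorum of 3 for a cluster size of 5, consistent with the usual majority rule. A one-line helper for reference; the function name is illustrative.

```go
package main

import "fmt"

// majority is consistent with the csz=5 / qn=3 expectations above.
func majority(clusterSize int) int {
	return clusterSize/2 + 1
}

func main() {
	for _, csz := range []int{1, 3, 5} {
		fmt.Printf("csz=%d qn=%d\n", csz, majority(csz))
	}
}
```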
- esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) - entries := []*Entry{newEntry(EntryNormal, esm)} - nats1 := "yrzKKRBu" // "nats-1" nats2 := "cnrtt3eg" // "nats-2" @@ -2302,12 +2292,10 @@ func TestNRGRevalidateQuorumAfterLeaderChange(t *testing.T) { require_Equal(t, n.csz, 5) require_Equal(t, n.qn, 3) - // Switch this node to leader, and send an entry. + // Switch this node to leader which sends an entry. n.term++ n.switchToLeader() require_Equal(t, n.term, 1) - require_Equal(t, n.pindex, 0) - n.sendAppendEntry(entries) require_Equal(t, n.pindex, 1) // We have one server that signals the message was stored. The leader will add 1 to the acks count. @@ -2354,10 +2342,10 @@ func TestNRGSignalLeadChangeFalseIfCampaignImmediately(t *testing.T) { n.switchToCandidate() n.switchToLeader() select { - case isLeader := <-n.LeadChangeC(): - require_True(t, isLeader) + case <-n.LeadChangeC(): + t.Error("Expected no leadChange signal") default: - t.Error("Expected leadChange signal") + // Expecting no signal yet. } }, }, @@ -2460,16 +2448,10 @@ func TestNRGIgnoreTrackResponseWhenNotLeader(t *testing.T) { n, cleanup := initSingleMemRaftNode(t) defer cleanup() - // Create a sample entry, the content doesn't matter, just that it's stored. - esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) - entries := []*Entry{newEntry(EntryNormal, esm)} - - // Switch this node to leader, and send two entries. The first will get quorum, the second will not. + // Switch this node to leader which sends an entry. n.term++ n.switchToLeader() require_Equal(t, n.term, 1) - require_Equal(t, n.pindex, 0) - n.sendAppendEntry(entries) require_Equal(t, n.pindex, 1) require_Equal(t, n.pterm, 1) require_Equal(t, n.commit, 0) @@ -3753,6 +3735,35 @@ func TestNRGParallelCatchupRollback(t *testing.T) { require_Equal(t, n.pindex, 2) } +func TestNRGReportLeaderAfterNoopEntry(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + require_Equal(t, n.State(), Follower) + require_Equal(t, n.term, 0) + require_False(t, n.Leader()) + + n.switchToCandidate() + require_Equal(t, n.State(), Candidate) + require_Equal(t, n.term, 1) + require_False(t, n.Leader()) + + // Switching to leader will put us into Leader state, + // but we're not necessarily an up-to-date leader yet. + n.switchToLeader() + require_Equal(t, n.State(), Leader) + require_Equal(t, n.term, 1) + require_Equal(t, n.pindex, 1) // Should've sent a NOOP-entry to establish leadership. + require_Equal(t, n.applied, 0) + require_False(t, n.Leader()) + + // Once we commit and apply the final entry, we should starting to report we're leader. + n.commit = 1 + n.Applied(1) + require_Equal(t, n.applied, 1) + require_True(t, n.Leader()) +} + // This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before // proposing the next one. 
// The test may fail if: diff --git a/server/reload.go b/server/reload.go index 89594d1b9e3..d3530a550ad 100644 --- a/server/reload.go +++ b/server/reload.go @@ -1257,7 +1257,7 @@ func imposeOrder(value any) error { slices.SortFunc(value.Gateways, func(i, j *RemoteGatewayOpts) int { return cmp.Compare(i.Name, j.Name) }) case WebsocketOpts: slices.Sort(value.AllowedOrigins) - case string, bool, uint8, uint16, int, int32, int64, time.Duration, float64, nil, LeafNodeOpts, ClusterOpts, *tls.Config, PinnedCertSet, + case string, bool, uint8, uint16, uint64, int, int32, int64, time.Duration, float64, nil, LeafNodeOpts, ClusterOpts, *tls.Config, PinnedCertSet, *URLAccResolver, *MemAccResolver, *DirAccResolver, *CacheDirAccResolver, Authentication, MQTTOpts, jwt.TagList, *OCSPConfig, map[string]string, JSLimitOpts, StoreCipher, *OCSPResponseCacheConfig, *ProxiesConfig: // explicitly skipped types @@ -1659,6 +1659,8 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { return nil, fmt.Errorf("config reload not supported for jetstream max memory and store") } } + case "jetstreammetacompact": + // Allowed at runtime but monitorCluster looks at s.opts directly, so no further work needed here. case "websocket": // Similar to gateways tmpOld := oldValue.(WebsocketOpts) diff --git a/server/route.go b/server/route.go index 008e6ede51c..57e5320fa70 100644 --- a/server/route.go +++ b/server/route.go @@ -2346,8 +2346,20 @@ func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo bool, gossipMod if doOnce { // check to be consistent and future proof. but will be same domain if s.sameDomain(info.Domain) { - s.nodeToInfo.Store(rHash, - nodeInfo{rn, s.info.Version, s.info.Cluster, info.Domain, id, nil, nil, nil, false, info.JetStream, false, false}) + s.nodeToInfo.Store(rHash, nodeInfo{ + name: rn, + version: s.info.Version, + cluster: s.info.Cluster, + domain: info.Domain, + id: id, + tags: nil, + cfg: nil, + stats: nil, + offline: false, + js: info.JetStream, + binarySnapshots: true, // Updated default to true. Versions 2.10.0+ support it. + accountNRG: false, + }) } } diff --git a/server/server.go b/server/server.go index fb3e472b87e..b366475da6a 100644 --- a/server/server.go +++ b/server/server.go @@ -44,6 +44,8 @@ import ( // Allow dynamic profiling. _ "net/http/pprof" + "expvar" + "github.com/klauspost/compress/s2" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats-server/v2/logger" @@ -841,15 +843,18 @@ func NewServer(opts *Options) (*Server, error) { if opts.JetStream { ourNode := getHash(serverName) s.nodeToInfo.Store(ourNode, nodeInfo{ - serverName, - VERSION, - opts.Cluster.Name, - opts.JetStreamDomain, - info.ID, - opts.Tags, - &JetStreamConfig{MaxMemory: opts.JetStreamMaxMemory, MaxStore: opts.JetStreamMaxStore, CompressOK: true}, - nil, - false, true, true, true, + name: serverName, + version: VERSION, + cluster: opts.Cluster.Name, + domain: opts.JetStreamDomain, + id: info.ID, + tags: opts.Tags, + cfg: &JetStreamConfig{MaxMemory: opts.JetStreamMaxMemory, MaxStore: opts.JetStreamMaxStore, CompressOK: true}, + stats: nil, + offline: false, + js: true, + binarySnapshots: true, + accountNRG: true, }) } @@ -2049,6 +2054,13 @@ func (s *Server) setRouteInfo(acc *Account) { // associated with an account name. // Lock MUST NOT be held upon entry. func (s *Server) lookupAccount(name string) (*Account, error) { + return s.lookupOrFetchAccount(name, true) +} + +// lookupOrFetchAccount is a function to return the account structure +// associated with an account name. 
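The lookupAccount split below introduces lookupOrFetchAccount with an explicit fetch flag so callers running in a read loop, like the gateway interest update earlier, can avoid a potentially blocking resolver fetch. Here is a schematic sketch of that pattern with a toy cache and resolver, not the server's account machinery.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errMissing = errors.New("account missing")

// registry: a plain lookup may fall back to a potentially blocking resolver
// fetch, while callers already inside a read loop pass fetch=false and
// accept a miss instead of blocking.
type registry struct {
	mu      sync.RWMutex
	cache   map[string]string
	resolve func(name string) (string, error) // slow, possibly remote
}

func (r *registry) lookup(name string, fetch bool) (string, error) {
	r.mu.RLock()
	v, ok := r.cache[name]
	r.mu.RUnlock()
	if ok {
		return v, nil
	}
	if !fetch || r.resolve == nil {
		return "", errMissing
	}
	v, err := r.resolve(name)
	if err != nil {
		return "", err
	}
	r.mu.Lock()
	r.cache[name] = v
	r.mu.Unlock()
	return v, nil
}

func main() {
	r := &registry{
		cache:   map[string]string{},
		resolve: func(name string) (string, error) { return "claims-for-" + name, nil },
	}
	if _, err := r.lookup("ACC", false); err != nil {
		fmt.Println("no fetch from a read loop:", err)
	}
	v, _ := r.lookup("ACC", true)
	fmt.Println("normal lookup may fetch:", v)
}
```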
+// Lock MUST NOT be held upon entry. +func (s *Server) lookupOrFetchAccount(name string, fetch bool) (*Account, error) { var acc *Account if v, ok := s.accounts.Load(name); ok { acc = v.(*Account) @@ -2058,7 +2070,7 @@ func (s *Server) lookupAccount(name string) (*Account, error) { // return the latest information from the resolver. if acc.IsExpired() { s.Debugf("Requested account [%s] has expired", name) - if s.AccountResolver() != nil { + if s.AccountResolver() != nil && fetch { if err := s.updateAccount(acc); err != nil { // This error could mask expired, so just return expired here. return nil, ErrAccountExpired @@ -2070,7 +2082,7 @@ func (s *Server) lookupAccount(name string) (*Account, error) { return acc, nil } // If we have a resolver see if it can fetch the account. - if s.AccountResolver() == nil { + if s.AccountResolver() == nil || !fetch { return nil, ErrMissingAccount } return s.fetchAccount(name) @@ -3017,6 +3029,7 @@ const ( HealthzPath = "/healthz" IPQueuesPath = "/ipqueuesz" RaftzPath = "/raftz" + ExpvarzPath = "/debug/vars" ) func (s *Server) basePath(p string) string { @@ -3135,6 +3148,8 @@ func (s *Server) startMonitoring(secure bool) error { mux.HandleFunc(s.basePath(IPQueuesPath), s.HandleIPQueuesz) // Raftz mux.HandleFunc(s.basePath(RaftzPath), s.HandleRaftz) + // Expvarz + mux.Handle(s.basePath(ExpvarzPath), expvar.Handler()) // Do not set a WriteTimeout because it could cause cURL/browser // to return empty response or unable to display page if the diff --git a/server/stream.go b/server/stream.go index 1329c1642a3..514d4b970de 100644 --- a/server/stream.go +++ b/server/stream.go @@ -5287,8 +5287,8 @@ func (mset *stream) getDirectRequest(req *JSApiMsgGetRequest, reply string) { // If batch was requested send EOB. if isBatchRequest { - // Update if the stream's lasts sequence has moved past our validThrough. - if mset.lastSeq() > validThrough { + // Update if the stream's last sequence has moved past our validThrough. + if mset.lseq > validThrough { np, _ = store.NumPending(seq, req.NextFor, false) } hdr := fmt.Appendf(nil, eob, np, lseq) @@ -6507,7 +6507,7 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr } // Reject unsupported headers. - if getExpectedLastMsgId(hdr) != _EMPTY_ { + if getExpectedLastMsgId(bhdr) != _EMPTY_ { return errorOnUnsupported(seq, JSExpectedLastMsgId) } @@ -7593,7 +7593,19 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) bool { // Only propose message deletion to the stream if we're consumer leader, otherwise all followers would also propose. // We must be the consumer leader, since we know for sure we've stored the message and don't register as pre-ack. if o != nil && !o.IsLeader() { + // Currently, interest-based streams can race on "no interest" because consumer creates/updates go over + // the meta layer and published messages go over the stream layer. Some servers could then either store + // or not store some initial set of messages that gained new interest. To get the stream back in sync, + // we allow moving the first sequence up. + // TODO(mvv): later on only the stream leader should determine "no interest" + interestRaiseFirst := mset.cfg.Retention == InterestPolicy && seq == state.FirstSeq mset.mu.Unlock() + if interestRaiseFirst { + if _, err := store.RemoveMsg(seq); err == ErrStoreEOF { + // This should not happen, but being pedantic. + mset.registerPreAckLock(o, seq) + } + } // Must still mark as removal if follower. If we become leader later, we must be able to retry the proposal. 
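The monitoring change above mounts Go's standard expvar handler on the monitoring mux at /debug/vars. A minimal standalone equivalent is shown below; the listen address is illustrative, whereas the real server attaches the handler to its existing monitoring listener.

```go
package main

import (
	"expvar"
	"fmt"
	"log"
	"net/http"
)

func main() {
	// Mount Go's standard expvar handler at /debug/vars, exposing runtime
	// and command-line vars as JSON.
	mux := http.NewServeMux()
	mux.Handle("/debug/vars", expvar.Handler())

	fmt.Println("try: curl http://127.0.0.1:8222/debug/vars")
	log.Fatal(http.ListenAndServe("127.0.0.1:8222", mux))
}
```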
return true } diff --git a/server/util.go b/server/util.go index dcfa1000d43..4e08e3f0cda 100644 --- a/server/util.go +++ b/server/util.go @@ -23,6 +23,7 @@ import ( "net" "net/url" "reflect" + "runtime" "strconv" "strings" "time" @@ -340,3 +341,25 @@ func generateInfoJSON(info *Info) []byte { pcs := [][]byte{[]byte("INFO"), b, []byte(CR_LF)} return bytes.Join(pcs, []byte(" ")) } + +// parallelTaskQueue starts a number of goroutines and returns a channel +// which functions can be sent to for queued parallel execution. The +// goroutines will stop running when the returned channel is closed and +// all queued tasks have completed. The passed in mp limits concurrency, +// or a value <= 0 will default to GOMAXPROCS. +func parallelTaskQueue(mp int) chan<- func() { + if rmp := runtime.GOMAXPROCS(-1); mp <= 0 { + mp = rmp + } else { + mp = max(rmp, mp) + } + tq := make(chan func(), mp) + for range mp { + go func() { + for fn := range tq { + fn() + } + }() + } + return tq +}
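A usage sketch for the new parallelTaskQueue helper above: queue work, wait for it with a WaitGroup, then close the channel so the workers drain and exit. The helper is inlined here in a slightly simplified form (mp is used directly as the worker count) so the example runs standalone.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// parallelTasks starts mp worker goroutines that execute functions received
// on the returned channel until it is closed.
func parallelTasks(mp int) chan<- func() {
	if mp <= 0 {
		mp = runtime.GOMAXPROCS(-1)
	}
	tq := make(chan func(), mp)
	for range mp {
		go func() {
			for fn := range tq {
				fn()
			}
		}()
	}
	return tq
}

func main() {
	tq := parallelTasks(4)

	var wg sync.WaitGroup
	results := make([]int, 8)
	for i := range results {
		wg.Add(1)
		tq <- func() {
			defer wg.Done()
			results[i] = i * i
		}
	}
	wg.Wait()
	close(tq) // lets the workers drain and exit

	fmt.Println(results)
}
```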