diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 86202c678db..992681b52b7 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -33,13 +33,13 @@ jobs: - name: Install cosign # Use commit hash here to avoid a re-tagging attack, as this is a third-party action - # Commit d7543c93d881b35a8faa02e8e3605f69b7a1ce62 = tag v3.10.0 - uses: sigstore/cosign-installer@d7543c93d881b35a8faa02e8e3605f69b7a1ce62 + # Commit faadad0cce49287aee09b3a48701e75088a2c6ad = tag v4.0.0 + uses: sigstore/cosign-installer@faadad0cce49287aee09b3a48701e75088a2c6ad - name: Install syft # Use commit hash here to avoid a re-tagging attack, as this is a third-party action - # Commit f8bdd1d8ac5e901a77a92f111440fdb1b593736b = tag v0.20.6 - uses: anchore/sbom-action/download-syft@f8bdd1d8ac5e901a77a92f111440fdb1b593736b + # Commit 8e94d75ddd33f69f691467e42275782e4bfefe84 = tag v0.20.9 + uses: anchore/sbom-action/download-syft@8e94d75ddd33f69f691467e42275782e4bfefe84 with: syft-version: "v1.27.1" diff --git a/.goreleaser.yml b/.goreleaser.yml index 409d17c5c5b..a20f523dd82 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -21,7 +21,7 @@ builds: env: # This is the toolchain version we use for releases. To override, set the env var, e.g.: # GORELEASER_TOOLCHAIN="go1.22.8" TARGET='linux_amd64' goreleaser build --snapshot --clean --single-target - - GOTOOLCHAIN={{ envOrDefault "GORELEASER_TOOLCHAIN" "go1.24.7" }} + - GOTOOLCHAIN={{ envOrDefault "GORELEASER_TOOLCHAIN" "go1.25.3" }} - GO111MODULE=on - CGO_ENABLED=0 goos: @@ -38,6 +38,8 @@ builds: - mips64le - s390x - ppc64le + # RISC-V currently only supported on Linux + - riscv64 goarm: - 6 - 7 @@ -52,6 +54,12 @@ builds: goarch: arm64 - goos: freebsd goarch: 386 + - goos: darwin + goarch: riscv64 + - goos: windows + goarch: riscv64 + - goos: freebsd + goarch: riscv64 mod_timestamp: "{{ .CommitTimestamp }}" nfpms: diff --git a/go.mod b/go.mod index d0548ef336e..1dd04c5cf90 100644 --- a/go.mod +++ b/go.mod @@ -2,17 +2,19 @@ module github.com/nats-io/nats-server/v2 go 1.24.0 +toolchain go1.24.9 + require ( github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op github.com/google/go-tpm v0.9.6 - github.com/klauspost/compress v1.18.0 + github.com/klauspost/compress v1.18.1 github.com/minio/highwayhash v1.0.3 github.com/nats-io/jwt/v2 v2.8.0 - github.com/nats-io/nats.go v1.46.1 + github.com/nats-io/nats.go v1.47.0 github.com/nats-io/nkeys v0.4.11 github.com/nats-io/nuid v1.0.1 go.uber.org/automaxprocs v1.6.0 - golang.org/x/crypto v0.42.0 - golang.org/x/sys v0.36.0 - golang.org/x/time v0.13.0 + golang.org/x/crypto v0.43.0 + golang.org/x/sys v0.37.0 + golang.org/x/time v0.14.0 ) diff --git a/go.sum b/go.sum index 3f4b9fba47f..b351d963d39 100644 --- a/go.sum +++ b/go.sum @@ -4,14 +4,14 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/go-tpm v0.9.6 h1:Ku42PT4LmjDu1H5C5ISWLlpI1mj+Zq7sPGKoRw2XROA= github.com/google/go-tpm v0.9.6/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= -github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= -github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= +github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= github.com/minio/highwayhash v1.0.3 
h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g= github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA= -github.com/nats-io/nats.go v1.46.1 h1:bqQ2ZcxVd2lpYI97xYASeRTY3I5boe/IVmuUDPitHfo= -github.com/nats-io/nats.go v1.46.1/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= +github.com/nats-io/nats.go v1.47.0 h1:YQdADw6J/UfGUd2Oy6tn4Hq6YHxCaJrVKayxxFqYrgM= +github.com/nats-io/nats.go v1.47.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= @@ -24,12 +24,12 @@ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMT github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= -golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI= -golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8= +golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04= +golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k= -golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= -golang.org/x/time v0.13.0 h1:eUlYslOIt32DgYD6utsuUeHs4d7AsEYLuIAdg7FlYgI= -golang.org/x/time v0.13.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= +golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ= +golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= +golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/server/.tmp b/server/.tmp new file mode 100644 index 00000000000..67d91f16b8d Binary files /dev/null and b/server/.tmp differ diff --git a/server/ciphersuites.go b/server/ciphersuites.go index 740db5128cb..1c5ec3a5765 100644 --- a/server/ciphersuites.go +++ b/server/ciphersuites.go @@ -14,6 +14,7 @@ package server import ( + "crypto/fips140" "crypto/tls" ) @@ -94,6 +95,16 @@ var curvePreferenceMap = map[string]tls.CurveID{ // reorder to default to the highest level of security. See: // https://blog.bracebin.com/achieving-perfect-ssl-labs-score-with-go func defaultCurvePreferences() []tls.CurveID { + if fips140.Enabled() { + // X25519 is not FIPS-approved by itself, but it is when + // combined with MLKEM768. 
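+ // fips140.Enabled() reports whether the Go runtime was started in
+ // FIPS 140-3 mode (GODEBUG=fips140=on); in that mode plain X25519
+ // must not be offered on its own.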
+ return []tls.CurveID{ + tls.X25519MLKEM768, // post-quantum + tls.CurveP256, + tls.CurveP384, + tls.CurveP521, + } + } return []tls.CurveID{ tls.X25519, // faster than P256, arguably more secure tls.CurveP256, diff --git a/server/client.go b/server/client.go index bad079d345f..af0d022e61d 100644 --- a/server/client.go +++ b/server/client.go @@ -29,6 +29,7 @@ import ( "net/url" "regexp" "runtime" + "slices" "strconv" "strings" "sync" @@ -4257,7 +4258,7 @@ func (c *client) setupResponseServiceImport(acc *Account, si *serviceImport, tra // Will remove a header if present. func removeHeaderIfPresent(hdr []byte, key string) []byte { - start := bytes.Index(hdr, []byte(key)) + start := getHeaderKeyIndex(key, hdr) // key can't be first and we want to check that it is preceded by a '\n' if start < 1 || hdr[start-1] != '\n' { return hdr @@ -4375,22 +4376,13 @@ func sliceHeader(key string, hdr []byte) []byte { if len(hdr) == 0 { return nil } - index := bytes.Index(hdr, stringToBytes(key+":")) - hdrLen := len(hdr) - // Check that we have enough characters, this will handle the -1 case of the key not - // being found and will also handle not having enough characters for trailing CRLF. - if index < 2 { - return nil - } - // There should be a terminating CRLF. - if index >= hdrLen-1 || hdr[index-1] != '\n' || hdr[index-2] != '\r' { + index := getHeaderKeyIndex(key, hdr) + if index == -1 { return nil } - // The key should be immediately followed by a : separator. + // Skip over the key and the : separator. index += len(key) + 1 - if index >= hdrLen || hdr[index-1] != ':' { - return nil - } + hdrLen := len(hdr) // Skip over whitespace before the value. for index < hdrLen && hdr[index] == ' ' { index++ @@ -4406,6 +4398,65 @@ func sliceHeader(key string, hdr []byte) []byte { return hdr[start:index:index] } +// getHeaderKeyIndex returns an index into the header slice for the given key. +// Returns -1 if not found. +func getHeaderKeyIndex(key string, hdr []byte) int { + if len(hdr) == 0 { + return -1 + } + bkey := stringToBytes(key) + keyLen, hdrLen := len(key), len(hdr) + var offset int + for { + index := bytes.Index(hdr[offset:], bkey) + // Check that we have enough characters, this will handle the -1 case of the key not + // being found and will also handle not having enough characters for trailing CRLF. + if index < 2 { + return -1 + } + index += offset + // There should be a terminating CRLF. + if index >= hdrLen-1 || hdr[index-1] != '\n' || hdr[index-2] != '\r' { + offset = index + keyLen + continue + } + // The key should be immediately followed by a : separator. + if index+keyLen >= hdrLen { + return -1 + } + if hdr[index+keyLen] != ':' { + offset = index + keyLen + continue + } + return index + } +} + +func setHeader(key, val string, hdr []byte) []byte { + start := getHeaderKeyIndex(key, hdr) + if start >= 0 { + valStart := start + len(key) + 1 + // Preserve single whitespace if used. + hdrLen := len(hdr) + if valStart < hdrLen && hdr[valStart] == ' ' { + valStart++ + } + valEnd := bytes.Index(hdr[valStart:], []byte("\r")) + if valEnd < 0 { + return hdr // malformed headers + } + valEnd += valStart + suffix := slices.Clone(hdr[valEnd:]) + newHdr := append(hdr[:valStart], val...) + return append(newHdr, suffix...) + } + if len(hdr) > 0 && bytes.HasSuffix(hdr, []byte("\r\n")) { + hdr = hdr[:len(hdr)-2] + val += "\r\n" + } + return fmt.Appendf(hdr, "%s: %s\r\n", key, val) +} + // For bytes.HasPrefix below. 
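+// Precomputing the []byte forms once avoids converting the string on
+// every bytes.HasPrefix check.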
var ( jsRequestNextPreB = []byte(jsRequestNextPre) diff --git a/server/client_test.go b/server/client_test.go index 8bd2e550167..e6eb614bd3e 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -3040,7 +3040,7 @@ func TestSliceHeader(t *testing.T) { require_True(t, bytes.Equal(sliced, copied)) } -func TestSliceHeaderOrdering(t *testing.T) { +func TestSliceHeaderOrderingPrefix(t *testing.T) { hdr := []byte("NATS/1.0\r\n\r\n") // These headers share the same prefix, the longer subject @@ -3060,6 +3060,105 @@ func TestSliceHeaderOrdering(t *testing.T) { require_True(t, bytes.Equal(sliced, copied)) } +func TestSliceHeaderOrderingSuffix(t *testing.T) { + hdr := []byte("NATS/1.0\r\n\r\n") + + // These headers share the same suffix, the longer subject + // must not invalidate the existence of the shorter one. + hdr = genHeader(hdr, "Previous-Nats-Msg-Id", "user") + hdr = genHeader(hdr, "Nats-Msg-Id", "control") + + sliced := sliceHeader("Nats-Msg-Id", hdr) + copied := getHeader("Nats-Msg-Id", hdr) + + require_NotNil(t, sliced) + require_NotNil(t, copied) + require_True(t, bytes.Equal(sliced, copied)) + require_Equal(t, string(copied), "control") +} + +func TestRemoveHeaderIfPresentOrderingPrefix(t *testing.T) { + hdr := []byte("NATS/1.0\r\n\r\n") + + // These headers share the same prefix, the longer subject + // must not invalidate the existence of the shorter one. + hdr = genHeader(hdr, JSExpectedLastSubjSeqSubj, "foo") + hdr = genHeader(hdr, JSExpectedLastSubjSeq, "24") + + hdr = removeHeaderIfPresent(hdr, JSExpectedLastSubjSeq) + ehdr := genHeader(nil, JSExpectedLastSubjSeqSubj, "foo") + require_True(t, bytes.Equal(hdr, ehdr)) +} + +func TestRemoveHeaderIfPresentOrderingSuffix(t *testing.T) { + hdr := []byte("NATS/1.0\r\n\r\n") + + // These headers share the same suffix, the longer subject + // must not invalidate the existence of the shorter one. + hdr = genHeader(hdr, "Previous-Nats-Msg-Id", "user") + hdr = genHeader(hdr, "Nats-Msg-Id", "control") + + hdr = removeHeaderIfPresent(hdr, "Nats-Msg-Id") + ehdr := genHeader(nil, "Previous-Nats-Msg-Id", "user") + require_True(t, bytes.Equal(hdr, ehdr)) +} + +func TestSetHeaderOrderingPrefix(t *testing.T) { + for _, space := range []bool{true, false} { + title := "Normal" + if !space { + title = "Trimmed" + } + t.Run(title, func(t *testing.T) { + hdr := []byte("NATS/1.0\r\n\r\n") + + // These headers share the same prefix, the longer subject + // must not invalidate the existence of the shorter one. + hdr = genHeader(hdr, JSExpectedLastSubjSeqSubj, "foo") + hdr = genHeader(hdr, JSExpectedLastSubjSeq, "24") + if !space { + hdr = bytes.ReplaceAll(hdr, []byte(" "), nil) + } + + hdr = setHeader(JSExpectedLastSubjSeq, "12", hdr) + ehdr := genHeader(nil, JSExpectedLastSubjSeqSubj, "foo") + ehdr = genHeader(ehdr, JSExpectedLastSubjSeq, "12") + if !space { + ehdr = bytes.ReplaceAll(ehdr, []byte(" "), nil) + } + require_True(t, bytes.Equal(hdr, ehdr)) + }) + } +} + +func TestSetHeaderOrderingSuffix(t *testing.T) { + for _, space := range []bool{true, false} { + title := "Normal" + if !space { + title = "Trimmed" + } + t.Run(title, func(t *testing.T) { + hdr := []byte("NATS/1.0\r\n\r\n") + + // These headers share the same suffix, the longer subject + // must not invalidate the existence of the shorter one. 
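+ // ("Nats-Msg-Id" is a suffix of "Previous-Nats-Msg-Id", so a naive
+ // bytes.Index on the key alone would match inside the longer header first.)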
+ hdr = genHeader(hdr, "Previous-Nats-Msg-Id", "user") + hdr = genHeader(hdr, "Nats-Msg-Id", "control") + if !space { + hdr = bytes.ReplaceAll(hdr, []byte(" "), nil) + } + + hdr = setHeader("Nats-Msg-Id", "other", hdr) + ehdr := genHeader(nil, "Previous-Nats-Msg-Id", "user") + ehdr = genHeader(ehdr, "Nats-Msg-Id", "other") + if !space { + ehdr = bytes.ReplaceAll(ehdr, []byte(" "), nil) + } + require_True(t, bytes.Equal(hdr, ehdr)) + }) + } +} + func TestInProcessAllowedConnectionType(t *testing.T) { tmpl := ` listen: "127.0.0.1:-1" diff --git a/server/consumer.go b/server/consumer.go index b59dfe77e60..9a06cf5eb6b 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -4307,7 +4307,8 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) { var pre *waitingRequest for wr := wq.head; wr != nil; { // Check expiration. - if (eos && wr.noWait && wr.d > 0) || (!wr.expires.IsZero() && now.After(wr.expires)) { + expires := !wr.expires.IsZero() && now.After(wr.expires) + if (eos && wr.noWait) || expires { rdWait := o.replicateDeliveries() if rdWait { // Check if we need to send the timeout after pending replicated deliveries, or can do so immediately. @@ -4316,13 +4317,26 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) { } else { wd.pn, wd.pb = wr.n, wr.b } + // If we still need to wait for replicated deliveries, remove from waiting list. + if rdWait { + wr = remove(pre, wr) + continue + } } - if !rdWait { + // Normally it's a timeout. + if expires { hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b) o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) + wr = remove(pre, wr) + continue + } else if wr.expires.IsZero() || wr.d > 0 { + // But if we're NoWait without expiry, we've reached the end of the stream, and we've not delivered any messages. + // Return no messages instead, which is the same as if we'd rejected the pull request initially. + hdr := fmt.Appendf(nil, "NATS/1.0 404 No Messages\r\n\r\n") + o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) + wr = remove(pre, wr) + continue } - wr = remove(pre, wr) - continue } // Now check interest. 
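+ // Requests whose reply subject no longer has any interest are pruned below.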
interest := wr.acc.sl.HasInterest(wr.interest) diff --git a/server/events.go b/server/events.go index 2c522c6cdab..b0d76249244 100644 --- a/server/events.go +++ b/server/events.go @@ -1718,18 +1718,18 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su node := getHash(si.Name) accountNRG := si.AccountNRG() oldInfo, _ := s.nodeToInfo.Swap(node, nodeInfo{ - si.Name, - si.Version, - si.Cluster, - si.Domain, - si.ID, - si.Tags, - cfg, - stats, - false, - si.JetStreamEnabled(), - si.BinaryStreamSnapshot(), - accountNRG, + name: si.Name, + version: si.Version, + cluster: si.Cluster, + domain: si.Domain, + id: si.ID, + tags: si.Tags, + cfg: cfg, + stats: stats, + offline: false, + js: si.JetStreamEnabled(), + binarySnapshots: si.BinaryStreamSnapshot(), + accountNRG: accountNRG, }) if oldInfo == nil || accountNRG != oldInfo.(nodeInfo).accountNRG { // One of the servers we received statsz from changed its mind about @@ -1772,18 +1772,18 @@ func (s *Server) processNewServer(si *ServerInfo) { // Only update if non-existent if _, ok := s.nodeToInfo.Load(node); !ok { s.nodeToInfo.Store(node, nodeInfo{ - si.Name, - si.Version, - si.Cluster, - si.Domain, - si.ID, - si.Tags, - nil, - nil, - false, - si.JetStreamEnabled(), - si.BinaryStreamSnapshot(), - si.AccountNRG(), + name: si.Name, + version: si.Version, + cluster: si.Cluster, + domain: si.Domain, + id: si.ID, + tags: si.Tags, + cfg: nil, + stats: nil, + offline: false, + js: si.JetStreamEnabled(), + binarySnapshots: si.BinaryStreamSnapshot(), + accountNRG: si.AccountNRG(), }) } } diff --git a/server/filestore.go b/server/filestore.go index 7562839d2f0..3ac37f28f4f 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1445,14 +1445,31 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { // For tombstones that we find and collect. var ( tombstones []uint64 - minTombstoneSeq uint64 - minTombstoneTs int64 + maxTombstoneSeq uint64 + maxTombstoneTs int64 ) // To detect gaps from compaction, and to ensure the sequence keeps moving up. var last uint64 var hb [highwayhash.Size64]byte + updateLast := func(seq uint64, ts int64) { + // The sequence needs to only ever move up. + if seq <= last { + return + } + + // Check for any gaps from compaction, meaning no ebit entry. + if last > 0 && seq != last+1 && mb.msgs != 0 { + for dseq := last + 1; dseq < seq; dseq++ { + addToDmap(dseq) + } + } + last = seq + atomic.StoreUint64(&mb.last.seq, last) + mb.last.ts = ts + } + for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; { if index+msgHdrSize > lbuf { truncate(index) @@ -1504,8 +1521,8 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { seq = seq &^ tbit // Need to process this here and make sure we have accounted for this properly. 
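+ // Track the highest tombstone seen; if the block turns out to hold nothing
+ // but tombstones, recovery uses it below to place first/last past them.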
tombstones = append(tombstones, seq) - if minTombstoneSeq == 0 || seq < minTombstoneSeq { - minTombstoneSeq, minTombstoneTs = seq, ts + if maxTombstoneSeq == 0 || seq > maxTombstoneSeq { + maxTombstoneSeq, maxTombstoneTs = seq, ts } index += rl continue @@ -1516,8 +1533,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { if seq == 0 || seq&ebit != 0 || seq < fseq { seq = seq &^ ebit if seq >= fseq { - atomic.StoreUint64(&mb.last.seq, seq) - mb.last.ts = ts + updateLast(seq, ts) if mb.msgs == 0 { atomic.StoreUint64(&mb.first.seq, seq+1) mb.first.ts = 0 @@ -1555,17 +1571,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { } } - // Check for any gaps from compaction, meaning no ebit entry. - if last > 0 && seq != last+1 { - for dseq := last + 1; dseq < seq; dseq++ { - addToDmap(dseq) - } - } - - // Always set last - last = seq - atomic.StoreUint64(&mb.last.seq, last) - mb.last.ts = ts + updateLast(seq, ts) // Advance to next record. index += rl @@ -1578,12 +1584,12 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { fseq := atomic.LoadUint64(&mb.first.seq) if fseq > 0 { atomic.StoreUint64(&mb.last.seq, fseq-1) - } else if fseq == 0 && minTombstoneSeq > 0 { - atomic.StoreUint64(&mb.first.seq, minTombstoneSeq+1) + } else if fseq == 0 && maxTombstoneSeq > 0 { + atomic.StoreUint64(&mb.first.seq, maxTombstoneSeq+1) mb.first.ts = 0 if mb.last.seq == 0 { - atomic.StoreUint64(&mb.last.seq, minTombstoneSeq) - mb.last.ts = minTombstoneTs + atomic.StoreUint64(&mb.last.seq, maxTombstoneSeq) + mb.last.ts = maxTombstoneTs } } } @@ -2114,6 +2120,12 @@ func (fs *fileStore) recoverMsgs() error { fs.removeMsgBlockFromList(mb) continue } + // If the stream is empty, reset the first/last sequences so these can + // properly move up based purely on tombstones spread over multiple blocks. + if fs.state.Msgs == 0 { + fs.state.FirstSeq, fs.state.LastSeq = 0, 0 + fs.state.FirstTime, fs.state.LastTime = time.Time{}, time.Time{} + } fseq := atomic.LoadUint64(&mb.first.seq) if fs.state.FirstSeq == 0 || (fseq < fs.state.FirstSeq && mb.first.ts != 0) { fs.state.FirstSeq = fseq @@ -2521,7 +2533,7 @@ func (mb *msgBlock) firstMatchingMulti(sl *gsl.SimpleSublist, start uint64, sm * if err != nil { continue } - expireOk := seq == lseq && mb.llseq == seq + expireOk := seq == lseq && mb.llseq != llseq && mb.llseq == seq updateLLTS = false // cacheLookup already updated it. if sl.HasInterest(fsm.subj) { return fsm, expireOk, nil @@ -2654,7 +2666,7 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor continue } updateLLTS = false // cacheLookup already updated it. - expireOk := seq == lseq && mb.llseq == seq + expireOk := seq == lseq && mb.llseq != llseq && mb.llseq == seq if isAll { return fsm, expireOk, nil } @@ -4300,19 +4312,27 @@ func (mb *msgBlock) skipMsg(seq uint64, now int64) { } // SkipMsg will use the next sequence number but not store anything. -func (fs *fileStore) SkipMsg() uint64 { +func (fs *fileStore) SkipMsg(seq uint64) (uint64, error) { + // Grab time. + now := ats.AccessTime() + fs.mu.Lock() defer fs.mu.Unlock() + // Check sequence matches our last sequence. + if seq != fs.state.LastSeq+1 { + if seq > 0 { + return 0, ErrSequenceMismatch + } + seq = fs.state.LastSeq + 1 + } + // Grab our current last message block. mb, err := fs.checkLastBlock(emptyRecordLen) if err != nil { - return 0 + return 0, err } - // Grab time and last seq. 
- now, seq := ats.AccessTime(), fs.state.LastSeq+1 - // Write skip msg. mb.skipMsg(seq, now) @@ -4327,7 +4347,7 @@ func (fs *fileStore) SkipMsg() uint64 { // Mark as dirty for stream state. fs.dirty++ - return seq + return seq, nil } // Skip multiple msgs. We will determine if we can fit into current lmb or we need to create a new block. @@ -4492,6 +4512,17 @@ func (fs *fileStore) enforceMsgLimit() { return } for nmsgs := fs.state.Msgs; nmsgs > uint64(fs.cfg.MaxMsgs); nmsgs = fs.state.Msgs { + // If the first block can be removed fully, purge it entirely without needing to walk sequences. + if len(fs.blks) > 0 { + fmb := fs.blks[0] + fmb.mu.RLock() + msgs := fmb.msgs + fmb.mu.RUnlock() + if nmsgs-msgs > uint64(fs.cfg.MaxMsgs) { + fs.purgeMsgBlock(fmb) + continue + } + } if removed, err := fs.deleteFirstMsg(); err != nil || !removed { fs.rebuildFirst() return @@ -4509,6 +4540,17 @@ func (fs *fileStore) enforceBytesLimit() { return } for bs := fs.state.Bytes; bs > uint64(fs.cfg.MaxBytes); bs = fs.state.Bytes { + // If the first block can be removed fully, purge it entirely without needing to walk sequences. + if len(fs.blks) > 0 { + fmb := fs.blks[0] + fmb.mu.RLock() + bytes := fmb.bytes + fmb.mu.RUnlock() + if bs-bytes > uint64(fs.cfg.MaxBytes) { + fs.purgeMsgBlock(fmb) + continue + } + } if removed, err := fs.deleteFirstMsg(); err != nil || !removed { fs.rebuildFirst() return @@ -4819,8 +4861,10 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( // If erase but block is empty, we can simply remove the block later. if secure && !isEmpty { // Grab record info. - ri, rl, _, _ := mb.slotInfo(int(seq - mb.cache.fseq)) - if err := mb.eraseMsg(seq, int(ri), int(rl), isLastBlock); err != nil { + ri, _, _, _ := mb.slotInfo(int(seq - mb.cache.fseq)) + if err := mb.eraseMsg(seq, int(ri), int(msz), isLastBlock); err != nil { + mb.mu.Unlock() + fsUnlock() return false, err } } @@ -5393,9 +5437,21 @@ func (mb *msgBlock) selectNextFirst() { } // Select the next FirstSeq +// Also cleans up empty blocks at the start only containing tombstones. // Lock should be held. func (fs *fileStore) selectNextFirst() { if len(fs.blks) > 0 { + for len(fs.blks) > 1 { + mb := fs.blks[0] + mb.mu.Lock() + empty := mb.msgs == 0 + if !empty { + mb.mu.Unlock() + break + } + fs.forceRemoveMsgBlock(mb) + mb.mu.Unlock() + } mb := fs.blks[0] mb.mu.RLock() fs.state.FirstSeq = atomic.LoadUint64(&mb.first.seq) @@ -8781,7 +8837,7 @@ func (fs *fileStore) Truncate(seq uint64) error { } mb.mu.Lock() } - fs.removeMsgBlock(mb) + fs.forceRemoveMsgBlock(mb) mb.mu.Unlock() } @@ -8803,7 +8859,7 @@ func (fs *fileStore) Truncate(seq uint64) error { } smb.mu.Lock() } - fs.removeMsgBlock(smb) + fs.forceRemoveMsgBlock(smb) smb.mu.Unlock() goto SKIP } @@ -8845,7 +8901,7 @@ SKIP: if !hasWrittenTombstones { fs.lmb = smb tmb.mu.Lock() - fs.removeMsgBlock(tmb) + fs.forceRemoveMsgBlock(tmb) tmb.mu.Unlock() } @@ -8925,8 +8981,8 @@ func (fs *fileStore) removeMsgBlockFromList(mb *msgBlock) { // Both locks should be held. func (fs *fileStore) removeMsgBlock(mb *msgBlock) { // Check for us being last message block + lseq, lts := atomic.LoadUint64(&mb.last.seq), mb.last.ts if mb == fs.lmb { - lseq, lts := atomic.LoadUint64(&mb.last.seq), mb.last.ts // Creating a new message write block requires that the lmb lock is not held. mb.mu.Unlock() // Write the tombstone to remember since this was last block. 
@@ -8934,12 +8990,40 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) { fs.writeTombstone(lseq, lts) } mb.mu.Lock() + } else if lseq == fs.state.LastSeq { + // Need to write a tombstone for the last sequence if we're removing the block containing it. + fs.writeTombstone(lseq, lts) } - // Only delete message block after (potentially) writing a new lmb. + // Only delete message block after (potentially) writing a tombstone. + fs.forceRemoveMsgBlock(mb) +} + +// Removes the msgBlock, without writing tombstones to ensure the last sequence is preserved. +// Both locks should be held. +func (fs *fileStore) forceRemoveMsgBlock(mb *msgBlock) { mb.dirtyCloseWithRemove(true) fs.removeMsgBlockFromList(mb) } +// Purges and removes the msgBlock from the store. +// Lock should be held. +func (fs *fileStore) purgeMsgBlock(mb *msgBlock) { + mb.mu.Lock() + // Update top level accounting. + msgs, bytes := mb.msgs, mb.bytes + if msgs > fs.state.Msgs { + msgs = fs.state.Msgs + } + if bytes > fs.state.Bytes { + bytes = fs.state.Bytes + } + fs.state.Msgs -= msgs + fs.state.Bytes -= bytes + fs.removeMsgBlock(mb) + mb.mu.Unlock() + fs.selectNextFirst() +} + // Called by purge to simply get rid of the cache and close our fds. // Lock should not be held. func (mb *msgBlock) dirtyClose() { @@ -9675,11 +9759,7 @@ func (fs *fileStore) writeTTLState() error { buf := fs.ttls.Encode(fs.state.LastSeq + 1) fs.mu.RUnlock() - <-dios - err := os.WriteFile(fn, buf, defaultFilePerms) - dios <- struct{}{} - - return err + return fs.writeFileWithOptionalSync(fn, buf, defaultFilePerms) } // Stop the current filestore. @@ -11349,29 +11429,47 @@ func (alg StoreCompression) Decompress(buf []byte) ([]byte, error) { // sets O_SYNC on the open file if SyncAlways is set. The dios semaphore is // handled automatically by this function, so don't wrap calls to it in dios. func (fs *fileStore) writeFileWithOptionalSync(name string, data []byte, perm fs.FileMode) error { - if fs.fcfg.SyncAlways { - return writeFileWithSync(name, data, perm) - } - <-dios - defer func() { - dios <- struct{}{} - }() - return os.WriteFile(name, data, perm) + return writeAtomically(name, data, perm, fs.fcfg.SyncAlways) } func writeFileWithSync(name string, data []byte, perm fs.FileMode) error { + return writeAtomically(name, data, perm, true) +} + +func writeAtomically(name string, data []byte, perm fs.FileMode, sync bool) error { + tmp := name + ".tmp" + flags := os.O_CREATE | os.O_WRONLY | os.O_TRUNC + if sync { + flags = flags | os.O_SYNC + } <-dios defer func() { dios <- struct{}{} }() - flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC | os.O_SYNC - f, err := os.OpenFile(name, flags, perm) + f, err := os.OpenFile(tmp, flags, perm) if err != nil { return err } - if _, err = f.Write(data); err != nil { + if _, err := f.Write(data); err != nil { _ = f.Close() + _ = os.Remove(tmp) + return err + } + if err := f.Close(); err != nil { + _ = os.Remove(tmp) return err } - return f.Close() + if err := os.Rename(tmp, name); err != nil { + _ = os.Remove(tmp) + return err + } + if sync { + // To ensure that the file rename was persisted on all filesystems, + // also try to flush the directory metadata. 
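+ // (On POSIX filesystems a rename is generally only durable once the
+ // parent directory entry has itself been synced.)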
+ if d, err := os.Open(filepath.Dir(name)); err == nil { + _ = d.Sync() + _ = d.Close() + } + } + return nil } diff --git a/server/filestore_test.go b/server/filestore_test.go index a329f47ddab..ee290c5250b 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -344,7 +344,7 @@ func TestFileStoreSkipMsg(t *testing.T) { numSkips := 10 for i := 0; i < numSkips; i++ { - fs.SkipMsg() + fs.SkipMsg(0) } state := fs.State() if state.Msgs != 0 { @@ -355,10 +355,10 @@ func TestFileStoreSkipMsg(t *testing.T) { } fs.StoreMsg("zzz", nil, []byte("Hello World!"), 0) - fs.SkipMsg() - fs.SkipMsg() + fs.SkipMsg(0) + fs.SkipMsg(0) fs.StoreMsg("zzz", nil, []byte("Hello World!"), 0) - fs.SkipMsg() + fs.SkipMsg(0) state = fs.State() if state.Msgs != 2 { @@ -392,7 +392,7 @@ func TestFileStoreSkipMsg(t *testing.T) { t.Fatalf("Message did not match") } - fs.SkipMsg() + fs.SkipMsg(0) nseq, _, err := fs.StoreMsg("AAA", nil, []byte("Skip?"), 0) if err != nil { t.Fatalf("Unexpected error looking up seq 11: %v", err) @@ -4171,6 +4171,7 @@ func TestFileStoreEncrypted(t *testing.T) { err = o.Update(state) require_NoError(t, err) + o.Stop() fs.Stop() fs, err = newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, created, prf(&fcfg), nil) require_NoError(t, err) @@ -4971,36 +4972,41 @@ func TestFileStoreSubjectsTotals(t *testing.T) { func TestFileStoreConsumerStoreEncodeAfterRestart(t *testing.T) { testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { - fs, err := newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), prf(&fcfg), nil) - require_NoError(t, err) - defer fs.Stop() + state := &ConsumerState{} - o, err := fs.ConsumerStore("o22", &ConsumerConfig{AckPolicy: AckExplicit}) - require_NoError(t, err) + func() { // for defers + fs, err := newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() - state := &ConsumerState{} - state.Delivered.Consumer = 22 - state.Delivered.Stream = 22 - state.AckFloor.Consumer = 11 - state.AckFloor.Stream = 11 - err = o.Update(state) - require_NoError(t, err) + o, err := fs.ConsumerStore("o22", &ConsumerConfig{AckPolicy: AckExplicit}) + require_NoError(t, err) + defer o.Stop() - fs.Stop() + state.Delivered.Consumer = 22 + state.Delivered.Stream = 22 + state.AckFloor.Consumer = 11 + state.AckFloor.Stream = 11 + err = o.Update(state) + require_NoError(t, err) + }() - fs, err = newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), prf(&fcfg), nil) - require_NoError(t, err) - defer fs.Stop() + func() { // for defers + fs, err := newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() - o, err = fs.ConsumerStore("o22", &ConsumerConfig{AckPolicy: AckExplicit}) - require_NoError(t, err) + o, err := fs.ConsumerStore("o22", &ConsumerConfig{AckPolicy: AckExplicit}) + require_NoError(t, err) + defer o.Stop() - if o.(*consumerFileStore).state.Delivered != state.Delivered { - t.Fatalf("Consumer state is wrong %+v vs %+v", o.(*consumerFileStore).state, state) - } - if o.(*consumerFileStore).state.AckFloor != state.AckFloor { - t.Fatalf("Consumer state is wrong %+v vs %+v", o.(*consumerFileStore).state, state) - } + if o.(*consumerFileStore).state.Delivered != state.Delivered { + t.Fatalf("Consumer state is wrong %+v vs %+v", o.(*consumerFileStore).state, state) + } + if 
o.(*consumerFileStore).state.AckFloor != state.AckFloor {
+ t.Fatalf("Consumer state is wrong %+v vs %+v", o.(*consumerFileStore).state, state)
+ }
+ }()
 })
}
@@ -5063,7 +5069,7 @@ func TestFileStoreSkipMsgAndNumBlocks(t *testing.T) {
 fs.StoreMsg(subj, nil, msg, 0)
 for i := 0; i < numMsgs; i++ {
- fs.SkipMsg()
+ fs.SkipMsg(0)
 }
 fs.StoreMsg(subj, nil, msg, 0)
 require_Equal(t, fs.numMsgBlocks(), 3)
@@ -6807,7 +6813,7 @@ func TestFileStoreEraseMsgWithDbitSlots(t *testing.T) {
 fs.StoreMsg("foo", nil, []byte("abd"), 0)
 for i := 0; i < 10; i++ {
- fs.SkipMsg()
+ fs.SkipMsg(0)
 }
 fs.StoreMsg("foo", nil, []byte("abd"), 0)
 // Now grab that first block and compact away the skips which will
@@ -6836,7 +6842,7 @@ func TestFileStoreEraseMsgWithAllTrailingDbitSlots(t *testing.T) {
 fs.StoreMsg("foo", nil, []byte("abcdefg"), 0)
 for i := 0; i < 10; i++ {
- fs.SkipMsg()
+ fs.SkipMsg(0)
 }
 // Now grab that first block and compact away the skips which will
 // introduce dbits into our idx.
@@ -7183,7 +7189,7 @@ func TestFileStoreReloadAndLoseLastSequence(t *testing.T) {
 defer fs.Stop()
 for i := 0; i < 22; i++ {
- fs.SkipMsg()
+ fs.SkipMsg(0)
 }
 // Restart 5 times.
@@ -9030,7 +9036,7 @@ func TestFileStoreLeftoverSkipMsgInDmap(t *testing.T) {
 }
 // Only skip a message.
- fs.SkipMsg()
+ fs.SkipMsg(0)
 // Confirm state.
 state := fs.State()
@@ -10305,3 +10311,388 @@ func BenchmarkFileStoreGetSeqFromTime(b *testing.B) {
 }
 })
}
+
+func TestFileStoreEraseMsgDoesNotLoseTombstones(t *testing.T) {
+ testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
+ cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage}
+ created := time.Now()
+ fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil)
+ require_NoError(t, err)
+ defer fs.Stop()
+
+ secret := []byte("secret!")
+ // The first message will remain throughout.
+ _, _, err = fs.StoreMsg("foo", nil, nil, 0)
+ require_NoError(t, err)
+ // The second message will be removed, so a tombstone will be placed.
+ _, _, err = fs.StoreMsg("foo", nil, nil, 0)
+ require_NoError(t, err)
+ // The third message is secret and will be erased.
+ _, _, err = fs.StoreMsg("foo", nil, secret, 0)
+ require_NoError(t, err)
+
+ // Removing the second message places a tombstone.
+ _, err = fs.RemoveMsg(2)
+ require_NoError(t, err)
+
+ // A fourth message gets placed after the tombstone.
+ _, _, err = fs.StoreMsg("foo", nil, nil, 0)
+ require_NoError(t, err)
+
+ // Now we erase the third message.
+ // This erases the message and should not lose the tombstone that comes after it.
+ _, err = fs.EraseMsg(3)
+ require_NoError(t, err)
+
+ before := fs.State()
+ require_Equal(t, before.Msgs, 2)
+ require_Equal(t, before.FirstSeq, 1)
+ require_Equal(t, before.LastSeq, 4)
+ require_True(t, slices.Equal(before.Deleted, []uint64{2, 3}))
+
+ _, err = fs.LoadMsg(2, nil)
+ require_Error(t, err, errDeletedMsg)
+ _, err = fs.LoadMsg(3, nil)
+ require_Error(t, err, errDeletedMsg)
+
+ // Make sure we can recover properly with no index.db present.
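+ // (index.db is only a cached snapshot of the stream state; deleting it
+ // forces a full rebuild from the message blocks and their tombstones.)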
+ fs.Stop() + os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)) + + fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + if state := fs.State(); !reflect.DeepEqual(state, before) { + t.Fatalf("Expected state\n of %+v, \ngot %+v without index.db state", before, state) + } + + _, err = fs.LoadMsg(2, nil) + require_Error(t, err, errDeletedMsg) + _, err = fs.LoadMsg(3, nil) + require_Error(t, err, errDeletedMsg) + }) +} + +func TestFileStoreTombstonesNoFirstSeqRollback(t *testing.T) { + testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { + fcfg.BlockSize = 10 * 33 // 10 messages per block. + cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage} + created := time.Now() + fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + for i := 0; i < 20; i++ { + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + } + + before := fs.State() + require_Equal(t, before.Msgs, 20) + require_Equal(t, before.FirstSeq, 1) + require_Equal(t, before.LastSeq, 20) + + // Expect 2 blocks with messages. + fs.mu.RLock() + lblks := len(fs.blks) + fs.mu.RUnlock() + require_Equal(t, lblks, 2) + + // Write some tombstones for all messages, these will be in multiple blocks. + for seq := uint64(1); seq <= 20; seq++ { + _, err = fs.RemoveMsg(seq) + require_NoError(t, err) + } + + before = fs.State() + require_Equal(t, before.Msgs, 0) + require_Equal(t, before.FirstSeq, 21) + require_Equal(t, before.LastSeq, 20) + + // Expect 1 block purely with tombstones. + fs.mu.RLock() + lblks = len(fs.blks) + fs.mu.RUnlock() + require_Equal(t, lblks, 1) + + // Make sure we can recover properly with no index.db present. + fs.Stop() + os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)) + + fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + if state := fs.State(); !reflect.DeepEqual(state, before) { + t.Fatalf("Expected state\n of %+v, \ngot %+v without index.db state", before, state) + } + }) +} + +func TestFileStoreTombstonesSelectNextFirstCleanup(t *testing.T) { + testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { + fcfg.BlockSize = 10 * 33 // 10 messages per block. + cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage} + created := time.Now() + fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + // Write a bunch of messages in multiple blocks. + for i := 0; i < 50; i++ { + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + } + + for seq := uint64(2); seq <= 49; seq++ { + _, err = fs.RemoveMsg(seq) + require_NoError(t, err) + } + + _, err = fs.newMsgBlockForWrite() + require_NoError(t, err) + for i := 0; i < 50; i++ { + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + } + + for seq := uint64(50); seq <= 100; seq++ { + _, err = fs.RemoveMsg(seq) + require_NoError(t, err) + } + + before := fs.State() + require_Equal(t, before.Msgs, 1) + require_Equal(t, before.FirstSeq, 1) + require_Equal(t, before.LastSeq, 100) + + _, err = fs.RemoveMsg(1) + require_NoError(t, err) + + before = fs.State() + require_Equal(t, before.Msgs, 0) + require_Equal(t, before.FirstSeq, 101) + require_Equal(t, before.LastSeq, 100) + + // Make sure we can recover properly with no index.db present. 
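+ // (The rebuilt state must match exactly, including the FirstSeq that has
+ // moved past every tombstone.)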
+ fs.Stop()
+ os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile))
+
+ fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil)
+ require_NoError(t, err)
+ defer fs.Stop()
+
+ if state := fs.State(); !reflect.DeepEqual(state, before) {
+ t.Fatalf("Expected state\n of %+v, \ngot %+v without index.db state", before, state)
+ }
+ })
+}
+
+func TestFileStoreTombstonesSelectNextFirstCleanupOnRecovery(t *testing.T) {
+ testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
+ fcfg.BlockSize = 10 * 33 // 10 messages per block.
+ cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage}
+ created := time.Now()
+ fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil)
+ require_NoError(t, err)
+ defer fs.Stop()
+
+ // Write a bunch of messages in multiple blocks.
+ for i := 0; i < 50; i++ {
+ _, _, err = fs.StoreMsg("foo", nil, nil, 0)
+ require_NoError(t, err)
+ }
+
+ for seq := uint64(2); seq <= 49; seq++ {
+ _, err = fs.RemoveMsg(seq)
+ require_NoError(t, err)
+ }
+
+ _, err = fs.newMsgBlockForWrite()
+ require_NoError(t, err)
+ for i := 0; i < 50; i++ {
+ _, _, err = fs.StoreMsg("foo", nil, nil, 0)
+ require_NoError(t, err)
+ }
+
+ for seq := uint64(50); seq <= 100; seq++ {
+ _, err = fs.RemoveMsg(seq)
+ require_NoError(t, err)
+ }
+
+ before := fs.State()
+ require_Equal(t, before.Msgs, 1)
+ require_Equal(t, before.FirstSeq, 1)
+ require_Equal(t, before.LastSeq, 100)
+
+ // Explicitly write a tombstone instead of calling fs.RemoveMsg,
+ // so we need to recover from a hard kill.
+ require_NoError(t, fs.writeTombstone(1, 0))
+ before = StreamState{FirstSeq: 101, FirstTime: time.Time{}, LastSeq: 100, LastTime: before.LastTime}
+
+ // Make sure we can recover properly with no index.db present.
+ fs.Stop()
+ os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile))
+
+ fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil)
+ require_NoError(t, err)
+ defer fs.Stop()
+
+ if state := fs.State(); !reflect.DeepEqual(state, before) {
+ t.Fatalf("Expected state\n of %+v, \ngot %+v without index.db state", before, state)
+ }
+ })
+}
+
+func TestFileStoreDetectDeleteGapWithLastSkipMsg(t *testing.T) {
+ testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
+ cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage}
+ created := time.Now()
+ fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil)
+ require_NoError(t, err)
+ defer fs.Stop()
+
+ _, _, err = fs.StoreMsg("foo", nil, nil, 0)
+ require_NoError(t, err)
+
+ // Skip a message at a sequence such that a gap is created.
+ // The gap should be detected later on as deleted messages.
+ require_NoError(t, fs.SkipMsgs(2, 3))
+
+ // We should have 3 deletes, one is the skip msg, the other two are the gap.
+ before := fs.State()
+ require_Equal(t, before.Msgs, 1)
+ require_Equal(t, before.FirstSeq, 1)
+ require_Equal(t, before.LastSeq, 4)
+ require_Equal(t, before.NumDeleted, 3)
+
+ // Make sure we can recover properly with no index.db present.
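+ // (The rebuild must recreate the gap as delete-map entries rather than
+ // rolling the last sequence back.)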
+ fs.Stop() + os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)) + + fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + if state := fs.State(); !reflect.DeepEqual(state, before) { + t.Fatalf("Expected state of %+v, got %+v", before, state) + } + + mb := fs.getFirstBlock() + mb.mu.RLock() + defer mb.mu.RUnlock() + require_Equal(t, atomic.LoadUint64(&mb.first.seq), 1) + require_Equal(t, atomic.LoadUint64(&mb.last.seq), 4) + require_Len(t, mb.dmap.Size(), 3) + }) +} + +func TestFileStoreDetectDeleteGapWithOnlySkipMsg(t *testing.T) { + testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { + cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage} + created := time.Now() + fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + // Skip messages with a gap. + _, err = fs.SkipMsg(1) + require_NoError(t, err) + require_NoError(t, fs.SkipMsgs(2, 3)) + + // We should have no deletes, as the SkipMsgs only move the sequences up. + before := fs.State() + require_Equal(t, before.Msgs, 0) + require_Equal(t, before.FirstSeq, 5) + require_Equal(t, before.LastSeq, 4) + require_Equal(t, before.NumDeleted, 0) + + // Make sure we can recover properly with no index.db present. + fs.Stop() + os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)) + + fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + if state := fs.State(); !reflect.DeepEqual(state, before) { + t.Fatalf("Expected state of %+v, got %+v", before, state) + } + + // The block should not register the deletes between the two SkipMsgs. + mb := fs.getFirstBlock() + mb.mu.RLock() + defer mb.mu.RUnlock() + require_Equal(t, atomic.LoadUint64(&mb.first.seq), 5) + require_Equal(t, atomic.LoadUint64(&mb.last.seq), 4) + require_Len(t, mb.dmap.Size(), 0) + }) +} + +func TestFileStoreEraseMsgErr(t *testing.T) { + testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { + cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage} + created := time.Now() + fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + + mb := fs.getFirstBlock() + mb.mu.Lock() + if mb.cache == nil { + mb.mu.Unlock() + t.Fatal("Expected cache to be initialized") + } + // Set to a bogus value such that the file rename fails while performing the message erase. 
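+ // (mb.mfn is the block's backing file path; erasing rewrites the block and
+ // renames it into place, so blanking mfn breaks that final step.)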
+ mb.mfn = _EMPTY_ + mb.mu.Unlock() + fs.EraseMsg(2) + }) +} + +func TestFileStorePurgeMsgBlock(t *testing.T) { + testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { + fcfg.BlockSize = 10 * 33 + cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage} + created := time.Now() + fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + for range 20 { + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + } + + fs.mu.RLock() + blks := len(fs.blks) + fs.mu.RUnlock() + require_Equal(t, blks, 2) + + state := fs.State() + require_Equal(t, state.FirstSeq, 1) + require_Equal(t, state.LastSeq, 20) + require_Equal(t, state.Msgs, 20) + require_Equal(t, state.Bytes, 20*33) + + // Purging the block should both remove the block and do the accounting. + fmb := fs.getFirstBlock() + fs.mu.Lock() + fs.purgeMsgBlock(fmb) + blks = len(fs.blks) + fs.mu.Unlock() + + require_Equal(t, blks, 1) + state = fs.State() + require_Equal(t, state.FirstSeq, 11) + require_Equal(t, state.LastSeq, 20) + require_Equal(t, state.Msgs, 10) + require_Equal(t, state.Bytes, 10*33) + }) +} diff --git a/server/gateway.go b/server/gateway.go index 962858f99b0..550d1cc3dcd 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -2551,11 +2551,18 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr return false } + // Copy off original pa in case it changes. + pa := c.pa + mt, _ := c.isMsgTraceEnabled() if mt != nil { - pa := c.pa + // We are going to replace "pa" with our copy of c.pa, but to restore + // to the original copy of c.pa, we need to save it again. + cpa := c.pa msg = mt.setOriginAccountHeaderIfNeeded(c, acc, msg) - defer func() { c.pa = pa }() + defer func() { c.pa = cpa }() + // Update pa with our current c.pa state. + pa = c.pa } var ( @@ -2569,6 +2576,7 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr didDeliver bool prodIsMQTT = c.isMqtt() dlvMsgs int64 + dlvExtraSz int64 ) // Get a subscription from the pool @@ -2666,8 +2674,11 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr } } + // Assume original message + dmsg := msg if mt != nil { - msg = mt.setHopHeader(c, msg) + // If trace is enabled, we need to set the hop header per gateway. + dmsg = mt.setHopHeader(c, dmsg) } // Setup the message header. @@ -2717,16 +2728,22 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr sub.nm, sub.max = 0, 0 sub.client = gwc sub.subject = subject - if c.deliverMsg(prodIsMQTT, sub, acc, subject, mreply, mh, msg, false) { + if c.deliverMsg(prodIsMQTT, sub, acc, subject, mreply, mh, dmsg, false) { // We don't count internal deliveries so count only if sub.icb is nil if sub.icb == nil { dlvMsgs++ + dlvExtraSz += int64(len(dmsg) - len(msg)) } didDeliver = true } + + // If we set the header reset the origin pub args. 
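+ // (setHopHeader mutated c.pa for this gateway; restore the copy taken at
+ // the top so the next iteration starts from the producer's original args.)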
+ if mt != nil { + c.pa = pa + } } if dlvMsgs > 0 { - totalBytes := dlvMsgs * int64(len(msg)) + totalBytes := dlvMsgs*int64(len(msg)) + dlvExtraSz // For non MQTT producers, remove the CR_LF * number of messages if !prodIsMQTT { totalBytes -= dlvMsgs * int64(LEN_CR_LF) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 5787cdcd9ba..15bacb61da0 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -2562,15 +2562,16 @@ func (s *Server) jsLeaderServerRemoveRequest(sub *subscription, c *client, _ *Ac } js, cc := s.getJetStreamCluster() - if js == nil || cc == nil || cc.meta == nil { + if js == nil || cc == nil { return } - // Extra checks here but only leader is listening. js.mu.RLock() isLeader := cc.isLeader() + meta := cc.meta js.mu.RUnlock() + // Extra checks here but only leader is listening. if !isLeader { return } @@ -2592,7 +2593,7 @@ func (s *Server) jsLeaderServerRemoveRequest(sub *subscription, c *client, _ *Ac var found string js.mu.RLock() - for _, p := range cc.meta.Peers() { + for _, p := range meta.Peers() { // If Peer is specified, it takes precedence if req.Peer != _EMPTY_ { if p.ID == req.Peer { @@ -2617,7 +2618,7 @@ func (s *Server) jsLeaderServerRemoveRequest(sub *subscription, c *client, _ *Ac // So we have a valid peer. js.mu.Lock() - cc.meta.ProposeRemovePeer(found) + meta.ProposeRemovePeer(found) js.mu.Unlock() resp.Success = true @@ -2668,7 +2669,7 @@ func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _ } js, cc := s.getJetStreamCluster() - if js == nil || cc == nil || cc.meta == nil { + if js == nil || cc == nil { return } @@ -2827,7 +2828,7 @@ func (s *Server) jsLeaderServerStreamCancelMoveRequest(sub *subscription, c *cli } js, cc := s.getJetStreamCluster() - if js == nil || cc == nil || cc.meta == nil { + if js == nil || cc == nil { return } @@ -2977,7 +2978,13 @@ func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Ac } _, cc := s.getJetStreamCluster() - if cc == nil || cc.meta == nil || !cc.isLeader() { + + js.mu.RLock() + isLeader := cc.isLeader() + meta := cc.meta + js.mu.RUnlock() + + if !isLeader { return } @@ -2995,11 +3002,11 @@ func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Ac for _, oca := range osa.consumers { oca.deleted = true ca := &consumerAssignment{Group: oca.Group, Stream: oca.Stream, Name: oca.Name, Config: oca.Config, Subject: subject, Client: oca.Client} - cc.meta.Propose(encodeDeleteConsumerAssignment(ca)) + meta.Propose(encodeDeleteConsumerAssignment(ca)) nc++ } sa := &streamAssignment{Group: osa.Group, Config: osa.Config, Subject: subject, Client: osa.Client} - cc.meta.Propose(encodeDeleteStreamAssignment(sa)) + meta.Propose(encodeDeleteStreamAssignment(sa)) ns++ } js.mu.RUnlock() @@ -3030,13 +3037,14 @@ func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Accoun } js, cc := s.getJetStreamCluster() - if js == nil || cc == nil || cc.meta == nil { + if js == nil || cc == nil { return } // Extra checks here but only leader is listening. 
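+ // Capture cc.meta under js.mu rather than reading it unlocked; it can be
+ // cleared while shutting down.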
js.mu.RLock() isLeader := cc.isLeader() + meta := cc.meta js.mu.RUnlock() if !isLeader { @@ -3053,14 +3061,14 @@ func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Accoun s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } - if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(cc.meta, req.Placement); resp.Error != nil { + if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(meta, req.Placement); resp.Error != nil { s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } } // Call actual stepdown. - err = cc.meta.StepDown(preferredLeader) + err = meta.StepDown(preferredLeader) if err != nil { resp.Error = NewJSRaftGeneralError(err, Unless(err)) } else { @@ -4788,6 +4796,10 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, meta := cc.meta js.mu.RUnlock() + if meta == nil { + return + } + // Since these could wait on the Raft group lock, don't do so under the JS lock. ourID := meta.ID() groupLeaderless := meta.Leaderless() @@ -5104,6 +5116,7 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account nca := *ca ncfg := *ca.Config nca.Config = &ncfg + meta := cc.meta js.mu.RUnlock() pauseUTC := req.PauseUntil.UTC() if !pauseUTC.IsZero() { @@ -5117,7 +5130,7 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account setStaticConsumerMetadata(nca.Config) eca := encodeAddConsumerAssignment(&nca) - cc.meta.Propose(eca) + meta.Propose(eca) resp.PauseUntil = pauseUTC if resp.Paused = time.Now().Before(pauseUTC); resp.Paused { diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index cba2215b200..64c492965b8 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -880,6 +880,9 @@ func (js *jetStream) setupMetaGroup() error { } if cfg.Observer { s.Noticef("Turning JetStream metadata controller Observer Mode on") + s.Noticef("In cases where the JetStream domain is not intended to be extended through a SYS account leaf node connection") + s.Noticef("and waiting for leader election until first contact is not acceptable,") + s.Noticef(`manually disable Observer Mode by setting the JetStream Option "extension_hint: %s"`, jsNoExtend) } } else { s.Noticef("JetStream cluster recovering state") @@ -893,7 +896,7 @@ func (js *jetStream) setupMetaGroup() error { cfg.Observer = false case extUndetermined: s.Noticef("Turning JetStream metadata controller Observer Mode on - no previous contact") - s.Noticef("In cases where JetStream will not be extended") + s.Noticef("In cases where the JetStream domain is not intended to be extended through a SYS account leaf node connection") s.Noticef("and waiting for leader election until first contact is not acceptable,") s.Noticef(`manually disable Observer Mode by setting the JetStream Option "extension_hint: %s"`, jsNoExtend) } @@ -1325,13 +1328,17 @@ func (js *jetStream) monitorCluster() { js.setMetaRecovering() // Snapshotting function. - doSnapshot := func() { + doSnapshot := func(force bool) { // Suppress during recovery. if js.isMetaRecovering() { return } - // For the meta layer we want to snapshot when asked if we need one or have any entries that we can compact. - if ne, _ := n.Size(); ne > 0 || n.NeedSnapshot() { + // Look up what the threshold is for compaction. Re-reading from config here as it is reloadable. 
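+ // (optsMu guards the live options that a config reload can rewrite.)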
+ js.srv.optsMu.RLock() + thresh := js.srv.opts.JetStreamMetaCompact + js.srv.optsMu.RUnlock() + // For the meta layer we want to snapshot when over the above threshold (which could be 0 by default). + if ne, _ := n.Size(); force || ne > thresh || n.NeedSnapshot() { snap, err := js.metaSnapshot() if err != nil { s.Warnf("Error generating JetStream cluster snapshot: %v", err) @@ -1360,15 +1367,15 @@ func (js *jetStream) monitorCluster() { select { case <-s.quitCh: // Server shutting down, but we might receive this before qch, so try to snapshot. - doSnapshot() + doSnapshot(false) return case <-rqch: // Clean signal from shutdown routine so do best effort attempt to snapshot meta layer. - doSnapshot() + doSnapshot(false) return case <-qch: // Clean signal from shutdown routine so do best effort attempt to snapshot meta layer. - doSnapshot() + doSnapshot(false) // Return the signal back since shutdown will be waiting. close(qch) return @@ -1404,6 +1411,8 @@ func (js *jetStream) monitorCluster() { // Clear. ru = nil s.Debugf("Recovered JetStream cluster metadata") + // Snapshot now so we start with freshly compacted log. + doSnapshot(true) oc = time.AfterFunc(30*time.Second, js.checkForOrphans) // Do a health check here as well. go checkHealth() @@ -1416,9 +1425,9 @@ func (js *jetStream) monitorCluster() { _, nb = n.Applied(ce.Index) } if js.hasPeerEntries(ce.Entries) || (didSnap && !isLeader) { - doSnapshot() + doSnapshot(true) } else if nb > compactSizeMin && time.Since(lastSnapTime) > minSnapDelta { - doSnapshot() + doSnapshot(false) } } else { s.Warnf("Error applying JetStream cluster entries: %v", err) @@ -1434,11 +1443,11 @@ func (js *jetStream) monitorCluster() { s.sendInternalMsgLocked(serverStatsPingReqSubj, _EMPTY_, nil, nil) // Install a snapshot as we become leader. js.checkClusterSize() - doSnapshot() + doSnapshot(false) } case <-t.C: - doSnapshot() + doSnapshot(false) // Periodically check the cluster size. if n.Leader() { js.checkClusterSize() @@ -3149,7 +3158,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco // Messages to be skipped have no subject or timestamp or msg or hdr. if subject == _EMPTY_ && ts == 0 && len(msg) == 0 && len(hdr) == 0 { // Skip and update our lseq. - last := mset.store.SkipMsg() + last, _ := mset.store.SkipMsg(0) mset.mu.Lock() mset.setLastSeq(last) mset.clearAllPreAcks(last) @@ -9165,7 +9174,7 @@ func (mset *stream) processCatchupMsg(msg []byte) (uint64, error) { // Messages to be skipped have no subject or timestamp. 
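+ // (A skip record burns the sequence number without storing any message
+ // data, so only the sequence itself has to line up.)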
// TODO(dlc) - formalize with skipMsgOp if subj == _EMPTY_ && ts == 0 { - if lseq := mset.store.SkipMsg(); lseq != seq { + if _, err = mset.store.SkipMsg(seq); err != nil { return 0, errCatchupWrongSeqForSkip } } else if err := mset.store.StoreRawMsg(subj, hdr, msg, seq, ts, ttl); err != nil { diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 1ef64219bf9..a7ce9fea6f1 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -9801,6 +9801,123 @@ func TestJetStreamClusterDeleteMsgEOF(t *testing.T) { } } +func TestJetStreamClusterCatchupSkipMsgDesync(t *testing.T) { + for _, storage := range []nats.StorageType{nats.FileStorage, nats.MemoryStorage} { + t.Run(storage.String(), func(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Storage: storage, + Replicas: 3, + }) + require_NoError(t, err) + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + return checkState(t, c, globalAccountName, "TEST") + }) + + rs := c.randomNonStreamLeader(globalAccountName, "TEST") + mset, err := rs.globalAccount().lookupStream("TEST") + require_NoError(t, err) + sa := mset.streamAssignment() + + // Make sure this server can't become the leader. + n := mset.raftNode().(*raft) + n.SetObserver(true) + + sysNc, err := nats.Connect(rs.ClientURL(), nats.UserInfo("admin", "s3cr3t!")) + require_NoError(t, err) + defer sysNc.Close() + + sjs := rs.getJetStream() + sjs.mu.RLock() + syncSubj := sa.Sync + sjs.mu.RUnlock() + + // Respond to the catchup with an out-of-order SkipMsg. + var eof bool + sub, err := sysNc.Subscribe(syncSubj, func(msg *nats.Msg) { + if !eof { + msg.Respond(encodeStreamMsg(_EMPTY_, _EMPTY_, nil, nil, 10, 0, false)) + eof = true + } + msg.Respond(nil) + }) + require_NoError(t, err) + defer sub.Drain() + require_NoError(t, sysNc.Flush()) // Must flush, otherwise our subscription could be too late. 
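+ // The follower should spot the out-of-order skip (seq 10 when it expects
+ // to start at 1), keep retrying the catchup, and eventually give up.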
+ + err = mset.processSnapshot(&StreamReplicatedState{FirstSeq: 1, LastSeq: 1}, 1) + require_Error(t, err, errCatchupTooManyRetries) + c.waitOnStreamLeader(globalAccountName, "TEST") + + pubAck, err := js.Publish("foo", nil) + require_NoError(t, err) + require_Equal(t, pubAck.Sequence, 1) + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + return checkState(t, c, globalAccountName, "TEST") + }) + }) + } +} + +func TestJetStreamClusterJszRaftLeaderReporting(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "DURABLE", + Replicas: 3, + }) + require_NoError(t, err) + + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + for _, s := range c.servers { + _, _, jsa := s.globalAccount().getJetStreamFromAccount() + if !jsa.streamAssigned("TEST") { + return fmt.Errorf("stream not assigned on %s", s.Name()) + } + if !jsa.consumerAssigned("TEST", "DURABLE") { + return fmt.Errorf("consumer not assigned on %s", s.Name()) + } + } + return nil + }) + + sl := c.streamLeader(globalAccountName, "TEST") + cl := c.consumerLeader(globalAccountName, "TEST", "DURABLE") + + for _, s := range c.servers { + jsi, err := s.Jsz(&JSzOptions{RaftGroups: true}) + require_NoError(t, err) + if s == sl { + require_Equal(t, jsi.StreamsLeader, 1) + } else { + require_Equal(t, jsi.StreamsLeader, 0) + } + if s == cl { + require_Equal(t, jsi.ConsumersLeader, 1) + } else { + require_Equal(t, jsi.ConsumersLeader, 0) + } + } +} + // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster__test.go, with being the highest value. diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 67be2dfc11b..1ac2742d5df 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -6823,3 +6823,56 @@ func TestJetStreamClusterAccountMaxConnectionsReconnect(t *testing.T) { return nil }) } + +func TestJetStreamClusterMetaCompactThreshold(t *testing.T) { + for _, thres := range []uint64{0, 5, 10} { + t.Run(fmt.Sprintf("%d", thres), func(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R1TEST", 3) + defer c.shutdown() + for _, s := range c.servers { + s.optsMu.Lock() + s.opts.JetStreamMetaCompact = thres + s.optsMu.Unlock() + } + + nc, _ := jsClientConnect(t, c.servers[0]) + defer nc.Close() + + leader := c.leader() + _, cc := leader.getJetStreamCluster() + rg := cc.meta.(*raft) + + for i := range uint64(15) { + rg.RLock() + papplied := rg.papplied + rg.RUnlock() + + jsStreamCreate(t, nc, &StreamConfig{ + Name: fmt.Sprintf("test_%d", i), + Subjects: []string{fmt.Sprintf("test.%d", i)}, + Storage: MemoryStorage, + }) + + // Kicking the leader change channel is the easiest way to + // trick monitorCluster() into calling doSnapshot(). + entries, _ := cc.meta.Size() + cc.meta.(*raft).leadc <- true + + // Should we have compacted on this iteration? 
+ if entries > thres { + checkFor(t, time.Second, 5*time.Millisecond, func() error { + rg.RLock() + npapplied := rg.papplied + rg.RUnlock() + if npapplied <= papplied { + return fmt.Errorf("haven't snapshotted yet (%d <= %d)", npapplied, papplied) + } + return nil + }) + entries, _ = cc.meta.Size() + require_Equal(t, entries, 0) + } + } + }) + } +} diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index c84ffbe72be..992cf5694c1 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -9966,3 +9966,86 @@ func TestJetStreamConsumerMaxDeliverUnderflow(t *testing.T) { o.mu.RUnlock() require_Equal(t, maxdc, 0) } + +// https://github.com/nats-io/nats-server/issues/7457 +func TestJetStreamConsumerNoWaitNoMessagesOnEos(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}}) + require_NoError(t, err) + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"}) + require_NoError(t, err) + + sub, err := nc.SubscribeSync("reply") + require_NoError(t, err) + defer sub.Drain() + require_NoError(t, nc.Flush()) + + mset, err := s.globalAccount().lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("CONSUMER") + require_NotNil(t, o) + + // Fiddle with the pending count such that the NoWait request will go through, + // and the "404 No Messages" will be sent when hitting the end of the stream. + o.mu.Lock() + o.npc++ + o.mu.Unlock() + + req := &JSApiConsumerGetNextRequest{NoWait: true} + jreq, err := json.Marshal(req) + require_NoError(t, err) + o.processNextMsgRequest("reply", jreq) + + msg, err := sub.NextMsg(time.Second) + require_NoError(t, err) + require_Equal(t, msg.Header.Get("Status"), "404") + require_Equal(t, msg.Header.Get("Description"), "No Messages") +} + +// https://github.com/nats-io/nats-server/issues/5373 +func TestJetStreamConsumerNoWaitNoMessagesOnEosWithDeliveredMsgs(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}}) + require_NoError(t, err) + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"}) + require_NoError(t, err) + + _, err = js.Publish("foo", []byte("msg")) + require_NoError(t, err) + + sub, err := nc.SubscribeSync("reply") + require_NoError(t, err) + defer sub.Drain() + require_NoError(t, nc.Flush()) + + mset, err := s.globalAccount().lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("CONSUMER") + require_NotNil(t, o) + + req := &JSApiConsumerGetNextRequest{NoWait: true, Batch: 2} + jreq, err := json.Marshal(req) + require_NoError(t, err) + o.processNextMsgRequest("reply", jreq) + + msg, err := sub.NextMsg(time.Second) + require_NoError(t, err) + require_Equal(t, msg.Subject, "foo") + require_Equal(t, string(msg.Data), "msg") + + // We requested two messages but the stream only contained 1. 
+ msg, err = sub.NextMsg(time.Second) + require_NoError(t, err) + require_Equal(t, msg.Header.Get("Status"), "404") + require_Equal(t, msg.Header.Get("Description"), "No Messages") +} diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 5ff9109f7a5..5050dc1bf4b 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -20844,3 +20844,44 @@ func TestJetStreamMessageTTLNotExpiring(t *testing.T) { }) } } + +func TestJetStreamReloadMetaCompact(t *testing.T) { + storeDir := t.TempDir() + + conf := createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:-1 + jetstream: { + max_mem_store: 2MB + max_file_store: 8MB + store_dir: '%s' + } + `, storeDir))) + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() + + require_Equal(t, s.getOpts().JetStreamMetaCompact, 0) + + reloadUpdateConfig(t, s, conf, fmt.Sprintf(` + listen: 127.0.0.1:-1 + jetstream: { + max_mem_store: 2MB + max_file_store: 8MB + store_dir: '%s' + meta_compact: 100 + } + `, storeDir)) + + require_Equal(t, s.getOpts().JetStreamMetaCompact, 100) + + reloadUpdateConfig(t, s, conf, fmt.Sprintf(` + listen: 127.0.0.1:-1 + jetstream: { + max_mem_store: 2MB + max_file_store: 8MB + store_dir: '%s' + meta_compact: 0 + } + `, storeDir)) + + require_Equal(t, s.getOpts().JetStreamMetaCompact, 0) +} diff --git a/server/memstore.go b/server/memstore.go index 22385337e1c..ef0437004c3 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -316,12 +316,21 @@ func (ms *memStore) StoreMsg(subj string, hdr, msg []byte, ttl int64) (uint64, i } // SkipMsg will use the next sequence number but not store anything. -func (ms *memStore) SkipMsg() uint64 { +func (ms *memStore) SkipMsg(seq uint64) (uint64, error) { // Grab time. now := time.Unix(0, ats.AccessTime()).UTC() ms.mu.Lock() - seq := ms.state.LastSeq + 1 + defer ms.mu.Unlock() + + // Check sequence matches our last sequence. + if seq != ms.state.LastSeq+1 { + if seq > 0 { + return 0, ErrSequenceMismatch + } + seq = ms.state.LastSeq + 1 + } + ms.state.LastSeq = seq ms.state.LastTime = now if ms.state.Msgs == 0 { @@ -330,8 +339,7 @@ func (ms *memStore) SkipMsg() uint64 { } else { ms.dmap.Insert(seq) } - ms.mu.Unlock() - return seq + return seq, nil } // Skip multiple msgs. 
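Editor's note, for illustration only and not part of the diff: the store-level SkipMsg above now takes the expected sequence and returns an error on mismatch instead of silently advancing. The sketch below shows how a caller might use the new signature, assuming a StreamStore value as declared in this changeset; the helper name applySkip is hypothetical and does not exist in the server code.

// applySkip is a hypothetical helper sketching the sequence-checked SkipMsg contract.
func applySkip(store StreamStore, expected uint64) (uint64, error) {
	if expected == 0 {
		// Passing 0 keeps the old behaviour: skip whatever the next sequence is.
		return store.SkipMsg(0)
	}
	// Ask the store to skip exactly the expected sequence; a diverged replica
	// surfaces as an error (e.g. a sequence mismatch) instead of quietly drifting.
	last, err := store.SkipMsg(expected)
	if err != nil {
		return 0, err
	}
	return last, nil
}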
diff --git a/server/memstore_test.go b/server/memstore_test.go index 2ce5f668a7a..4ccf5c54944 100644 --- a/server/memstore_test.go +++ b/server/memstore_test.go @@ -1349,7 +1349,7 @@ func Benchmark_MemStoreNumPendingWithLargeInteriorDeletesScan(b *testing.B) { msg := []byte("abc") ms.StoreMsg("foo.bar.baz", nil, msg, 0) for i := 1; i <= 1_000_000; i++ { - ms.SkipMsg() + ms.SkipMsg(0) } ms.StoreMsg("foo.bar.baz", nil, msg, 0) @@ -1376,7 +1376,7 @@ func Benchmark_MemStoreNumPendingWithLargeInteriorDeletesExclude(b *testing.B) { msg := []byte("abc") ms.StoreMsg("foo.bar.baz", nil, msg, 0) for i := 1; i <= 1_000_000; i++ { - ms.SkipMsg() + ms.SkipMsg(0) } ms.StoreMsg("foo.bar.baz", nil, msg, 0) diff --git a/server/monitor.go b/server/monitor.go index 352d0b7cb96..af3af672187 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1454,7 +1454,8 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) { LeafNodes %s Gateways %s Raft Groups %s - Health Probe %s + Health Probe %s + Expvar %s Help `, @@ -1471,6 +1472,7 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) { s.basePath(GatewayzPath), GatewayzPath, s.basePath(RaftzPath), RaftzPath, s.basePath(HealthzPath), HealthzPath, + s.basePath(ExpvarzPath), ExpvarzPath, ) } @@ -2889,18 +2891,20 @@ type MetaClusterInfo struct { // JSInfo has detailed information on JetStream. type JSInfo struct { JetStreamStats - ID string `json:"server_id"` - Now time.Time `json:"now"` - Disabled bool `json:"disabled,omitempty"` - Config JetStreamConfig `json:"config,omitempty"` - Limits *JSLimitOpts `json:"limits,omitempty"` - Streams int `json:"streams"` - Consumers int `json:"consumers"` - Messages uint64 `json:"messages"` - Bytes uint64 `json:"bytes"` - Meta *MetaClusterInfo `json:"meta_cluster,omitempty"` - AccountDetails []*AccountDetail `json:"account_details,omitempty"` - Total int `json:"total"` + ID string `json:"server_id"` + Now time.Time `json:"now"` + Disabled bool `json:"disabled,omitempty"` + Config JetStreamConfig `json:"config,omitempty"` + Limits *JSLimitOpts `json:"limits,omitempty"` + Streams int `json:"streams"` + StreamsLeader int `json:"streams_leader,omitempty"` + Consumers int `json:"consumers"` + ConsumersLeader int `json:"consumers_leader,omitempty"` + Messages uint64 `json:"messages"` + Bytes uint64 `json:"bytes"` + Meta *MetaClusterInfo `json:"meta_cluster,omitempty"` + AccountDetails []*AccountDetail `json:"account_details,omitempty"` + Total int `json:"total"` } func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg, optRaft, optStreamLeader bool) *AccountDetail { @@ -3120,6 +3124,16 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) { jsi.Messages += streamState.Msgs jsi.Bytes += streamState.Bytes jsi.Consumers += streamState.Consumers + if opts.RaftGroups { + if node := stream.raftNode(); node == nil || node.Leader() { + jsi.StreamsLeader++ + } + for _, consumer := range stream.getPublicConsumers() { + if node := consumer.raftNode(); node == nil || node.Leader() { + jsi.ConsumersLeader++ + } + } + } } } diff --git a/server/msgtrace_test.go b/server/msgtrace_test.go index 57b72186f94..769a5e2cd4c 100644 --- a/server/msgtrace_test.go +++ b/server/msgtrace_test.go @@ -1496,17 +1496,33 @@ func TestMsgTraceWithGateways(t *testing.T) { s1 := runGatewayServer(o1) defer s1.Shutdown() - waitForOutboundGateways(t, s1, 1, time.Second) - waitForInboundGateways(t, s2, 1, time.Second) - waitForOutboundGateways(t, s2, 1, time.Second) + o3 := 
testGatewayOptionsFromToWithServers(t, "C", "B", s2) + o3.NoSystemAccount = false + s3 := runGatewayServer(o3) + defer s3.Shutdown() + + waitForOutboundGateways(t, s1, 2, time.Second) + waitForInboundGateways(t, s1, 2, time.Second) + waitForInboundGateways(t, s2, 2, time.Second) + waitForOutboundGateways(t, s2, 2, time.Second) + waitForInboundGateways(t, s3, 2, time.Second) + waitForOutboundGateways(t, s3, 2, time.Second) nc2 := natsConnect(t, s2.ClientURL(), nats.Name("sub2")) defer nc2.Close() - sub2 := natsQueueSubSync(t, nc2, "foo.*", "my_queue") + sub2 := natsQueueSubSync(t, nc2, "foo.*", "my_queue_2") + + nc22 := natsConnect(t, s2.ClientURL(), nats.Name("sub22")) + defer nc22.Close() + sub22 := natsQueueSubSync(t, nc22, "*.*", "my_queue_22") - nc3 := natsConnect(t, s2.ClientURL(), nats.Name("sub3")) + nc3 := natsConnect(t, s3.ClientURL(), nats.Name("sub3")) defer nc3.Close() - sub3 := natsQueueSubSync(t, nc3, "*.*", "my_queue_2") + sub3 := natsQueueSubSync(t, nc3, "foo.*", "my_queue_3") + + nc32 := natsConnect(t, s3.ClientURL(), nats.Name("sub32")) + defer nc32.Close() + sub32 := natsQueueSubSync(t, nc32, "*.*", "my_queue_32") nc1 := natsConnect(t, s1.ClientURL(), nats.Name("sub1")) defer nc1.Close() @@ -1540,17 +1556,18 @@ func TestMsgTraceWithGateways(t *testing.T) { checkAppMsg := func(sub *nats.Subscription, expected bool) { if expected { appMsg := natsNexMsg(t, sub, time.Second) - require_Equal[string](t, string(appMsg.Data), "hello!") + require_Equal(t, string(appMsg.Data), "hello!") } // Check that no (more) messages are received. if msg, err := sub.NextMsg(100 * time.Millisecond); err != nats.ErrTimeout { t.Fatalf("Did not expect application message, got %s", msg.Data) } } - for _, sub := range []*nats.Subscription{sub1, sub2, sub3} { + for _, sub := range []*nats.Subscription{sub1, sub2, sub22, sub3, sub32} { checkAppMsg(sub, test.deliverMsg) } + var previousHop string check := func() { traceMsg := natsNexMsg(t, traceSub, time.Second) var e MsgTraceEvent @@ -1560,58 +1577,81 @@ func TestMsgTraceWithGateways(t *testing.T) { require_True(t, ingress != nil) switch ingress.Kind { case CLIENT: - require_Equal[string](t, e.Server.Name, s1.Name()) - require_Equal[string](t, ingress.Account, globalAccountName) - require_Equal[string](t, ingress.Subject, "foo.bar") + require_Equal(t, e.Server.Name, s1.Name()) + require_Equal(t, ingress.Account, globalAccountName) + require_Equal(t, ingress.Subject, "foo.bar") egress := e.Egresses() - require_Equal[int](t, len(egress), 2) + require_Equal(t, len(egress), 3) for _, eg := range egress { switch eg.Kind { case CLIENT: - require_Equal[string](t, eg.Name, "sub1") - require_Equal[string](t, eg.Subscription, "*.bar") - require_Equal[string](t, eg.Queue, _EMPTY_) + require_Equal(t, eg.Name, "sub1") + require_Equal(t, eg.Subscription, "*.bar") + require_Equal(t, eg.Queue, _EMPTY_) case GATEWAY: - require_Equal[string](t, eg.Name, s2.Name()) - require_Equal[string](t, eg.Error, _EMPTY_) - require_Equal[string](t, eg.Subscription, _EMPTY_) - require_Equal[string](t, eg.Queue, _EMPTY_) + if eg.Name != s2.Name() && eg.Name != s3.Name() { + t.Fatalf("Expected name to be %q or %q, got %q", s2.Name(), s3.Name(), eg.Name) + } + require_Equal(t, eg.Error, _EMPTY_) + require_Equal(t, eg.Subscription, _EMPTY_) + require_Equal(t, eg.Queue, _EMPTY_) default: t.Fatalf("Unexpected egress: %+v", eg) } } case GATEWAY: - require_Equal[string](t, e.Server.Name, s2.Name()) - require_Equal[string](t, ingress.Account, globalAccountName) - require_Equal[string](t, 
ingress.Subject, "foo.bar") + require_True(t, e.Request.Header != nil) + require_Len(t, len(e.Request.Header[MsgTraceHop]), 1) + hop := e.Request.Header[MsgTraceHop][0] + require_True(t, hop == "1" || hop == "2") + if previousHop == _EMPTY_ { + previousHop = hop + } else if hop == previousHop { + t.Fatalf("Expected different hop value, got the same %q", hop) + } + var sub2Name, queue2Name, sub3Name, queue3Name string + switch e.Server.Name { + case s2.Name(): + require_Equal(t, e.Server.Cluster, "B") + sub2Name, sub3Name = "sub2", "sub22" + queue2Name, queue3Name = "my_queue_2", "my_queue_22" + case s3.Name(): + require_Equal(t, e.Server.Cluster, "C") + sub2Name, sub3Name = "sub3", "sub32" + queue2Name, queue3Name = "my_queue_3", "my_queue_32" + default: + t.Fatalf("Unexpected server name %q", e.Server.Name) + } + require_Equal(t, ingress.Account, globalAccountName) + require_Equal(t, ingress.Subject, "foo.bar") egress := e.Egresses() - require_Equal[int](t, len(egress), 2) + require_Equal(t, len(egress), 2) var gotSub2, gotSub3 int for _, eg := range egress { require_True(t, eg.Kind == CLIENT) switch eg.Name { - case "sub2": - require_Equal[string](t, eg.Subscription, "foo.*") - require_Equal[string](t, eg.Queue, "my_queue") + case sub2Name: + require_Equal(t, eg.Subscription, "foo.*") + require_Equal(t, eg.Queue, queue2Name) gotSub2++ - case "sub3": - require_Equal[string](t, eg.Subscription, "*.*") - require_Equal[string](t, eg.Queue, "my_queue_2") + case sub3Name: + require_Equal(t, eg.Subscription, "*.*") + require_Equal(t, eg.Queue, queue3Name) gotSub3++ default: t.Fatalf("Unexpected egress name: %+v", eg) } } - require_Equal[int](t, gotSub2, 1) - require_Equal[int](t, gotSub3, 1) - + require_Equal(t, gotSub2, 1) + require_Equal(t, gotSub3, 1) default: t.Fatalf("Unexpected ingress: %+v", ingress) } } - // We should get 2 events - check() - check() + // We should get 3 events + for range 3 { + check() + } // Make sure we are not receiving more traces if tm, err := traceSub.NextMsg(250 * time.Millisecond); err == nil { t.Fatalf("Should not have received trace message: %s", tm.Data) diff --git a/server/opts.go b/server/opts.go index 07b207c0bc3..56bd7be7000 100644 --- a/server/opts.go +++ b/server/opts.go @@ -15,6 +15,7 @@ package server import ( "context" + "crypto/fips140" "crypto/tls" "crypto/x509" "errors" @@ -342,6 +343,7 @@ type Options struct { JetStreamTpm JSTpmOpts JetStreamMaxCatchup int64 JetStreamRequestQueueLimit int64 + JetStreamMetaCompact uint64 StreamMaxBufferedMsgs int `json:"-"` StreamMaxBufferedSize int64 `json:"-"` StoreDir string `json:"-"` @@ -2349,6 +2351,9 @@ func parseJetStreamTPM(v interface{}, opts *Options, errors *[]error) error { func setJetStreamEkCipher(opts *Options, mv interface{}, tk token) error { switch strings.ToLower(mv.(string)) { case "chacha", "chachapoly": + if fips140.Enabled() { + return &configErr{tk, fmt.Sprintf("Cipher type %q cannot be used in FIPS-140 mode", mv)} + } opts.JetStreamCipher = ChaCha case "aes": opts.JetStreamCipher = AES @@ -2464,6 +2469,12 @@ func parseJetStream(v any, opts *Options, errors *[]error, warnings *[]error) er return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)} } opts.JetStreamRequestQueueLimit = lim + case "meta_compact": + thres, ok := mv.(int64) + if !ok || thres < 0 { + return &configErr{tk, fmt.Sprintf("Expected an absolute size for %q, got %v", mk, mv)} + } + opts.JetStreamMetaCompact = uint64(thres) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ @@ 
-4186,6 +4197,10 @@ func parseAuthorization(v any, errors, warnings *[]error) (*authorization, error } auth.defaultPermissions = permissions case "auth_callout", "auth_hook": + if fips140.Enabled() { + *errors = append(*errors, fmt.Errorf("'auth_callout' cannot be configured in FIPS-140 mode")) + continue + } ac, err := parseAuthCallout(tk, errors) if err != nil { *errors = append(*errors, err) diff --git a/server/raft.go b/server/raft.go index 62337260336..2e466824b8b 100644 --- a/server/raft.go +++ b/server/raft.go @@ -2615,9 +2615,6 @@ func (n *raft) runAsLeader() { n.unsubscribe(rpsub) n.Unlock() }() - - // To send out our initial peer state. - n.sendPeerState() n.Unlock() hb := time.NewTicker(hbInterval) @@ -2692,7 +2689,6 @@ func (n *raft) runAsLeader() { n.stepdown(noLeader) return } - n.trackPeer(vresp.peer) case <-n.reqs.ch: // Because of drain() it is possible that we get nil from popOne(). if voteReq, ok := n.reqs.popOne(); ok { @@ -3056,7 +3052,7 @@ func (n *raft) applyCommit(index uint64) error { if lp, ok := n.peers[newPeer]; !ok { // We are not tracking this one automatically so we need to bump cluster size. - n.peers[newPeer] = &lps{time.Now(), 0, true} + n.peers[newPeer] = &lps{time.Time{}, 0, true} } else { // Mark as added. lp.kp = true @@ -3440,6 +3436,17 @@ func (n *raft) updateLeader(newLeader string) { } } } + // Reset last seen timestamps. + // If we're the leader we track everyone, and don't reset. + // But if we're a follower we only track the leader, and reset all others. + if newLeader != n.id { + for peer, ps := range n.peers { + if peer == newLeader { + continue + } + ps.ts = time.Time{} + } + } } // processAppendEntry will process an appendEntry. This is called either @@ -3530,19 +3537,10 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { // sub, rather than a catch-up sub. isNew := sub != nil && sub == n.aesub - // Track leader directly - if isNew && ae.leader != noLeader { - if ps := n.peers[ae.leader]; ps != nil { - ps.ts = time.Now() - } else { - n.peers[ae.leader] = &lps{time.Now(), 0, true} - } - } - // If we are/were catching up ignore old catchup subs, but only if catching up from an older server - // that doesn't send the leader term when catching up. We can reject old catchups from newer subs - // later, just by checking the append entry is on the correct term. - if !isNew && sub != nil && ae.lterm == 0 && (!catchingUp || sub != n.catchup.sub) { + // that doesn't send the leader term when catching up or if we would truncate as a result. + // We can reject old catchups from newer subs later, just by checking the append entry is on the correct term. + if !isNew && sub != nil && (ae.lterm == 0 || ae.pindex < n.pindex) && (!catchingUp || sub != n.catchup.sub) { n.Unlock() n.debug("AppendEntry ignoring old entry from previous catchup") return @@ -3610,6 +3608,16 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.updateLeadChange(false) } + // Track leader directly + // But, do so after all consistency checks so we don't track an old leader. + if isNew && ae.leader != noLeader && ae.leader == n.leader { + if ps := n.peers[ae.leader]; ps != nil { + ps.ts = time.Now() + } else { + n.peers[ae.leader] = &lps{time.Now(), 0, true} + } + } + if ae.pterm != n.pterm || ae.pindex != n.pindex { // Check if this is a lower or equal index than what we were expecting. 
if ae.pindex <= n.pindex { @@ -3776,10 +3784,8 @@ CONTINUE: case EntryAddPeer: if newPeer := string(e.Data); len(newPeer) == idLen { // Track directly, but wait for commit to be official - if ps := n.peers[newPeer]; ps != nil { - ps.ts = time.Now() - } else { - n.peers[newPeer] = &lps{time.Now(), 0, false} + if _, ok := n.peers[newPeer]; !ok { + n.peers[newPeer] = &lps{time.Time{}, 0, false} } // Store our peer in our global peer map for all peers. peers.LoadOrStore(newPeer, newPeer) @@ -4316,10 +4322,6 @@ func (n *raft) processVoteRequest(vr *voteRequest) error { } n.debug("Received a voteRequest %+v", vr) - if err := n.trackPeer(vr.candidate); err != nil { - return err - } - n.Lock() vresp := &voteResponse{n.term, n.id, false} @@ -4544,37 +4546,19 @@ func (n *raft) switchToLeader() { } n.Lock() + defer n.Unlock() n.debug("Switching to leader") - // Check if we have items pending as we are taking over. - sendHB := n.pindex > n.commit - n.lxfer = false n.updateLeader(n.id) - leadChange := n.switchState(Leader) - - if leadChange { - // Wait for messages to be applied if we've stored more, otherwise signal immediately. - // It's important to wait signaling we're leader if we're not up-to-date yet, as that - // would mean we're in a consistent state compared with the previous leader. - if n.pindex > n.applied { - n.aflr = n.pindex - } else { - // We know we have applied all entries in our log and can signal immediately. - // For sanity reset applied floor back down to 0, so we aren't able to signal twice. - n.aflr = 0 - if !n.leaderState.Swap(true) { - // Only update timestamp if leader state actually changed. - nowts := time.Now().UTC() - n.leaderSince.Store(&nowts) - } - n.updateLeadChange(true) - } - } - n.Unlock() + n.switchState(Leader) - if sendHB { - n.sendHeartbeat() - } + // To send out our initial peer state. + // In our implementation this is equivalent to sending a NOOP-entry upon becoming leader. + // Wait for this message (and potentially more) to be applied. + // It's important to wait signaling we're leader if we're not up-to-date yet, as that + // would mean we're in a consistent state compared with the previous leader. + n.sendPeerState() + n.aflr = n.pindex } diff --git a/server/raft_test.go b/server/raft_test.go index 8fcd1599182..48aa265bbbf 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -1590,11 +1590,11 @@ func TestNRGSnapshotAndTruncateToApplied(t *testing.T) { aeMsg2 := encode(t, &appendEntry{leader: nats1, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) // Timeline, we temporarily became leader - aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 2, pterm: 1, pindex: 2, entries: nil}) - aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 2, pterm: 1, pindex: 2, entries: entries}) + aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 3, pterm: 1, pindex: 3, entries: nil}) + aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 3, pterm: 1, pindex: 3, entries: entries}) // Timeline, old leader is back. - aeHeartbeat2 := encode(t, &appendEntry{leader: nats1, term: 3, commit: 2, pterm: 1, pindex: 2, entries: nil}) + aeHeartbeat2 := encode(t, &appendEntry{leader: nats1, term: 3, commit: 3, pterm: 1, pindex: 3, entries: nil}) // Simply receive first message. 
n.processAppendEntry(aeMsg1, n.aesub) @@ -1624,10 +1624,10 @@ func TestNRGSnapshotAndTruncateToApplied(t *testing.T) { reply: _EMPTY_, success: true, }) - require_Equal(t, n.commit, 2) + require_Equal(t, n.commit, 3) // Simulate upper layer calling down to apply. - n.Applied(2) + n.Applied(3) // Install snapshot and check it exists. err = n.InstallSnapshot(nil) @@ -1641,9 +1641,9 @@ func TestNRGSnapshotAndTruncateToApplied(t *testing.T) { // Store a third message, it stays uncommitted. require_NoError(t, n.storeToWAL(aeMsg3)) - require_Equal(t, n.commit, 2) + require_Equal(t, n.commit, 3) require_Equal(t, n.wal.State().Msgs, 1) - entry, err = n.loadEntry(3) + entry, err = n.loadEntry(4) require_NoError(t, err) require_Equal(t, entry.leader, nats0) @@ -1651,8 +1651,8 @@ func TestNRGSnapshotAndTruncateToApplied(t *testing.T) { n.stepdown(noLeader) n.processAppendEntry(aeHeartbeat2, n.aesub) require_Equal(t, n.wal.State().Msgs, 0) - require_Equal(t, n.commit, 2) - require_Equal(t, n.applied, 2) + require_Equal(t, n.commit, 3) + require_Equal(t, n.applied, 3) } func TestNRGIgnoreDoubleSnapshot(t *testing.T) { @@ -2232,7 +2232,7 @@ func TestNRGHealthCheckWaitForPendingCommitsWhenPaused(t *testing.T) { require_True(t, n.Healthy()) } -func TestNRGHeartbeatCanEstablishQuorumAfterLeaderChange(t *testing.T) { +func TestNRGAppendEntryCanEstablishQuorumAfterLeaderChange(t *testing.T) { n, cleanup := initSingleMemRaftNode(t) defer cleanup() @@ -2244,7 +2244,7 @@ func TestNRGHeartbeatCanEstablishQuorumAfterLeaderChange(t *testing.T) { // Timeline aeMsg := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) - aeHeartbeatResponse := &appendEntryResponse{term: 1, index: 1, peer: nats0, success: true} + aeHeartbeatResponse := &appendEntryResponse{term: 1, index: 2, peer: nats0, success: true} // Process first message. n.processAppendEntry(aeMsg, n.aesub) @@ -2254,16 +2254,16 @@ func TestNRGHeartbeatCanEstablishQuorumAfterLeaderChange(t *testing.T) { // Simulate becoming leader, and not knowing if the stored entry has quorum and can be committed. // Switching to leader should send a heartbeat. n.switchToLeader() - require_Equal(t, n.aflr, 1) + require_Equal(t, n.aflr, 2) require_Equal(t, n.commit, 0) // We simulate receiving the successful heartbeat response here. It should move the commit up. n.processAppendEntryResponse(aeHeartbeatResponse) - require_Equal(t, n.commit, 1) - require_Equal(t, n.aflr, 1) + require_Equal(t, n.commit, 2) + require_Equal(t, n.aflr, 2) // Once the entry is applied, it should reset the applied floor. - n.Applied(1) + n.Applied(2) require_Equal(t, n.aflr, 0) } @@ -2271,10 +2271,6 @@ func TestNRGQuorumAccounting(t *testing.T) { n, cleanup := initSingleMemRaftNode(t) defer cleanup() - // Create a sample entry, the content doesn't matter, just that it's stored. - esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) - entries := []*Entry{newEntry(EntryNormal, esm)} - nats1 := "yrzKKRBu" // "nats-1" nats2 := "cnrtt3eg" // "nats-2" @@ -2287,10 +2283,8 @@ func TestNRGQuorumAccounting(t *testing.T) { require_Equal(t, n.csz, 5) require_Equal(t, n.qn, 3) - // Switch this node to leader, and send an entry. + // Switch this node to leader which sends an entry. n.switchToLeader() - require_Equal(t, n.pindex, 0) - n.sendAppendEntry(entries) require_Equal(t, n.pindex, 1) // The first response MUST NOT indicate quorum has been reached. 
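Editor's note, for illustration only and not part of the diff: the csz/qn assertions in the quorum tests above are consistent with a simple majority rule, sketched below. The function name quorumNeeded is hypothetical and not taken from the server code.

// quorumNeeded sketches the majority rule implied by the csz/qn assertions above:
// strictly more than half of the cluster must acknowledge an entry.
func quorumNeeded(clusterSize int) int {
	return clusterSize/2 + 1 // e.g. 5 nodes -> quorum of 3
}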
@@ -2306,10 +2300,6 @@ func TestNRGRevalidateQuorumAfterLeaderChange(t *testing.T) { n, cleanup := initSingleMemRaftNode(t) defer cleanup() - // Create a sample entry, the content doesn't matter, just that it's stored. - esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) - entries := []*Entry{newEntry(EntryNormal, esm)} - nats1 := "yrzKKRBu" // "nats-1" nats2 := "cnrtt3eg" // "nats-2" @@ -2322,12 +2312,10 @@ func TestNRGRevalidateQuorumAfterLeaderChange(t *testing.T) { require_Equal(t, n.csz, 5) require_Equal(t, n.qn, 3) - // Switch this node to leader, and send an entry. + // Switch this node to leader which sends an entry. n.term++ n.switchToLeader() require_Equal(t, n.term, 1) - require_Equal(t, n.pindex, 0) - n.sendAppendEntry(entries) require_Equal(t, n.pindex, 1) // We have one server that signals the message was stored. The leader will add 1 to the acks count. @@ -2374,10 +2362,10 @@ func TestNRGSignalLeadChangeFalseIfCampaignImmediately(t *testing.T) { n.switchToCandidate() n.switchToLeader() select { - case isLeader := <-n.LeadChangeC(): - require_True(t, isLeader) + case <-n.LeadChangeC(): + t.Error("Expected no leadChange signal") default: - t.Error("Expected leadChange signal") + // Expecting no signal yet. } }, }, @@ -2480,16 +2468,10 @@ func TestNRGIgnoreTrackResponseWhenNotLeader(t *testing.T) { n, cleanup := initSingleMemRaftNode(t) defer cleanup() - // Create a sample entry, the content doesn't matter, just that it's stored. - esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) - entries := []*Entry{newEntry(EntryNormal, esm)} - - // Switch this node to leader, and send two entries. The first will get quorum, the second will not. + // Switch this node to leader which sends an entry. n.term++ n.switchToLeader() require_Equal(t, n.term, 1) - require_Equal(t, n.pindex, 0) - n.sendAppendEntry(entries) require_Equal(t, n.pindex, 1) require_Equal(t, n.pterm, 1) require_Equal(t, n.commit, 0) @@ -3453,6 +3435,168 @@ func TestNRGDrainAndReplaySnapshot(t *testing.T) { require_Equal(t, n.hcommit, 0) } +func TestNRGTrackPeerActive(t *testing.T) { + // The leader should track timestamps for all peers. + // Each follower should only track the leader, otherwise we would get outdated timestamps. 
+ checkLastSeen := func(peers map[string]RaftzGroupPeer) { + for _, peer := range peers { + if peer.LastSeen == _EMPTY_ { + continue + } + elapsed, err := time.ParseDuration(peer.LastSeen) + require_NoError(t, err) + require_LessThan(t, elapsed, time.Second) + } + } + + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + ml := c.leader() + rs := c.randomNonLeader() + var preferred *Server + for _, s := range c.servers { + if s == ml || s == rs { + continue + } + preferred = s + break + } + require_NotNil(t, preferred) + + time.Sleep(2 * time.Second) + before := (*rs.Raftz(&RaftzOptions{}))[DEFAULT_SYSTEM_ACCOUNT][defaultMetaGroupName].Peers + checkLastSeen(before) + + js := ml.getJetStream() + n := js.getMetaGroup() + require_NoError(t, n.StepDown(preferred.NodeName())) + + time.Sleep(2 * time.Second) + after := (*rs.Raftz(&RaftzOptions{}))[DEFAULT_SYSTEM_ACCOUNT][defaultMetaGroupName].Peers + checkLastSeen(after) +} + +func TestNRGLostQuorum(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + require_Equal(t, n.State(), Follower) + require_False(t, n.Quorum()) + require_True(t, n.lostQuorum()) + + nc, err := nats.Connect(n.s.ClientURL(), nats.UserInfo("admin", "s3cr3t!")) + require_NoError(t, err) + defer nc.Close() + + // Respond to a vote request. + sub, err := nc.Subscribe(n.vsubj, func(m *nats.Msg) { + req := decodeVoteRequest(m.Data, m.Reply) + resp := voteResponse{term: req.term, peer: "random", granted: true} + m.Respond(resp.encode()) + }) + require_NoError(t, err) + defer sub.Drain() + require_NoError(t, nc.Flush()) + + // Switch to candidate and make sure we properly track the peer as active. + n.switchToCandidate() + require_Equal(t, n.State(), Candidate) + require_False(t, n.Quorum()) + require_True(t, n.lostQuorum()) + + n.runAsCandidate() + require_Equal(t, n.State(), Leader) + require_True(t, n.Quorum()) + require_False(t, n.lostQuorum()) +} + +func TestNRGParallelCatchupRollback(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + + aeReply := "$TEST" + nc, err := nats.Connect(n.s.ClientURL(), nats.UserInfo("admin", "s3cr3t!")) + require_NoError(t, err) + defer nc.Close() + + sub, err := nc.SubscribeSync(aeReply) + require_NoError(t, err) + defer sub.Drain() + require_NoError(t, nc.Flush()) + + // Timeline + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, lterm: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, lterm: 1, commit: 0, pterm: 1, pindex: 1, entries: entries}) + aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 2, entries: nil, reply: aeReply}) + + // Trigger a catchup. + n.processAppendEntry(aeMsg2, n.aesub) + require_Equal(t, n.pindex, 0) + require_NotNil(t, n.catchup) + require_Equal(t, n.catchup.cterm, aeMsg2.pterm) + require_Equal(t, n.catchup.cindex, aeMsg2.pindex) + csub := n.catchup.sub + + // Receive the missed messages. + n.processAppendEntry(aeMsg1, csub) + require_Equal(t, n.pindex, 1) + n.processAppendEntry(aeMsg2, csub) + require_Equal(t, n.pindex, 2) + require_True(t, n.catchup == nil) + + // Should respond to the heartbeat and allow the leader to commit. 
+ n.processAppendEntry(aeHeartbeat, n.aesub) + msg, err := sub.NextMsg(time.Second) + require_NoError(t, err) + ar := decodeAppendEntryResponse(msg.Data) + require_NotNil(t, ar) + require_True(t, ar.success) + require_Equal(t, ar.term, aeHeartbeat.term) + require_Equal(t, ar.index, aeHeartbeat.pindex) + + // Now replay a message that was already received as a catchup entry. + // Likely due to running multiple catchups in parallel. + // Since our WAL is already ahead, we should not truncate based on this. + n.processAppendEntry(aeMsg1, csub) + require_Equal(t, n.pindex, 2) +} + +func TestNRGReportLeaderAfterNoopEntry(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + require_Equal(t, n.State(), Follower) + require_Equal(t, n.term, 0) + require_False(t, n.Leader()) + + n.switchToCandidate() + require_Equal(t, n.State(), Candidate) + require_Equal(t, n.term, 1) + require_False(t, n.Leader()) + + // Switching to leader will put us into Leader state, + // but we're not necessarily an up-to-date leader yet. + n.switchToLeader() + require_Equal(t, n.State(), Leader) + require_Equal(t, n.term, 1) + require_Equal(t, n.pindex, 1) // Should've sent a NOOP-entry to establish leadership. + require_Equal(t, n.applied, 0) + require_False(t, n.Leader()) + + // Once we commit and apply the final entry, we should starting to report we're leader. + n.commit = 1 + n.Applied(1) + require_Equal(t, n.applied, 1) + require_True(t, n.Leader()) +} + // This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before // proposing the next one. // The test may fail if: diff --git a/server/reload.go b/server/reload.go index 784b905778b..123aad99adc 100644 --- a/server/reload.go +++ b/server/reload.go @@ -1184,7 +1184,7 @@ func imposeOrder(value any) error { slices.SortFunc(value.Gateways, func(i, j *RemoteGatewayOpts) int { return cmp.Compare(i.Name, j.Name) }) case WebsocketOpts: slices.Sort(value.AllowedOrigins) - case string, bool, uint8, uint16, int, int32, int64, time.Duration, float64, nil, LeafNodeOpts, ClusterOpts, *tls.Config, PinnedCertSet, + case string, bool, uint8, uint16, uint64, int, int32, int64, time.Duration, float64, nil, LeafNodeOpts, ClusterOpts, *tls.Config, PinnedCertSet, *URLAccResolver, *MemAccResolver, *DirAccResolver, *CacheDirAccResolver, Authentication, MQTTOpts, jwt.TagList, *OCSPConfig, map[string]string, JSLimitOpts, StoreCipher, *OCSPResponseCacheConfig: // explicitly skipped types @@ -1575,6 +1575,8 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { return nil, fmt.Errorf("config reload not supported for jetstream max memory and store") } } + case "jetstreammetacompact": + // Allowed at runtime but monitorCluster looks at s.opts directly, so no further work needed here. case "websocket": // Similar to gateways tmpOld := oldValue.(WebsocketOpts) diff --git a/server/route.go b/server/route.go index ac4d9c66b8a..871506e48d8 100644 --- a/server/route.go +++ b/server/route.go @@ -2345,8 +2345,20 @@ func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo bool, gossipMod if doOnce { // check to be consistent and future proof. 
but will be same domain if s.sameDomain(info.Domain) { - s.nodeToInfo.Store(rHash, - nodeInfo{rn, s.info.Version, s.info.Cluster, info.Domain, id, nil, nil, nil, false, info.JetStream, false, false}) + s.nodeToInfo.Store(rHash, nodeInfo{ + name: rn, + version: s.info.Version, + cluster: s.info.Cluster, + domain: info.Domain, + id: id, + tags: nil, + cfg: nil, + stats: nil, + offline: false, + js: info.JetStream, + binarySnapshots: true, // Updated default to true. Versions 2.10.0+ support it. + accountNRG: false, + }) } } diff --git a/server/server.go b/server/server.go index cf539630fbc..69944eef6d9 100644 --- a/server/server.go +++ b/server/server.go @@ -16,6 +16,7 @@ package server import ( "bytes" "context" + "crypto/fips140" "crypto/tls" "encoding/json" "errors" @@ -42,6 +43,8 @@ import ( // Allow dynamic profiling. _ "net/http/pprof" + "expvar" + "github.com/klauspost/compress/s2" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats-server/v2/logger" @@ -670,8 +673,12 @@ func NewServer(opts *Options) (*Server, error) { pub, _ := kp.PublicKey() // Create an xkey for encrypting messages from this server. - xkp, _ := nkeys.CreateCurveKeys() - xpub, _ := xkp.PublicKey() + var xkp nkeys.KeyPair + var xpub string + if !fips140.Enabled() { + xkp, _ = nkeys.CreateCurveKeys() + xpub, _ = xkp.PublicKey() + } serverName := pub if opts.ServerName != _EMPTY_ { @@ -778,15 +785,18 @@ func NewServer(opts *Options) (*Server, error) { if opts.JetStream { ourNode := getHash(serverName) s.nodeToInfo.Store(ourNode, nodeInfo{ - serverName, - VERSION, - opts.Cluster.Name, - opts.JetStreamDomain, - info.ID, - opts.Tags, - &JetStreamConfig{MaxMemory: opts.JetStreamMaxMemory, MaxStore: opts.JetStreamMaxStore, CompressOK: true}, - nil, - false, true, true, true, + name: serverName, + version: VERSION, + cluster: opts.Cluster.Name, + domain: opts.JetStreamDomain, + id: info.ID, + tags: opts.Tags, + cfg: &JetStreamConfig{MaxMemory: opts.JetStreamMaxMemory, MaxStore: opts.JetStreamMaxStore, CompressOK: true}, + stats: nil, + offline: false, + js: true, + binarySnapshots: true, + accountNRG: true, }) } @@ -2950,6 +2960,7 @@ const ( HealthzPath = "/healthz" IPQueuesPath = "/ipqueuesz" RaftzPath = "/raftz" + ExpvarzPath = "/debug/vars" ) func (s *Server) basePath(p string) string { @@ -3068,6 +3079,8 @@ func (s *Server) startMonitoring(secure bool) error { mux.HandleFunc(s.basePath(IPQueuesPath), s.HandleIPQueuesz) // Raftz mux.HandleFunc(s.basePath(RaftzPath), s.HandleRaftz) + // Expvarz + mux.Handle(s.basePath(ExpvarzPath), expvar.Handler()) // Do not set a WriteTimeout because it could cause cURL/browser // to return empty response or unable to display page if the diff --git a/server/store.go b/server/store.go index 271c02cbf45..3f3ec177393 100644 --- a/server/store.go +++ b/server/store.go @@ -92,7 +92,7 @@ type SubjectDeleteMarkerUpdateHandler func(*inMsg) type StreamStore interface { StoreMsg(subject string, hdr, msg []byte, ttl int64) (uint64, int64, error) StoreRawMsg(subject string, hdr, msg []byte, seq uint64, ts int64, ttl int64) error - SkipMsg() uint64 + SkipMsg(seq uint64) (uint64, error) SkipMsgs(seq uint64, num uint64) error LoadMsg(seq uint64, sm *StoreMsg) (*StoreMsg, error) LoadNextMsg(filter string, wc bool, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error) diff --git a/server/store_test.go b/server/store_test.go index 8bbcc80f41c..e576844c9d3 100644 --- a/server/store_test.go +++ b/server/store_test.go @@ -639,7 +639,7 @@ func TestStoreStreamInteriorDeleteAccounting(t 
*testing.T) { { title: "SkipMsg", action: func(s StreamStore, lseq uint64) { - s.SkipMsg() + s.SkipMsg(0) }, }, { diff --git a/server/stream.go b/server/stream.go index 287dec0ea64..da3b6c2538d 100644 --- a/server/stream.go +++ b/server/stream.go @@ -2858,6 +2858,7 @@ func (mset *stream) setupMirrorConsumer() error { } mirror := mset.mirror + mirrorWg := &mirror.wg // We want to throttle here in terms of how fast we request new consumers, // or if the previous is still in progress. @@ -3016,7 +3017,7 @@ func (mset *stream) setupMirrorConsumer() error { // Wait for previous processMirrorMsgs go routine to be completely done. // If none is running, this will not block. - mirror.wg.Wait() + mirrorWg.Wait() select { case ccr := <-respCh: @@ -5228,7 +5229,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, // Skip msg here. if noInterest { - mset.lseq = store.SkipMsg() + mset.lseq, _ = store.SkipMsg(0) mset.lmsgId = msgId // If we have a msgId make sure to save. if msgId != _EMPTY_ {