diff --git a/.github/actions/nightly-release/action.yaml b/.github/actions/nightly-release/action.yaml index 26cafcd870b..6c7a8bef2e3 100644 --- a/.github/actions/nightly-release/action.yaml +++ b/.github/actions/nightly-release/action.yaml @@ -10,6 +10,11 @@ inputs: description: Docker hub password required: true + name: + description: The name of the build + default: nightly + required: false + workdir: description: The working directory for actions requiring it required: true @@ -21,16 +26,31 @@ runs: shell: bash run: docker login -u "${{ inputs.hub_username }}" -p "${{ inputs.hub_password }}" + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + - name: Set up Go uses: actions/setup-go@v6 with: go-version: stable + - name: Generate build metadata + shell: sh + run: | + echo "BUILD_DATE=$(date +%Y%m%d)" >> $GITHUB_ENV + echo "GIT_COMMIT=$(git rev-parse --short HEAD)" >> $GITHUB_ENV + echo "BUILD_NAME=$(echo ${{ inputs.name }} | tr '[:upper:]' '[:lower:]' | sed 's/[^a-z0-9.-]/-/g')" >> $GITHUB_ENV + - name: Build and push Docker images - # Use commit hash here to avoid a re-tagging attack, as this is a third-party action - # Commit 9ed2f89a662bf1735a48bc8557fd212fa902bebf = tag v6.1.0 - uses: goreleaser/goreleaser-action@9ed2f89a662bf1735a48bc8557fd212fa902bebf + uses: docker/build-push-action@v6 with: - workdir: "${{ inputs.workdir }}" - version: ~> v2 - args: release --skip=announce,validate --config .goreleaser-nightly.yml + context: "${{ inputs.workdir }}" + file: "${{ inputs.workdir }}/docker/Dockerfile.nightly" + build-args: | + VERSION=nightly-${{ env.BUILD_DATE }} + GIT_COMMIT=${{ env.GIT_COMMIT }} + platforms: linux/amd64,linux/arm64 + push: true + tags: | + synadia/nats-server:${{ env.BUILD_NAME }} + synadia/nats-server:${{ env.BUILD_NAME }}-${{ env.BUILD_DATE }} diff --git a/.github/workflows/nightly.yaml b/.github/workflows/nightly.yaml index fb22b5e84e0..d1b9f3cb823 100644 --- a/.github/workflows/nightly.yaml +++ b/.github/workflows/nightly.yaml @@ -29,6 +29,7 @@ jobs: - uses: ./src/github.com/nats-io/nats-server/.github/actions/nightly-release with: + name: ${{ inputs.target || 'nightly' }} workdir: src/github.com/nats-io/nats-server hub_username: "${{ secrets.DOCKER_USERNAME }}" hub_password: "${{ secrets.DOCKER_PASSWORD }}" diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 1e2602b2e93..b1cdc3fa503 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -38,8 +38,8 @@ jobs: - name: Install syft # Use commit hash here to avoid a re-tagging attack, as this is a third-party action - # Commit da167eac915b4e86f08b264dbdbc867b61be6f0c = tag v0.20.5 - uses: anchore/sbom-action/download-syft@da167eac915b4e86f08b264dbdbc867b61be6f0c + # Commit f8bdd1d8ac5e901a77a92f111440fdb1b593736b = tag v0.20.6 + uses: anchore/sbom-action/download-syft@f8bdd1d8ac5e901a77a92f111440fdb1b593736b with: syft-version: "v1.27.1" diff --git a/.goreleaser-nightly.yml b/.goreleaser-nightly.yml deleted file mode 100644 index ead4d5e2795..00000000000 --- a/.goreleaser-nightly.yml +++ /dev/null @@ -1,40 +0,0 @@ -project_name: nats-server -version: 2 - -builds: - - main: . 
- id: nats-server - binary: nats-server - ldflags: - - -w -X github.com/nats-io/nats-server/v2/server.gitCommit={{.ShortCommit}} - env: - - GO111MODULE=on - - CGO_ENABLED=0 - goos: - - linux - goarch: - - amd64 - mod_timestamp: "{{ .CommitTimestamp }}" - -release: - disable: true - -dockers: - - goos: linux - goarch: amd64 - dockerfile: docker/Dockerfile.nightly - skip_push: false - build_flag_templates: - - '--build-arg=VERSION={{ if ne .Branch "main" }}{{ replace .Branch "/" "-" }}{{ else }}nightly{{ end }}-{{ time "20060102" }}' - image_templates: - - synadia/nats-server:{{ if ne .Branch "main" }}{{ replace .Branch "/" "-" }}{{ else }}nightly{{ end }} - - synadia/nats-server:{{ if ne .Branch "main" }}{{ replace .Branch "/" "-" }}{{ else }}nightly{{ end }}-{{ time "20060102" }} - extra_files: - - docker/nats-server.conf - -checksum: - name_template: "SHA256SUMS" - algorithm: sha256 - -snapshot: - version_template: '{{ if ne .Branch "main" }}{{ replace .Branch "/" "-" }}{{ else }}nightly{{ end }}-{{ time "20060102" }}' diff --git a/docker/Dockerfile.nightly b/docker/Dockerfile.nightly index 3c9d22f4b50..17f8e4272e6 100644 --- a/docker/Dockerfile.nightly +++ b/docker/Dockerfile.nightly @@ -1,24 +1,38 @@ FROM golang:alpine AS builder ARG VERSION="nightly" +ARG GIT_COMMIT +ARG TARGETOS +ARG TARGETARCH -RUN apk add --update git -RUN mkdir -p src/github.com/nats-io && \ - cd src/github.com/nats-io/ && \ - git clone https://github.com/nats-io/natscli.git && \ - cd natscli/nats && \ - go build -ldflags "-w -X main.version=${VERSION}" -o /nats +ENV GOOS=$TARGETOS \ + GOARCH=$TARGETARCH \ + GO111MODULE=on \ + CGO_ENABLED=0 -RUN go install github.com/nats-io/nsc/v2@latest +RUN apk add --no-cache git ca-certificates +RUN mkdir -p /src/nats-server /src/natscli /src/nsc -FROM alpine:latest +COPY . /src/nats-server +RUN git clone --depth 1 https://github.com/nats-io/natscli /src/natscli +RUN git clone --depth 1 https://github.com/nats-io/nsc /src/nsc + +WORKDIR /src/nats-server +RUN go install -v -trimpath -ldflags "-w -X server.serverVersion=${VERSION},server.gitCommit=${GIT_COMMIT}" . + +WORKDIR /src/natscli +RUN go install -v -trimpath -ldflags "-w -X main.version=${VERSION}" ./nats -RUN apk add --update ca-certificates && mkdir -p /nats/bin && mkdir /nats/conf +WORKDIR /src/nsc +RUN go install -v -trimpath . 
+ +FROM alpine:latest COPY docker/nats-server.conf /nats/conf/nats-server.conf -COPY nats-server /bin/nats-server -COPY --from=builder /nats /bin/nats +COPY --from=builder /go/bin/nats-server /bin/nats-server +COPY --from=builder /go/bin/nats /bin/nats COPY --from=builder /go/bin/nsc /bin/nsc +COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ EXPOSE 4222 8222 6222 5222 diff --git a/go.mod b/go.mod index 3888c942954..67bc1d3e0f6 100644 --- a/go.mod +++ b/go.mod @@ -6,11 +6,11 @@ toolchain go1.24.7 require ( github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op - github.com/google/go-tpm v0.9.5 + github.com/google/go-tpm v0.9.6 github.com/klauspost/compress v1.18.0 github.com/minio/highwayhash v1.0.3 github.com/nats-io/jwt/v2 v2.8.0 - github.com/nats-io/nats.go v1.45.0 + github.com/nats-io/nats.go v1.46.1 github.com/nats-io/nkeys v0.4.11 github.com/nats-io/nuid v1.0.1 go.uber.org/automaxprocs v1.6.0 diff --git a/go.sum b/go.sum index d0adb2c0cb1..3f4b9fba47f 100644 --- a/go.sum +++ b/go.sum @@ -2,16 +2,16 @@ github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op h1:+OSa/t11TFhqfr github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/google/go-tpm v0.9.5 h1:ocUmnDebX54dnW+MQWGQRbdaAcJELsa6PqZhJ48KwVU= -github.com/google/go-tpm v0.9.5/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= +github.com/google/go-tpm v0.9.6 h1:Ku42PT4LmjDu1H5C5ISWLlpI1mj+Zq7sPGKoRw2XROA= +github.com/google/go-tpm v0.9.6/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g= github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA= -github.com/nats-io/nats.go v1.45.0 h1:/wGPbnYXDM0pLKFjZTX+2JOw9TQPoIgTFrUaH97giwA= -github.com/nats-io/nats.go v1.45.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= +github.com/nats-io/nats.go v1.46.1 h1:bqQ2ZcxVd2lpYI97xYASeRTY3I5boe/IVmuUDPitHfo= +github.com/nats-io/nats.go v1.46.1/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/server/config_check_test.go b/server/config_check_test.go index 0c4373474f9..03a61b0e119 100644 --- a/server/config_check_test.go +++ b/server/config_check_test.go @@ -2398,6 +2398,98 @@ func TestConfigCheck(t *testing.T) { errorLine: 9, errorPos: 9, }, + { + name: "leafnode proxy with unsupported scheme", + config: ` + leafnodes { + remotes = [ + { + url: "ws://127.0.0.1:7422" + proxy { + url: "ftp://proxy.example.com:8080" + } + } + ] + } + `, + err: errors.New("proxy URL scheme must be http or https, got: ftp"), + errorLine: 6, + errorPos: 8, + }, + { + name: "leafnode proxy with missing host", + config: ` + leafnodes { + remotes = [ + { + url: 
"ws://127.0.0.1:7422" + proxy { + url: "http://" + } + } + ] + } + `, + err: errors.New("proxy URL must specify a host"), + errorLine: 6, + errorPos: 8, + }, + { + name: "leafnode proxy with username but no password", + config: ` + leafnodes { + remotes = [ + { + url: "ws://127.0.0.1:7422" + proxy { + url: "http://proxy.example.com:8080" + username: "testuser" + } + } + ] + } + `, + err: errors.New("proxy username and password must both be specified or both be empty"), + errorLine: 6, + errorPos: 8, + }, + { + name: "leafnode proxy with password but no username", + config: ` + leafnodes { + remotes = [ + { + url: "ws://127.0.0.1:7422" + proxy { + url: "http://proxy.example.com:8080" + password: "testpass" + } + } + ] + } + `, + err: errors.New("proxy username and password must both be specified or both be empty"), + errorLine: 6, + errorPos: 8, + }, + { + name: "leafnode proxy with WSS URL but no TLS config", + config: ` + leafnodes { + remotes = [ + { + url: "wss://127.0.0.1:7422" + proxy { + url: "http://proxy.example.com:8080" + } + } + ] + } + `, + err: errors.New("proxy is configured but remote URL wss://127.0.0.1:7422 requires TLS and no TLS configuration is provided"), + errorLine: 6, + errorPos: 8, + }, } checkConfig := func(config string) error { diff --git a/server/errors.json b/server/errors.json index 7f04b23469a..410544bdaa2 100644 --- a/server/errors.json +++ b/server/errors.json @@ -1988,5 +1988,15 @@ "help": "", "url": "", "deprecates": "" + }, + { + "constant": "JSAtomicPublishContainsDuplicateMessageErr", + "code": 400, + "error_code": 10201, + "description": "atomic publish batch contains duplicate message id", + "comment": "", + "help": "", + "url": "", + "deprecates": "" } ] diff --git a/server/filestore.go b/server/filestore.go index b8e948b065a..9f7f32f4a58 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1520,8 +1520,8 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { // For tombstones that we find and collect. var ( tombstones []uint64 - minTombstoneSeq uint64 - minTombstoneTs int64 + maxTombstoneSeq uint64 + maxTombstoneTs int64 ) // To detect gaps from compaction, and to ensure the sequence keeps moving up. @@ -1580,8 +1580,8 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { seq = seq &^ tbit // Need to process this here and make sure we have accounted for this properly. tombstones = append(tombstones, seq) - if minTombstoneSeq == 0 || seq < minTombstoneSeq { - minTombstoneSeq, minTombstoneTs = seq, ts + if maxTombstoneSeq == 0 || seq > maxTombstoneSeq { + maxTombstoneSeq, maxTombstoneTs = seq, ts } index += rl continue @@ -1664,12 +1664,12 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { fseq := atomic.LoadUint64(&mb.first.seq) if fseq > 0 { atomic.StoreUint64(&mb.last.seq, fseq-1) - } else if fseq == 0 && minTombstoneSeq > 0 { - atomic.StoreUint64(&mb.first.seq, minTombstoneSeq+1) + } else if fseq == 0 && maxTombstoneSeq > 0 { + atomic.StoreUint64(&mb.first.seq, maxTombstoneSeq+1) mb.first.ts = 0 if mb.last.seq == 0 { - atomic.StoreUint64(&mb.last.seq, minTombstoneSeq) - mb.last.ts = minTombstoneTs + atomic.StoreUint64(&mb.last.seq, maxTombstoneSeq) + mb.last.ts = maxTombstoneTs } } } @@ -2285,6 +2285,12 @@ func (fs *fileStore) recoverMsgs() error { fs.removeMsgBlockFromList(mb) continue } + // If the stream is empty, reset the first/last sequences so these can + // properly move up based purely on tombstones spread over multiple blocks. 
+ if fs.state.Msgs == 0 { + fs.state.FirstSeq, fs.state.LastSeq = 0, 0 + fs.state.FirstTime, fs.state.LastTime = time.Time{}, time.Time{} + } fseq := atomic.LoadUint64(&mb.first.seq) if fs.state.FirstSeq == 0 || (fseq < fs.state.FirstSeq && mb.first.ts != 0) { fs.state.FirstSeq = fseq @@ -5064,9 +5070,9 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( // If erase but block is empty, we can simply remove the block later. if secure && !isEmpty { - // Grab record info. - ri, rl, _, _ := mb.slotInfo(int(seq - mb.cache.fseq)) - if err := mb.eraseMsg(seq, int(ri), int(rl), isLastBlock); err != nil { + // Grab record info, but use the pre-computed record length. + ri, _, _, _ := mb.slotInfo(int(seq - mb.cache.fseq)) + if err := mb.eraseMsg(seq, int(ri), int(msz), isLastBlock); err != nil { mb.finishedWithCache() return false, err } @@ -5665,9 +5671,21 @@ func (mb *msgBlock) selectNextFirst() { } // Select the next FirstSeq +// Also cleans up empty blocks at the start only containing tombstones. // Lock should be held. func (fs *fileStore) selectNextFirst() { if len(fs.blks) > 0 { + for len(fs.blks) > 1 { + mb := fs.blks[0] + mb.mu.Lock() + empty := mb.msgs == 0 + if !empty { + mb.mu.Unlock() + break + } + fs.forceRemoveMsgBlock(mb) + mb.mu.Unlock() + } mb := fs.blks[0] mb.mu.RLock() fs.state.FirstSeq = atomic.LoadUint64(&mb.first.seq) @@ -7537,7 +7555,14 @@ func (mb *msgBlock) cacheLookupNoCopy(seq uint64, sm *StoreMsg) (*StoreMsg, erro // Will do a lookup from cache. // Lock should be held. func (mb *msgBlock) cacheLookupEx(seq uint64, sm *StoreMsg, doCopy bool) (*StoreMsg, error) { - if seq < atomic.LoadUint64(&mb.first.seq) || seq > atomic.LoadUint64(&mb.last.seq) { + fseq, lseq := atomic.LoadUint64(&mb.first.seq), atomic.LoadUint64(&mb.last.seq) + switch { + case lseq == fseq-1: + // The block is empty, no messages have been written yet. This works because + // newMsgBlockForWrite sets fseq=fs.State.LastSeq+1 and lseq=fs.State.LastSeq. + return nil, ErrStoreMsgNotFound + case seq < fseq || seq > lseq: + // Sequence is out of range for this block. return nil, ErrStoreMsgNotFound } @@ -9131,7 +9156,7 @@ func (fs *fileStore) Truncate(seq uint64) error { } mb.mu.Lock() } - fs.removeMsgBlock(mb) + fs.forceRemoveMsgBlock(mb) mb.mu.Unlock() } @@ -9153,7 +9178,7 @@ func (fs *fileStore) Truncate(seq uint64) error { } smb.mu.Lock() } - fs.removeMsgBlock(smb) + fs.forceRemoveMsgBlock(smb) smb.mu.Unlock() goto SKIP } @@ -9195,7 +9220,7 @@ SKIP: if !hasWrittenTombstones { fs.lmb = smb tmb.mu.Lock() - fs.removeMsgBlock(tmb) + fs.forceRemoveMsgBlock(tmb) tmb.mu.Unlock() } @@ -9275,8 +9300,8 @@ func (fs *fileStore) removeMsgBlockFromList(mb *msgBlock) { // Both locks should be held. func (fs *fileStore) removeMsgBlock(mb *msgBlock) { // Check for us being last message block + lseq, lts := atomic.LoadUint64(&mb.last.seq), mb.last.ts if mb == fs.lmb { - lseq, lts := atomic.LoadUint64(&mb.last.seq), mb.last.ts // Creating a new message write block requires that the lmb lock is not held. mb.mu.Unlock() // Write the tombstone to remember since this was last block. @@ -9284,8 +9309,17 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) { fs.writeTombstone(lseq, lts) } mb.mu.Lock() + } else if lseq == fs.state.LastSeq { + // Need to write a tombstone for the last sequence if we're removing the block containing it. + fs.writeTombstone(lseq, lts) } - // Only delete message block after (potentially) writing a new lmb. 
+ // Only delete message block after (potentially) writing a tombstone. + fs.forceRemoveMsgBlock(mb) +} + +// Removes the msgBlock, without writing tombstones to ensure the last sequence is preserved. +// Both locks should be held. +func (fs *fileStore) forceRemoveMsgBlock(mb *msgBlock) { mb.dirtyCloseWithRemove(true) fs.removeMsgBlockFromList(mb) } @@ -10042,11 +10076,7 @@ func (fs *fileStore) writeTTLState() error { buf := fs.ttls.Encode(fs.state.LastSeq + 1) fs.mu.RUnlock() - <-dios - err := os.WriteFile(fn, buf, defaultFilePerms) - dios <- struct{}{} - - return err + return fs.writeFileWithOptionalSync(fn, buf, defaultFilePerms) } func (fs *fileStore) writeMsgSchedulingState() error { @@ -10060,11 +10090,7 @@ func (fs *fileStore) writeMsgSchedulingState() error { buf := fs.scheduling.encode(fs.state.LastSeq + 1) fs.mu.RUnlock() - <-dios - err := os.WriteFile(fn, buf, defaultFilePerms) - dios <- struct{}{} - - return err + return fs.writeFileWithOptionalSync(fn, buf, defaultFilePerms) } // Stop the current filestore. @@ -11738,29 +11764,47 @@ func (alg StoreCompression) Decompress(buf []byte) ([]byte, error) { // sets O_SYNC on the open file if SyncAlways is set. The dios semaphore is // handled automatically by this function, so don't wrap calls to it in dios. func (fs *fileStore) writeFileWithOptionalSync(name string, data []byte, perm fs.FileMode) error { - if fs.fcfg.SyncAlways { - return writeFileWithSync(name, data, perm) - } - <-dios - defer func() { - dios <- struct{}{} - }() - return os.WriteFile(name, data, perm) + return writeAtomically(name, data, perm, fs.fcfg.SyncAlways) } func writeFileWithSync(name string, data []byte, perm fs.FileMode) error { + return writeAtomically(name, data, perm, true) +} + +func writeAtomically(name string, data []byte, perm fs.FileMode, sync bool) error { + tmp := name + ".tmp" + flags := os.O_CREATE | os.O_WRONLY | os.O_TRUNC + if sync { + flags = flags | os.O_SYNC + } <-dios defer func() { dios <- struct{}{} }() - flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC | os.O_SYNC - f, err := os.OpenFile(name, flags, perm) + f, err := os.OpenFile(tmp, flags, perm) if err != nil { return err } - if _, err = f.Write(data); err != nil { + if _, err := f.Write(data); err != nil { _ = f.Close() + _ = os.Remove(tmp) + return err + } + if err := f.Close(); err != nil { + _ = os.Remove(tmp) return err } - return f.Close() + if err := os.Rename(tmp, name); err != nil { + _ = os.Remove(tmp) + return err + } + if sync { + // To ensure that the file rename was persisted on all filesystems, + // also try to flush the directory metadata. 
+ if d, err := os.Open(filepath.Dir(name)); err == nil { + _ = d.Sync() + _ = d.Close() + } + } + return nil } diff --git a/server/filestore_test.go b/server/filestore_test.go index 80b4ce5a0fb..42b3f059ff9 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -4182,6 +4182,7 @@ func TestFileStoreEncrypted(t *testing.T) { err = o.Update(state) require_NoError(t, err) + o.Stop() fs.Stop() fs, err = newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, created, prf(&fcfg), nil) require_NoError(t, err) @@ -4982,36 +4983,41 @@ func TestFileStoreSubjectsTotals(t *testing.T) { func TestFileStoreConsumerStoreEncodeAfterRestart(t *testing.T) { testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { - fs, err := newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), prf(&fcfg), nil) - require_NoError(t, err) - defer fs.Stop() + state := &ConsumerState{} - o, err := fs.ConsumerStore("o22", &ConsumerConfig{AckPolicy: AckExplicit}) - require_NoError(t, err) + func() { // for defers + fs, err := newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() - state := &ConsumerState{} - state.Delivered.Consumer = 22 - state.Delivered.Stream = 22 - state.AckFloor.Consumer = 11 - state.AckFloor.Stream = 11 - err = o.Update(state) - require_NoError(t, err) + o, err := fs.ConsumerStore("o22", &ConsumerConfig{AckPolicy: AckExplicit}) + require_NoError(t, err) + defer o.Stop() - fs.Stop() + state.Delivered.Consumer = 22 + state.Delivered.Stream = 22 + state.AckFloor.Consumer = 11 + state.AckFloor.Stream = 11 + err = o.Update(state) + require_NoError(t, err) + }() - fs, err = newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), prf(&fcfg), nil) - require_NoError(t, err) - defer fs.Stop() + func() { // for defers + fs, err := newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() - o, err = fs.ConsumerStore("o22", &ConsumerConfig{AckPolicy: AckExplicit}) - require_NoError(t, err) + o, err := fs.ConsumerStore("o22", &ConsumerConfig{AckPolicy: AckExplicit}) + require_NoError(t, err) + defer o.Stop() - if o.(*consumerFileStore).state.Delivered != state.Delivered { - t.Fatalf("Consumer state is wrong %+v vs %+v", o.(*consumerFileStore).state, state) - } - if o.(*consumerFileStore).state.AckFloor != state.AckFloor { - t.Fatalf("Consumer state is wrong %+v vs %+v", o.(*consumerFileStore).state, state) - } + if o.(*consumerFileStore).state.Delivered != state.Delivered { + t.Fatalf("Consumer state is wrong %+v vs %+v", o.(*consumerFileStore).state, state) + } + if o.(*consumerFileStore).state.AckFloor != state.AckFloor { + t.Fatalf("Consumer state is wrong %+v vs %+v", o.(*consumerFileStore).state, state) + } + }() }) } @@ -10586,3 +10592,263 @@ func BenchmarkFileStoreGetSeqFromTime(b *testing.B) { } }) } + +func TestFileStoreCacheLookupOnEmptyBlock(t *testing.T) { + testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { + cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage} + created := time.Now() + fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + fs.mu.RLock() + lmb := fs.lmb + fs.mu.RUnlock() + + // First make sure that we haven't got a strong reference to the cache. 
+ require_NotNil(t, lmb) + lmb.finishedWithCache() + require_True(t, lmb.cache == nil) + + // Specifically we want ErrStoreMsgNotFound, not errNoCache. + _, err = lmb.cacheLookup(atomic.LoadUint64(&lmb.first.seq), nil) + require_Error(t, err, ErrStoreMsgNotFound) + + // Now make sure that we didn't strengthen the reference. This proves + // that we short-circuited properly. + require_True(t, lmb.cache == nil) + }) +} + +func TestFileStoreEraseMsgDoesNotLoseTombstones(t *testing.T) { + testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { + cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage} + created := time.Now() + fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + secret := []byte("secret!") + // The first message will remain throughout. + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + // The second message wil be removed, so a tombstone will be placed. + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + // The third message is secret and will be erased. + _, _, err = fs.StoreMsg("foo", nil, secret, 0) + require_NoError(t, err) + + // Removing the second message places a tombstone. + _, err = fs.RemoveMsg(2) + require_NoError(t, err) + + // A fourth message gets placed after the tombstone. + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + + // Now we erase the third message. + // This erases this message and should not lose the tombstone that comes after it. + _, err = fs.EraseMsg(3) + require_NoError(t, err) + + before := fs.State() + require_Equal(t, before.Msgs, 2) + require_Equal(t, before.FirstSeq, 1) + require_Equal(t, before.LastSeq, 4) + require_True(t, slices.Equal(before.Deleted, []uint64{2, 3})) + + _, err = fs.LoadMsg(2, nil) + require_Error(t, err, errDeletedMsg) + _, err = fs.LoadMsg(3, nil) + require_Error(t, err, errDeletedMsg) + + // Make sure we can recover properly with no index.db present. + fs.Stop() + os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)) + + fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + if state := fs.State(); !reflect.DeepEqual(state, before) { + t.Fatalf("Expected state\n of %+v, \ngot %+v without index.db state", before, state) + } + + _, err = fs.LoadMsg(2, nil) + require_Error(t, err, errDeletedMsg) + _, err = fs.LoadMsg(3, nil) + require_Error(t, err, errDeletedMsg) + }) +} + +func TestFileStoreTombstonesNoFirstSeqRollback(t *testing.T) { + testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { + fcfg.BlockSize = 10 * 33 // 10 messages per block. + cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage} + created := time.Now() + fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + for i := 0; i < 20; i++ { + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + } + + before := fs.State() + require_Equal(t, before.Msgs, 20) + require_Equal(t, before.FirstSeq, 1) + require_Equal(t, before.LastSeq, 20) + + // Expect 2 blocks with messages. + fs.mu.RLock() + lblks := len(fs.blks) + fs.mu.RUnlock() + require_Equal(t, lblks, 2) + + // Write some tombstones for all messages, these will be in multiple blocks. 
+ for seq := uint64(1); seq <= 20; seq++ { + _, err = fs.RemoveMsg(seq) + require_NoError(t, err) + } + + before = fs.State() + require_Equal(t, before.Msgs, 0) + require_Equal(t, before.FirstSeq, 21) + require_Equal(t, before.LastSeq, 20) + + // Expect 1 block purely with tombstones. + fs.mu.RLock() + lblks = len(fs.blks) + fs.mu.RUnlock() + require_Equal(t, lblks, 1) + + // Make sure we can recover properly with no index.db present. + fs.Stop() + os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)) + + fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + if state := fs.State(); !reflect.DeepEqual(state, before) { + t.Fatalf("Expected state\n of %+v, \ngot %+v without index.db state", before, state) + } + }) +} + +func TestFileStoreTombstonesSelectNextFirstCleanup(t *testing.T) { + testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { + fcfg.BlockSize = 10 * 33 // 10 messages per block. + cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage} + created := time.Now() + fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + // Write a bunch of messages in multiple blocks. + for i := 0; i < 50; i++ { + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + } + + for seq := uint64(2); seq <= 49; seq++ { + _, err = fs.RemoveMsg(seq) + require_NoError(t, err) + } + + _, err = fs.newMsgBlockForWrite() + require_NoError(t, err) + for i := 0; i < 50; i++ { + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + } + + for seq := uint64(50); seq <= 100; seq++ { + _, err = fs.RemoveMsg(seq) + require_NoError(t, err) + } + + before := fs.State() + require_Equal(t, before.Msgs, 1) + require_Equal(t, before.FirstSeq, 1) + require_Equal(t, before.LastSeq, 100) + + _, err = fs.RemoveMsg(1) + require_NoError(t, err) + + before = fs.State() + require_Equal(t, before.Msgs, 0) + require_Equal(t, before.FirstSeq, 101) + require_Equal(t, before.LastSeq, 100) + + // Make sure we can recover properly with no index.db present. + fs.Stop() + os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)) + + fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + if state := fs.State(); !reflect.DeepEqual(state, before) { + t.Fatalf("Expected state\n of %+v, \ngot %+v without index.db state", before, state) + } + }) +} + +func TestFileStoreTombstonesSelectNextFirstCleanupOnRecovery(t *testing.T) { + testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { + fcfg.BlockSize = 10 * 33 // 10 messages per block. + cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage} + created := time.Now() + fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + // Write a bunch of messages in multiple blocks. 
+ for i := 0; i < 50; i++ { + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + } + + for seq := uint64(2); seq <= 49; seq++ { + _, err = fs.RemoveMsg(seq) + require_NoError(t, err) + } + + _, err = fs.newMsgBlockForWrite() + require_NoError(t, err) + for i := 0; i < 50; i++ { + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + } + + for seq := uint64(50); seq <= 100; seq++ { + _, err = fs.RemoveMsg(seq) + require_NoError(t, err) + } + + before := fs.State() + require_Equal(t, before.Msgs, 1) + require_Equal(t, before.FirstSeq, 1) + require_Equal(t, before.LastSeq, 100) + + // Explicitly write tombstone instead of calling fs.RemoveMsg, + // so we need to recover from a hard kill. + require_NoError(t, fs.writeTombstone(1, 0)) + before = StreamState{FirstSeq: 101, FirstTime: time.Time{}, LastSeq: 100, LastTime: before.LastTime} + + // Make sure we can recover properly with no index.db present. + fs.Stop() + os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)) + + fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + if state := fs.State(); !reflect.DeepEqual(state, before) { + t.Fatalf("Expected state\n of %+v, \ngot %+v without index.db state", before, state) + } + }) +} diff --git a/server/jetstream_api.go b/server/jetstream_api.go index b085080cf27..31fcf056762 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -2655,15 +2655,16 @@ func (s *Server) jsLeaderServerRemoveRequest(sub *subscription, c *client, _ *Ac } js, cc := s.getJetStreamCluster() - if js == nil || cc == nil || cc.meta == nil { + if js == nil || cc == nil { return } - // Extra checks here but only leader is listening. js.mu.RLock() isLeader := cc.isLeader() + meta := cc.meta js.mu.RUnlock() + // Extra checks here but only leader is listening. if !isLeader { return } @@ -2690,7 +2691,7 @@ func (s *Server) jsLeaderServerRemoveRequest(sub *subscription, c *client, _ *Ac var found string js.mu.RLock() - for _, p := range cc.meta.Peers() { + for _, p := range meta.Peers() { // If Peer is specified, it takes precedence if req.Peer != _EMPTY_ { if p.ID == req.Peer { @@ -2715,7 +2716,7 @@ func (s *Server) jsLeaderServerRemoveRequest(sub *subscription, c *client, _ *Ac // So we have a valid peer. 
js.mu.Lock() - cc.meta.ProposeRemovePeer(found) + meta.ProposeRemovePeer(found) js.mu.Unlock() resp.Success = true @@ -2766,7 +2767,7 @@ func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _ } js, cc := s.getJetStreamCluster() - if js == nil || cc == nil || cc.meta == nil { + if js == nil || cc == nil { return } @@ -2930,7 +2931,7 @@ func (s *Server) jsLeaderServerStreamCancelMoveRequest(sub *subscription, c *cli } js, cc := s.getJetStreamCluster() - if js == nil || cc == nil || cc.meta == nil { + if js == nil || cc == nil { return } @@ -3090,7 +3091,13 @@ func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Ac } _, cc := s.getJetStreamCluster() - if cc == nil || cc.meta == nil || !cc.isLeader() { + + js.mu.RLock() + isLeader := cc.isLeader() + meta := cc.meta + js.mu.RUnlock() + + if !isLeader { return } @@ -3108,11 +3115,11 @@ func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Ac for _, oca := range osa.consumers { oca.deleted = true ca := &consumerAssignment{Group: oca.Group, Stream: oca.Stream, Name: oca.Name, Config: oca.Config, Subject: subject, Client: oca.Client} - cc.meta.Propose(encodeDeleteConsumerAssignment(ca)) + meta.Propose(encodeDeleteConsumerAssignment(ca)) nc++ } sa := &streamAssignment{Group: osa.Group, Config: osa.Config, Subject: subject, Client: osa.Client} - cc.meta.Propose(encodeDeleteStreamAssignment(sa)) + meta.Propose(encodeDeleteStreamAssignment(sa)) ns++ } js.mu.RUnlock() @@ -3143,13 +3150,14 @@ func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Accoun } js, cc := s.getJetStreamCluster() - if js == nil || cc == nil || cc.meta == nil { + if js == nil || cc == nil { return } // Extra checks here but only leader is listening. js.mu.RLock() isLeader := cc.isLeader() + meta := cc.meta js.mu.RUnlock() if !isLeader { @@ -3171,14 +3179,14 @@ func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Accoun s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } - if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(cc.meta, req.Placement); resp.Error != nil { + if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(meta, req.Placement); resp.Error != nil { s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } } // Call actual stepdown. - err = cc.meta.StepDown(preferredLeader) + err = meta.StepDown(preferredLeader) if err != nil { resp.Error = NewJSRaftGeneralError(err, Unless(err)) } else { @@ -4973,6 +4981,10 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, meta := cc.meta js.mu.RUnlock() + if meta == nil { + return + } + // Since these could wait on the Raft group lock, don't do so under the JS lock. 
ourID := meta.ID() groupLeaderless := meta.Leaderless() @@ -5299,6 +5311,7 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account nca := *ca ncfg := *ca.Config nca.Config = &ncfg + meta := cc.meta js.mu.RUnlock() pauseUTC := req.PauseUntil.UTC() if !pauseUTC.IsZero() { @@ -5312,7 +5325,7 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account setStaticConsumerMetadata(nca.Config) eca := encodeAddConsumerAssignment(&nca) - cc.meta.Propose(eca) + meta.Propose(eca) resp.PauseUntil = pauseUTC if resp.Paused = time.Now().Before(pauseUTC); resp.Paused { diff --git a/server/jetstream_batching.go b/server/jetstream_batching.go index 16daf30648a..fc97f1f5453 100644 --- a/server/jetstream_batching.go +++ b/server/jetstream_batching.go @@ -303,7 +303,7 @@ func checkMsgHeadersPreClusteredProposal( if msgId := getMsgId(hdr); msgId != _EMPTY_ { // Dedupe if staged. if _, ok = diff.msgIds[msgId]; ok { - return hdr, msg, 0, nil, errMsgIdDuplicate + return hdr, msg, 0, NewJSAtomicPublishContainsDuplicateMessageError(), errMsgIdDuplicate } mset.ddMu.Lock() if dde := mset.checkMsgId(msgId); dde != nil { @@ -311,7 +311,7 @@ func checkMsgHeadersPreClusteredProposal( mset.ddMu.Unlock() // Should not return an invalid sequence, in that case error. if seq > 0 { - return hdr, msg, seq, nil, errMsgIdDuplicate + return hdr, msg, seq, NewJSAtomicPublishContainsDuplicateMessageError(), errMsgIdDuplicate } else { return hdr, msg, 0, NewJSStreamDuplicateMessageConflictError(), errMsgIdDuplicate } diff --git a/server/jetstream_batching_test.go b/server/jetstream_batching_test.go index c00d3c5fdf6..e64e958fc68 100644 --- a/server/jetstream_batching_test.go +++ b/server/jetstream_batching_test.go @@ -397,24 +397,38 @@ func TestJetStreamAtomicBatchPublishDedupeNotAllowed(t *testing.T) { _, err := jsStreamCreate(t, nc, cfg) require_NoError(t, err) + _, err = js.Publish("foo", nil, nats.MsgId("pre-existing")) + require_NoError(t, err) + + var pubAck JSPubAckResponse m := nats.NewMsg("foo") + m.Header.Set("Nats-Msg-Id", "pre-existing") + m.Header.Set("Nats-Batch-Id", "uuid") + m.Header.Set("Nats-Batch-Sequence", "1") + m.Header.Set("Nats-Batch-Commit", "1") + rmsg, err := nc.RequestMsg(m, time.Second) + require_NoError(t, err) + require_NoError(t, json.Unmarshal(rmsg.Data, &pubAck)) + require_NotNil(t, pubAck.Error) + require_Error(t, pubAck.Error, NewJSAtomicPublishContainsDuplicateMessageError()) + + m = nats.NewMsg("foo") m.Header.Set("Nats-Batch-Id", "uuid") m.Header.Set("Nats-Batch-Sequence", "1") m.Header.Set("Nats-Msg-Id", "msgId1") require_NoError(t, nc.PublishMsg(m)) - m.Header.Set("Nats-Batch-Sequence", "2") - m.Header.Set("Nats-Msg-Id", "pre-existing") - require_NoError(t, nc.PublishMsg(m)) - var pubAck JSPubAckResponse - m.Header.Set("Nats-Batch-Sequence", "3") + pubAck = JSPubAckResponse{} + m.Header.Set("Nats-Batch-Sequence", "2") m.Header.Set("Nats-Msg-Id", "msgId2") m.Header.Set("Nats-Batch-Commit", "1") - rmsg, err := nc.RequestMsg(m, time.Second) + rmsg, err = nc.RequestMsg(m, time.Second) require_NoError(t, err) require_NoError(t, json.Unmarshal(rmsg.Data, &pubAck)) - require_NotNil(t, pubAck.Error) - require_Error(t, pubAck.Error, NewJSAtomicPublishUnsupportedHeaderBatchError("Nats-Msg-Id")) + require_True(t, pubAck.Error == nil) + require_Equal(t, pubAck.BatchSize, 2) + require_Equal(t, pubAck.Sequence, 3) + require_Equal(t, pubAck.BatchId, "uuid") } for _, storage := range []StorageType{FileStorage, MemoryStorage} { @@ -727,7 +741,6 @@ func 
TestJetStreamAtomicBatchPublishDenyHeaders(t *testing.T) { // We might support these headers later on, but for now error. for key, value := range map[string]string{ - "Nats-Msg-Id": "msgId", "Nats-Expected-Last-Msg-Id": "msgId", } { t.Run(key, func(t *testing.T) { diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 725b842ab8d..c0d0c99a988 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1413,13 +1413,13 @@ func (js *jetStream) monitorCluster() { go checkHealth() continue } - if didSnap, didStreamRemoval, _, err := js.applyMetaEntries(ce.Entries, ru); err == nil { + if didSnap, err := js.applyMetaEntries(ce.Entries, ru); err == nil { var nb uint64 // Some entries can fail without an error when shutting down, don't move applied forward. if !js.isShuttingDown() { _, nb = n.Applied(ce.Index) } - if js.hasPeerEntries(ce.Entries) || didStreamRemoval || (didSnap && !isLeader) { + if js.hasPeerEntries(ce.Entries) || (didSnap && !isLeader) { doSnapshot() } else if nb > compactSizeMin && time.Since(lastSnapTime) > minSnapDelta { doSnapshot() @@ -1998,8 +1998,8 @@ func (ca *consumerAssignment) recoveryKey() string { return ca.Client.serviceAccount() + ksep + ca.Stream + ksep + ca.Name } -func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bool, bool, bool, error) { - var didSnap, didRemoveStream, didRemoveConsumer bool +func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bool, error) { + var didSnap bool isRecovering := js.isMetaRecovering() for _, e := range entries { @@ -2021,7 +2021,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo sa, err := decodeStreamAssignment(js.srv, buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:]) - return didSnap, didRemoveStream, didRemoveConsumer, err + return didSnap, err } if isRecovering { js.setStreamAssignmentRecovering(sa) @@ -2035,7 +2035,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo sa, err := decodeStreamAssignment(js.srv, buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:]) - return didSnap, didRemoveStream, didRemoveConsumer, err + return didSnap, err } if isRecovering { js.setStreamAssignmentRecovering(sa) @@ -2047,13 +2047,12 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo delete(ru.removeConsumers, key) } else { js.processStreamRemoval(sa) - didRemoveStream = true } case assignConsumerOp: ca, err := decodeConsumerAssignment(buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode consumer assignment: %q", buf[1:]) - return didSnap, didRemoveStream, didRemoveConsumer, err + return didSnap, err } if isRecovering { js.setConsumerAssignmentRecovering(ca) @@ -2073,7 +2072,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo ca, err := decodeConsumerAssignmentCompressed(buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode compressed consumer assignment: %q", buf[1:]) - return didSnap, didRemoveStream, didRemoveConsumer, err + return didSnap, err } if isRecovering { js.setConsumerAssignmentRecovering(ca) @@ -2093,7 +2092,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo ca, err := decodeConsumerAssignment(buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode consumer assignment: %q", buf[1:]) - return 
didSnap, didRemoveStream, didRemoveConsumer, err + return didSnap, err } if isRecovering { js.setConsumerAssignmentRecovering(ca) @@ -2108,13 +2107,12 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo } } else { js.processConsumerRemoval(ca) - didRemoveConsumer = true } case updateStreamOp: sa, err := decodeStreamAssignment(js.srv, buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:]) - return didSnap, didRemoveStream, didRemoveConsumer, err + return didSnap, err } if isRecovering { js.setStreamAssignmentRecovering(sa) @@ -2124,16 +2122,13 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo delete(ru.removeStreams, key) } else { js.processUpdateStreamAssignment(sa) - // Since an update can be lowering replica count, we want upper layer to treat - // similar to a removal and snapshot to collapse old entries. - didRemoveStream = true } default: panic(fmt.Sprintf("JetStream Cluster Unknown meta entry op type: %v", entryOp(buf[0]))) } } } - return didSnap, didRemoveStream, didRemoveConsumer, nil + return didSnap, nil } func (rg *raftGroup) isMember(id string) bool { @@ -8890,7 +8885,6 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ ) diff := &batchStagedDiff{} if hdr, msg, dseq, apiErr, err = checkMsgHeadersPreClusteredProposal(diff, mset, subject, hdr, msg, sourced, name, jsa, allowRollup, denyPurge, allowTTL, allowMsgCounter, allowMsgSchedules, discard, discardNewPer, maxMsgSize, maxMsgs, maxMsgsPer, maxBytes); err != nil { - // TODO(mvv): reset in-memory expected header maps mset.clMu.Unlock() if err == errMsgIdDuplicate && dseq > 0 { var buf [256]byte @@ -8921,12 +8915,10 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ } // Do proposal. - err = node.Propose(esm) - // TODO(mvv): reset in-memory expected header maps, if err!=nil - if err == nil { - mset.clseq++ - mset.trackReplicationTraffic(node, len(esm), r) - } + _ = node.Propose(esm) + // The proposal can fail, but we always account for trying. + mset.clseq++ + mset.trackReplicationTraffic(node, len(esm), r) // Check to see if we are being overrun. // TODO(dlc) - Make this a limit where we drop messages to protect ourselves, but allow to be configured. diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 512296a9d6a..50b73756b11 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -6641,13 +6641,13 @@ func TestJetStreamClusterMetaRecoveryUpdatesDeletesConsumers(t *testing.T) { } // Push recovery entries that create the stream & consumer. - _, _, _, err := js.applyMetaEntries(create, ru) + _, err := js.applyMetaEntries(create, ru) require_NoError(t, err) require_Len(t, len(ru.updateConsumers), 1) // Now push another recovery entry that deletes the stream. The // entry that creates the consumer should now be gone. - _, _, _, err = js.applyMetaEntries(delete, ru) + _, err = js.applyMetaEntries(delete, ru) require_NoError(t, err) require_Len(t, len(ru.removeStreams), 1) require_Len(t, len(ru.updateConsumers), 0) @@ -6695,27 +6695,27 @@ func TestJetStreamClusterMetaRecoveryRecreateFileStreamAsMemory(t *testing.T) { } // We created a file-based stream first, but deleted it shortly after. 
- _, _, _, err := js.applyMetaEntries(createFileStream, ru) + _, err := js.applyMetaEntries(createFileStream, ru) require_NoError(t, err) require_Len(t, len(ru.addStreams), 1) require_Len(t, len(ru.removeStreams), 0) // Now push another recovery entry that deletes the stream. // The file-based stream should not have been created. - _, _, _, err = js.applyMetaEntries(deleteFileStream, ru) + _, err = js.applyMetaEntries(deleteFileStream, ru) require_NoError(t, err) require_Len(t, len(ru.addStreams), 0) require_Len(t, len(ru.removeStreams), 1) // Now stage a memory-based stream to be created. - _, _, _, err = js.applyMetaEntries(createMemoryStream, ru) + _, err = js.applyMetaEntries(createMemoryStream, ru) require_NoError(t, err) require_Len(t, len(ru.addStreams), 1) require_Len(t, len(ru.removeStreams), 0) require_Len(t, len(ru.updateConsumers), 0) // Also create a consumer on that memory-based stream. - _, _, _, err = js.applyMetaEntries(createConsumer, ru) + _, err = js.applyMetaEntries(createConsumer, ru) require_NoError(t, err) require_Len(t, len(ru.addStreams), 1) require_Len(t, len(ru.removeStreams), 0) @@ -6752,19 +6752,19 @@ func TestJetStreamClusterMetaRecoveryConsumerCreateAndRemove(t *testing.T) { } // Creating the consumer should append to update consumers list. - _, _, _, err := js.applyMetaEntries(createConsumer, ru) + _, err := js.applyMetaEntries(createConsumer, ru) require_NoError(t, err) require_Len(t, len(ru.updateConsumers[":TEST"]), 1) require_Len(t, len(ru.removeConsumers), 0) // Deleting the consumer should append to remove consumers list and remove from update list. - _, _, _, err = js.applyMetaEntries(deleteConsumer, ru) + _, err = js.applyMetaEntries(deleteConsumer, ru) require_NoError(t, err) require_Len(t, len(ru.removeConsumers[":TEST"]), 1) require_Len(t, len(ru.updateConsumers[":TEST"]), 0) // When re-creating the consumer, add to update list and remove from remove list. 
- _, _, _, err = js.applyMetaEntries(createConsumer, ru) + _, err = js.applyMetaEntries(createConsumer, ru) require_NoError(t, err) require_Len(t, len(ru.updateConsumers[":TEST"]), 1) require_Len(t, len(ru.removeConsumers[":TEST"]), 0) diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index dbc7f0499d7..d244ebd7ac2 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -8,6 +8,9 @@ const ( // JSAccountResourcesExceededErr resource limits exceeded for account JSAccountResourcesExceededErr ErrorIdentifier = 10002 + // JSAtomicPublishContainsDuplicateMessageErr atomic publish batch contains duplicate message id + JSAtomicPublishContainsDuplicateMessageErr ErrorIdentifier = 10201 + // JSAtomicPublishDisabledErr atomic publish is disabled JSAtomicPublishDisabledErr ErrorIdentifier = 10174 @@ -606,6 +609,7 @@ const ( var ( ApiErrors = map[ErrorIdentifier]*ApiError{ JSAccountResourcesExceededErr: {Code: 400, ErrCode: 10002, Description: "resource limits exceeded for account"}, + JSAtomicPublishContainsDuplicateMessageErr: {Code: 400, ErrCode: 10201, Description: "atomic publish batch contains duplicate message id"}, JSAtomicPublishDisabledErr: {Code: 400, ErrCode: 10174, Description: "atomic publish is disabled"}, JSAtomicPublishIncompleteBatchErr: {Code: 400, ErrCode: 10176, Description: "atomic publish batch is incomplete"}, JSAtomicPublishInvalidBatchCommitErr: {Code: 400, ErrCode: 10200, Description: "atomic publish batch commit is invalid"}, @@ -839,6 +843,16 @@ func NewJSAccountResourcesExceededError(opts ...ErrorOption) *ApiError { return ApiErrors[JSAccountResourcesExceededErr] } +// NewJSAtomicPublishContainsDuplicateMessageError creates a new JSAtomicPublishContainsDuplicateMessageErr error: "atomic publish batch contains duplicate message id" +func NewJSAtomicPublishContainsDuplicateMessageError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSAtomicPublishContainsDuplicateMessageErr] +} + // NewJSAtomicPublishDisabledError creates a new JSAtomicPublishDisabledErr error: "atomic publish is disabled" func NewJSAtomicPublishDisabledError(opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) diff --git a/server/leafnode.go b/server/leafnode.go index 71491f77a55..0049823d5cb 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -285,6 +285,11 @@ func validateLeafNode(o *Options) error { // If a remote has a websocket scheme, all need to have it. 
for _, rcfg := range o.LeafNode.Remotes { + // Validate proxy configuration + if _, err := validateLeafNodeProxyOptions(rcfg); err != nil { + return err + } + if len(rcfg.URLs) >= 2 { firstIsWS, ok := isWSURL(rcfg.URLs[0]), true for i := 1; i < len(rcfg.URLs); i++ { @@ -369,6 +374,60 @@ func validateLeafNodeAuthOptions(o *Options) error { return nil } +func validateLeafNodeProxyOptions(remote *RemoteLeafOpts) ([]string, error) { + var warnings []string + + if remote.Proxy.URL == _EMPTY_ { + return warnings, nil + } + + proxyURL, err := url.Parse(remote.Proxy.URL) + if err != nil { + return warnings, fmt.Errorf("invalid proxy URL: %v", err) + } + + if proxyURL.Scheme != "http" && proxyURL.Scheme != "https" { + return warnings, fmt.Errorf("proxy URL scheme must be http or https, got: %s", proxyURL.Scheme) + } + + if proxyURL.Host == _EMPTY_ { + return warnings, fmt.Errorf("proxy URL must specify a host") + } + + if remote.Proxy.Timeout < 0 { + return warnings, fmt.Errorf("proxy timeout must be >= 0") + } + + if (remote.Proxy.Username == _EMPTY_) != (remote.Proxy.Password == _EMPTY_) { + return warnings, fmt.Errorf("proxy username and password must both be specified or both be empty") + } + + if len(remote.URLs) > 0 { + hasWebSocketURL := false + hasNonWebSocketURL := false + + for _, remoteURL := range remote.URLs { + if remoteURL.Scheme == wsSchemePrefix || remoteURL.Scheme == wsSchemePrefixTLS { + hasWebSocketURL = true + if (remoteURL.Scheme == wsSchemePrefixTLS) && + remote.TLSConfig == nil && !remote.TLS { + return warnings, fmt.Errorf("proxy is configured but remote URL %s requires TLS and no TLS configuration is provided. When using proxy with TLS endpoints, ensure TLS is properly configured for the leafnode remote", remoteURL.String()) + } + } else { + hasNonWebSocketURL = true + } + } + + if !hasWebSocketURL { + warnings = append(warnings, "proxy configuration will be ignored: proxy settings only apply to WebSocket connections (ws:// or wss://), but all configured URLs use TCP connections (nats://)") + } else if hasNonWebSocketURL { + warnings = append(warnings, "proxy configuration will only be used for WebSocket URLs: proxy settings do not apply to TCP connections (nats://)") + } + } + + return warnings, nil +} + // Update remote LeafNode TLS configurations after a config reload. 
func (s *Server) updateRemoteLeafNodesTLSConfig(opts *Options) { max := len(opts.LeafNode.Remotes) @@ -502,6 +561,67 @@ func (s *Server) setLeafNodeNonExportedOptions() { const sharedSysAccDelay = 250 * time.Millisecond +// establishHTTPProxyTunnel establishes an HTTP CONNECT tunnel through a proxy server +func establishHTTPProxyTunnel(proxyURL, targetHost string, timeout time.Duration, username, password string) (net.Conn, error) { + proxyAddr, err := url.Parse(proxyURL) + if err != nil { + // This should not happen since proxy URL is validated during configuration parsing + return nil, fmt.Errorf("unexpected proxy URL parse error (URL was pre-validated): %v", err) + } + + // Connect to the proxy server + conn, err := natsDialTimeout("tcp", proxyAddr.Host, timeout) + if err != nil { + return nil, fmt.Errorf("failed to connect to proxy: %v", err) + } + + // Set deadline for the entire proxy handshake + if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil { + conn.Close() + return nil, fmt.Errorf("failed to set deadline: %v", err) + } + + req := &http.Request{ + Method: http.MethodConnect, + URL: &url.URL{Opaque: targetHost}, // Opaque is required for CONNECT + Host: targetHost, + Header: make(http.Header), + } + + // Add proxy authentication if provided + if username != "" && password != "" { + req.Header.Set("Proxy-Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(username+":"+password))) + } + + if err := req.Write(conn); err != nil { + conn.Close() + return nil, fmt.Errorf("failed to write CONNECT request: %v", err) + } + + resp, err := http.ReadResponse(bufio.NewReader(conn), req) + if err != nil { + conn.Close() + return nil, fmt.Errorf("failed to read proxy response: %v", err) + } + + if resp.StatusCode != http.StatusOK { + resp.Body.Close() + conn.Close() + return nil, fmt.Errorf("proxy CONNECT failed: %s", resp.Status) + } + + // Close the response body + resp.Body.Close() + + // Clear the deadline now that we've finished the proxy handshake + if err := conn.SetDeadline(time.Time{}); err != nil { + conn.Close() + return nil, fmt.Errorf("failed to clear deadline: %v", err) + } + + return conn, nil +} + func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool) { defer s.grWG.Done() @@ -541,6 +661,19 @@ func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool) const connErrFmt = "Error trying to connect as leafnode to remote server %q (attempt %v): %v" + // Capture proxy configuration once before the loop with proper locking + remote.RLock() + proxyURL := remote.Proxy.URL + proxyUsername := remote.Proxy.Username + proxyPassword := remote.Proxy.Password + proxyTimeout := remote.Proxy.Timeout + remote.RUnlock() + + // Set default proxy timeout if not specified + if proxyTimeout == 0 { + proxyTimeout = dialTimeout + } + attempts := 0 for s.isRunning() && s.remoteLeafNodeStillValid(remote) { @@ -557,7 +690,25 @@ func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool) err = ErrLeafNodeDisabled } else { s.Debugf("Trying to connect as leafnode to remote server on %q%s", rURL.Host, ipStr) - conn, err = natsDialTimeout("tcp", url, dialTimeout) + + // Check if proxy is configured first, then check if URL supports it + if proxyURL != _EMPTY_ && isWSURL(rURL) { + // Use proxy for WebSocket connections - use original hostname, resolved IP for connection + targetHost := rURL.Host + // If URL doesn't include port, add the default port for the scheme + if rURL.Port() == _EMPTY_ { + defaultPort := "80" + 
if rURL.Scheme == wsSchemePrefixTLS { + defaultPort = "443" + } + targetHost = net.JoinHostPort(rURL.Hostname(), defaultPort) + } + + conn, err = establishHTTPProxyTunnel(proxyURL, targetHost, proxyTimeout, proxyUsername, proxyPassword) + } else { + // Direct connection + conn, err = natsDialTimeout("tcp", url, dialTimeout) + } } } if err != nil { @@ -1287,6 +1438,13 @@ func (c *client) processLeafnodeInfo(info *Info) { // otherwise if there is no TLS configuration block for the remote, // the solicit side will not attempt to perform the TLS handshake. if firstINFO && info.TLSRequired { + // Check for TLS/proxy configuration mismatch + if remote.Proxy.URL != _EMPTY_ && !remote.TLS && remote.TLSConfig == nil { + c.mu.Unlock() + c.Errorf("TLS configuration mismatch: Hub requires TLS but leafnode remote is not configured for TLS. When using a proxy, ensure TLS leafnode configuration matches the Hub requirements.") + c.closeConnection(TLSHandshakeError) + return + } remote.TLS = true } if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil { diff --git a/server/leafnode_proxy_test.go b/server/leafnode_proxy_test.go new file mode 100644 index 00000000000..aab6e81211c --- /dev/null +++ b/server/leafnode_proxy_test.go @@ -0,0 +1,885 @@ +// Copyright 2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package server + +import ( + "errors" + "fmt" + "net" + "net/url" + "strings" + "testing" + "time" +) + +// Basic HTTP proxy for testing +type testHTTPProxy struct { + listener net.Listener + port int + username string + password string + started bool + closeDelay time.Duration // Delay before closing connections for robustness + connections []net.Conn // Track connections for cleanup +} + +func createTestHTTPProxy(username, password string) *testHTTPProxy { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + panic(err) + } + port := l.Addr().(*net.TCPAddr).Port + + proxy := &testHTTPProxy{ + listener: l, + port: port, + username: username, + password: password, + closeDelay: 100 * time.Millisecond, // Default delay for test robustness + connections: make([]net.Conn, 0), + } + + return proxy +} + +func (p *testHTTPProxy) setCloseDelay(delay time.Duration) { + p.closeDelay = delay +} + +func (p *testHTTPProxy) start() { + if p.started { + return + } + p.started = true + + go func() { + for { + conn, err := p.listener.Accept() + if err != nil { + return + } + p.connections = append(p.connections, conn) + go p.handleConnection(conn) + } + }() +} + +func (p *testHTTPProxy) handleConnection(conn net.Conn) { + defer func() { + if p.closeDelay > 0 { + time.Sleep(p.closeDelay) + } + conn.Close() + }() + + // Set read timeout to prevent hanging on malformed requests + conn.SetReadDeadline(time.Now().Add(10 * time.Second)) + + // Read the CONNECT request + buffer := make([]byte, 4096) + n, err := conn.Read(buffer) + if err != nil { + return + } + + request := string(buffer[:n]) + lines := strings.Split(request, "\r\n") + + if len(lines) == 0 || !strings.HasPrefix(lines[0], "CONNECT ") { + conn.Write([]byte("HTTP/1.1 400 Bad Request\r\n\r\n")) + return + } + + // Check authentication if required + if p.username != _EMPTY_ || p.password != _EMPTY_ { + authFound := false + for _, line := range lines { + if strings.HasPrefix(line, "Proxy-Authorization: Basic ") { + authFound = true + break + } + } + if !authFound { + conn.Write([]byte("HTTP/1.1 407 Proxy Authentication Required\r\n\r\n")) + return + } + } + + // Extract target host from CONNECT line + parts := strings.Fields(lines[0]) + if len(parts) < 3 { + conn.Write([]byte("HTTP/1.1 400 Bad Request\r\n\r\n")) + return + } + + targetHost := parts[1] + + // Connect to target with timeout + target, err := net.DialTimeout("tcp", targetHost, 5*time.Second) + if err != nil { + conn.Write([]byte("HTTP/1.1 502 Bad Gateway\r\n\r\n")) + return + } + defer target.Close() + + // Send success response + conn.Write([]byte("HTTP/1.1 200 Connection established\r\n\r\n")) + + // Clear read deadline for ongoing connection + conn.SetReadDeadline(time.Time{}) + + // Relay data between client and target with proper error handling + done := make(chan bool, 2) + + // Client to target + go func() { + defer func() { + done <- true + target.Close() + }() + buffer := make([]byte, 32*1024) + for { + conn.SetReadDeadline(time.Now().Add(30 * time.Second)) + n, err := conn.Read(buffer) + if err != nil { + return + } + target.SetWriteDeadline(time.Now().Add(10 * time.Second)) + _, err = target.Write(buffer[:n]) + if err != nil { + return + } + } + }() + + // Target to client + go func() { + defer func() { + done <- true + conn.Close() + }() + buffer := make([]byte, 32*1024) + for { + target.SetReadDeadline(time.Now().Add(30 * time.Second)) + n, err := target.Read(buffer) + if err != nil { + return + } + conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + _, 
err = conn.Write(buffer[:n]) + if err != nil { + return + } + } + }() + + // Wait for either direction to finish + <-done +} + +func (p *testHTTPProxy) stop() { + if p.listener != nil { + p.listener.Close() + } + // Close all tracked connections with delay for robustness + for _, conn := range p.connections { + go func(c net.Conn) { + if p.closeDelay > 0 { + time.Sleep(p.closeDelay) + } + c.Close() + }(conn) + } +} + +func (p *testHTTPProxy) url() string { + return fmt.Sprintf("http://127.0.0.1:%d", p.port) +} + +func TestLeafNodeHttpProxyConfigParsing(t *testing.T) { + // Test valid proxy configuration + conf := ` + leafnodes { + remotes = [ + { + url: "ws://127.0.0.1:7422" + proxy { + url: "http://proxy.example.com:8080" + username: "user" + password: "pass" + timeout: "10s" + } + } + ] + } + ` + + configFile := createConfFile(t, []byte(conf)) + + opts, err := ProcessConfigFile(configFile) + if err != nil { + t.Fatalf("Error parsing config: %v", err) + } + + if len(opts.LeafNode.Remotes) != 1 { + t.Fatalf("Expected 1 remote, got %d", len(opts.LeafNode.Remotes)) + } + + remote := opts.LeafNode.Remotes[0] + if remote.Proxy.URL != "http://proxy.example.com:8080" { + t.Errorf("Expected proxy URL 'http://proxy.example.com:8080', got '%s'", remote.Proxy.URL) + } + if remote.Proxy.Username != "user" { + t.Errorf("Expected proxy username 'user', got '%s'", remote.Proxy.Username) + } + if remote.Proxy.Password != "pass" { + t.Errorf("Expected proxy password 'pass', got '%s'", remote.Proxy.Password) + } + if remote.Proxy.Timeout != 10*time.Second { + t.Errorf("Expected proxy timeout 10s, got %v", remote.Proxy.Timeout) + } +} + +func TestLeafNodeHttpProxyConfigWarnings(t *testing.T) { + testCases := []struct { + name string + config string + expectWarning bool + warningMatch string + }{ + { + name: "proxy with only TCP URLs", + config: ` + leafnodes { + remotes = [ + { + url: "nats://127.0.0.1:7422" + proxy { + url: "http://proxy.example.com:8080" + } + } + ] + } + `, + expectWarning: true, + warningMatch: "proxy configuration will be ignored", + }, + { + name: "proxy with mixed TCP and WebSocket URLs", + config: ` + leafnodes { + remotes = [ + { + urls: ["nats://127.0.0.1:7422", "ws://127.0.0.1:8080"] + proxy { + url: "http://proxy.example.com:8080" + } + } + ] + } + `, + expectWarning: true, + warningMatch: "proxy configuration will only be used for WebSocket URLs", + }, + { + name: "proxy with only WebSocket URLs", + config: ` + leafnodes { + remotes = [ + { + url: "ws://127.0.0.1:7422" + proxy { + url: "http://proxy.example.com:8080" + } + } + ] + } + `, + expectWarning: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + configFile := createConfFile(t, []byte(tc.config)) + + opts, err := ProcessConfigFile(configFile) + + if tc.expectWarning { + // With ProcessConfigFile, warnings don't cause errors + // The configuration should be valid but might log warnings + if err != nil { + t.Fatalf("Expected valid configuration with warnings, but got error: %v", err) + } + if opts == nil { + t.Fatal("Expected valid options but got nil") + } + // Note: With ProcessConfigFile, warnings are filtered out and not returned as errors + // The test verifies that the configuration is valid despite having warning conditions + } else { + // No warnings expected - should parse successfully + if err != nil { + t.Fatalf("Expected no error but got: %v", err) + } + if opts == nil { + t.Fatal("Expected valid options but got nil") + } + } + }) + } +} + +func 
TestLeafNodeHttpProxyConnection(t *testing.T) { + // Create a hub server with WebSocket support using config file + hubConfig := createConfFile(t, []byte(` + listen: "127.0.0.1:-1" + websocket { + listen: "127.0.0.1:-1" + no_tls: true + } + leafnodes { + listen: "127.0.0.1:-1" + } + `)) + + hub, hubOpts := RunServerWithConfig(hubConfig) + defer hub.Shutdown() + + // Create HTTP proxy + proxy := createTestHTTPProxy(_EMPTY_, _EMPTY_) + proxy.start() + defer proxy.stop() + + // Create spoke server with proxy configuration via config file + configContent := fmt.Sprintf(` + listen: "127.0.0.1:-1" + leafnodes { + reconnect_interval: "50ms" + remotes = [ + { + url: "ws://127.0.0.1:%d" + proxy { + url: "%s" + timeout: 5s + } + } + ] + } + `, hubOpts.Websocket.Port, proxy.url()) + + configFile := createConfFile(t, []byte(configContent)) + + spoke, _ := RunServerWithConfig(configFile) + defer spoke.Shutdown() + + // Verify leafnode connections are established + checkLeafNodeConnected(t, spoke) + checkLeafNodeConnected(t, hub) +} + +func TestLeafNodeHttpProxyWithAuthentication(t *testing.T) { + // Create a hub server with WebSocket support using config file + hubConfig := createConfFile(t, []byte(` + listen: "127.0.0.1:-1" + websocket { + listen: "127.0.0.1:-1" + no_tls: true + } + leafnodes { + listen: "127.0.0.1:-1" + } + `)) + + hub, hubOpts := RunServerWithConfig(hubConfig) + defer hub.Shutdown() + + // Create HTTP proxy with authentication + proxy := createTestHTTPProxy("testuser", "testpass") + proxy.start() + defer proxy.stop() + + // Create spoke server with proxy configuration including auth via config file + configContent := fmt.Sprintf(` + listen: "127.0.0.1:-1" + leafnodes { + reconnect_interval: "50ms" + remotes = [ + { + url: "ws://127.0.0.1:%d" + proxy { + url: "%s" + username: "testuser" + password: "testpass" + timeout: 5s + } + } + ] + } + `, hubOpts.Websocket.Port, proxy.url()) + + configFile := createConfFile(t, []byte(configContent)) + + spoke, _ := RunServerWithConfig(configFile) + defer spoke.Shutdown() + + // Verify leafnode connections are established + checkLeafNodeConnected(t, spoke) + checkLeafNodeConnected(t, hub) +} + +func TestLeafNodeHttpProxyTLSMismatchDetection(t *testing.T) { + // This test simulates the TLS mismatch scenario described in the feedback: + // - Leafnode configured with proxy but no TLS + // - Hub requires TLS + // - Connection should fail with appropriate error message + + // Create hub server with TLS required using config file + hubConfig := createConfFile(t, []byte(` + listen: "127.0.0.1:-1" + leafnodes { + listen: "127.0.0.1:-1" + tls: { + cert_file: "../test/configs/certs/server-cert.pem" + key_file: "../test/configs/certs/server-key.pem" + } + } + `)) + + hub, hubOpts := RunServerWithConfig(hubConfig) + defer hub.Shutdown() + + // Create HTTP proxy + proxy := createTestHTTPProxy(_EMPTY_, _EMPTY_) + proxy.start() + defer proxy.stop() + + // Create spoke server with proxy but no TLS configuration (intentional mismatch) + spokeConfigContent := fmt.Sprintf(` + listen: "127.0.0.1:-1" + leafnodes { + reconnect_interval: "50ms" + remotes = [ + { + url: "ws://127.0.0.1:%d" + proxy { + url: "%s" + timeout: 5s + } + # Intentionally no TLS configuration to create mismatch + } + ] + } + `, hubOpts.LeafNode.Port, proxy.url()) + + spokeConfig := createConfFile(t, []byte(spokeConfigContent)) + spoke, _ := RunServerWithConfig(spokeConfig) + defer spoke.Shutdown() + + // Wait and verify that connection was NOT established due to TLS mismatch + // First attempt 
happens during RunServerWithConfig(), then retries every 50ms + time.Sleep(250 * time.Millisecond) + + if spoke.NumLeafNodes() != 0 { + t.Errorf("Expected 0 leafnode connections due to TLS mismatch, got %d", spoke.NumLeafNodes()) + } +} + +func TestLeafNodeHttpProxyTunnelBasic(t *testing.T) { + // Create HTTP proxy with longer delay for robustness + proxy := createTestHTTPProxy(_EMPTY_, _EMPTY_) + proxy.setCloseDelay(200 * time.Millisecond) + proxy.start() + defer proxy.stop() + + // Create a simple TCP server to connect to through proxy + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("Failed to create test server: %v", err) + } + defer listener.Close() + + targetPort := listener.Addr().(*net.TCPAddr).Port + targetHost := fmt.Sprintf("127.0.0.1:%d", targetPort) + + errCh := make(chan error, 1) + + // Accept one connection with proper timeout handling + go func() { + conn, err := listener.Accept() + if err != nil { + errCh <- fmt.Errorf("unable to accept: %v", err) + return + } + defer conn.Close() + + // Set read deadline to prevent hanging forever + conn.SetReadDeadline(time.Now().Add(5 * time.Second)) + + // Read the incoming data first + buffer := make([]byte, 1024) + n, err := conn.Read(buffer) + if err != nil { + errCh <- fmt.Errorf("server failed to read: %v", err) + return + } + + receivedMsg := string(buffer[:n]) + if receivedMsg != "Hello" { + errCh <- fmt.Errorf("server expected 'Hello', got '%s'", receivedMsg) + return + } + + // Send response + conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) + _, err = conn.Write([]byte("Hello from target server")) + if err != nil { + errCh <- fmt.Errorf("server failed to write: %v", err) + return + } + + // Wait a bit to ensure the client has time to read before closing + time.Sleep(50 * time.Millisecond) + errCh <- nil + }() + + // Test establishing proxy tunnel with timeout + conn, err := establishHTTPProxyTunnel(proxy.url(), targetHost, 10*time.Second, _EMPTY_, _EMPTY_) + if err != nil { + t.Fatalf("Failed to establish proxy tunnel: %v", err) + } + defer conn.Close() + + // Test that we can communicate through the tunnel with deadlines + conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) + _, err = conn.Write([]byte("Hello")) + if err != nil { + t.Fatalf("Failed to write to proxy tunnel: %v", err) + } + + conn.SetReadDeadline(time.Now().Add(5 * time.Second)) + buffer := make([]byte, 1024) + n, err := conn.Read(buffer) + if err != nil { + t.Fatalf("Failed to read from proxy tunnel: %v", err) + } + + response := string(buffer[:n]) + if response != "Hello from target server" { + t.Errorf("Unexpected response: '%s', expected 'Hello from target server'", response) + } + + select { + case err := <-errCh: + if err != nil { + t.Fatalf("%v", err) + } + case <-time.After(3 * time.Second): + t.Fatal("Server goroutine didn't complete in time, but test data was exchanged successfully") + } +} + +func TestLeafNodeHttpProxyTunnelWithAuth(t *testing.T) { + // Create HTTP proxy with authentication and delay for robustness + proxy := createTestHTTPProxy("testuser", "testpass") + proxy.setCloseDelay(200 * time.Millisecond) + proxy.start() + defer proxy.stop() + + // Create a simple TCP server to connect to through proxy + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("Failed to create test server: %v", err) + } + defer listener.Close() + + targetPort := listener.Addr().(*net.TCPAddr).Port + targetHost := fmt.Sprintf("127.0.0.1:%d", targetPort) + + errCh := make(chan error, 1) + + // 
Accept one connection with proper timeout handling + go func() { + conn, err := listener.Accept() + if err != nil { + errCh <- fmt.Errorf("unable to accept: %v", err) + return + } + defer conn.Close() + + // Set read deadline to prevent hanging + conn.SetReadDeadline(time.Now().Add(5 * time.Second)) + + // Read the incoming data first + buffer := make([]byte, 1024) + _, err = conn.Read(buffer) + if err != nil { + errCh <- fmt.Errorf("unable to read: %v", err) + return + } + + // Send response + conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) + if _, err := conn.Write([]byte("Hello from authenticated server")); err != nil { + errCh <- fmt.Errorf("unable to write: %v", err) + return + } + + errCh <- nil + }() + + // Test establishing proxy tunnel with authentication and timeout + conn, err := establishHTTPProxyTunnel(proxy.url(), targetHost, 10*time.Second, "testuser", "testpass") + if err != nil { + t.Fatalf("Failed to establish proxy tunnel with auth: %v", err) + } + defer conn.Close() + + // Test that we can communicate through the tunnel with deadlines + conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) + _, err = conn.Write([]byte("Hello")) + if err != nil { + t.Fatalf("Failed to write to proxy tunnel: %v", err) + } + + conn.SetReadDeadline(time.Now().Add(5 * time.Second)) + buffer := make([]byte, 1024) + n, err := conn.Read(buffer) + if err != nil { + t.Fatalf("Failed to read from proxy tunnel: %v", err) + } + + response := string(buffer[:n]) + if response != "Hello from authenticated server" { + t.Errorf("Unexpected response: %s", response) + } + + select { + case err := <-errCh: + if err != nil { + t.Fatalf("%v", err) + } + case <-time.After(2 * time.Second): + t.Fatal("Server goroutine didn't complete in time, but test data was exchanged successfully") + } +} + +func TestLeafNodeHttpProxyTunnelFailsWithoutAuth(t *testing.T) { + // Create HTTP proxy with authentication required and delay for robustness + proxy := createTestHTTPProxy("testuser", "testpass") + proxy.setCloseDelay(200 * time.Millisecond) + proxy.start() + defer proxy.stop() + + // Try to establish tunnel without providing credentials (should fail quickly) + _, err := establishHTTPProxyTunnel(proxy.url(), "127.0.0.1:80", 10*time.Second, _EMPTY_, _EMPTY_) + if err == nil { + t.Fatal("Expected error when connecting without authentication") + } + + if !strings.Contains(err.Error(), "proxy CONNECT failed") { + t.Errorf("Expected proxy authentication error, got: %v", err) + } + + // Verify the error contains the expected HTTP response code + if !strings.Contains(err.Error(), "407") { + t.Errorf("Expected HTTP 407 error in response, got: %v", err) + } +} + +// TestLeafNodeProxyValidationProgrammatic tests proxy validation when configuring server programmatically +func TestLeafNodeHttpProxyValidationProgrammatic(t *testing.T) { + tests := []struct { + // name is the name of the test. + name string + + // setupOptions creates the Options configuration for the test. + setupOptions func() *Options + + // err is the expected error. nil means no error expected. 
+ err error + }{ + { + name: "invalid proxy scheme", + setupOptions: func() *Options { + opts := &Options{} + opts.LeafNode.Remotes = []*RemoteLeafOpts{ + { + URLs: []*url.URL{{Scheme: wsSchemePrefix, Host: "127.0.0.1:7422"}}, + }, + } + opts.LeafNode.Remotes[0].Proxy.URL = "ftp://proxy.example.com:21" + return opts + }, + err: errors.New("proxy URL scheme must be http or https"), + }, + { + name: "empty proxy URL - no validation performed", + setupOptions: func() *Options { + opts := &Options{} + opts.LeafNode.Remotes = []*RemoteLeafOpts{ + { + URLs: []*url.URL{{Scheme: wsSchemePrefix, Host: "127.0.0.1:7422"}}, + }, + } + opts.LeafNode.Remotes[0].Proxy.URL = _EMPTY_ + return opts + }, + err: nil, // No error expected for empty URL + }, + { + name: "invalid proxy URL parse failure", + setupOptions: func() *Options { + opts := &Options{} + opts.LeafNode.Remotes = []*RemoteLeafOpts{ + { + URLs: []*url.URL{{Scheme: wsSchemePrefix, Host: "127.0.0.1:7422"}}, + }, + } + opts.LeafNode.Remotes[0].Proxy.URL = "ht!tp://invalid-url-with-bad-characters" + return opts + }, + err: errors.New("invalid proxy URL"), + }, + { + name: "missing proxy host", + setupOptions: func() *Options { + opts := &Options{} + opts.LeafNode.Remotes = []*RemoteLeafOpts{ + { + URLs: []*url.URL{{Scheme: wsSchemePrefix, Host: "127.0.0.1:7422"}}, + }, + } + opts.LeafNode.Remotes[0].Proxy.URL = "http://" + return opts + }, + err: errors.New("proxy URL must specify a host"), + }, + { + name: "username without password", + setupOptions: func() *Options { + opts := &Options{} + opts.LeafNode.Remotes = []*RemoteLeafOpts{ + { + URLs: []*url.URL{{Scheme: wsSchemePrefix, Host: "127.0.0.1:7422"}}, + }, + } + opts.LeafNode.Remotes[0].Proxy.URL = "http://proxy.example.com:8080" + opts.LeafNode.Remotes[0].Proxy.Username = "user" + return opts + }, + err: errors.New("proxy username and password must both be specified"), + }, + { + name: "password without username", + setupOptions: func() *Options { + opts := &Options{} + opts.LeafNode.Remotes = []*RemoteLeafOpts{ + { + URLs: []*url.URL{{Scheme: wsSchemePrefix, Host: "127.0.0.1:7422"}}, + }, + } + opts.LeafNode.Remotes[0].Proxy.URL = "http://proxy.example.com:8080" + opts.LeafNode.Remotes[0].Proxy.Password = "pass" + return opts + }, + err: errors.New("proxy username and password must both be specified"), + }, + { + name: "negative timeout value", + setupOptions: func() *Options { + opts := &Options{} + opts.LeafNode.Remotes = []*RemoteLeafOpts{ + { + URLs: []*url.URL{{Scheme: wsSchemePrefix, Host: "127.0.0.1:7422"}}, + }, + } + opts.LeafNode.Remotes[0].Proxy.URL = "http://proxy.example.com:8080" + opts.LeafNode.Remotes[0].Proxy.Timeout = -5 * time.Second + return opts + }, + err: errors.New("proxy timeout must be >= 0"), + }, + { + name: "zero timeout value - valid", + setupOptions: func() *Options { + opts := &Options{} + opts.LeafNode.Remotes = []*RemoteLeafOpts{ + { + URLs: []*url.URL{{Scheme: wsSchemePrefix, Host: "127.0.0.1:7422"}}, + }, + } + opts.LeafNode.Remotes[0].Proxy.URL = "http://proxy.example.com:8080" + opts.LeafNode.Remotes[0].Proxy.Timeout = 0 + return opts + }, + err: nil, // No error expected for zero timeout + }, + { + name: "valid proxy configuration with authentication", + setupOptions: func() *Options { + opts := &Options{} + opts.LeafNode.Remotes = []*RemoteLeafOpts{ + { + URLs: []*url.URL{{Scheme: wsSchemePrefix, Host: "127.0.0.1:7422"}}, + }, + } + opts.LeafNode.Remotes[0].Proxy.URL = "http://proxy.example.com:8080" + opts.LeafNode.Remotes[0].Proxy.Username = 
"user" + opts.LeafNode.Remotes[0].Proxy.Password = "pass" + opts.LeafNode.Remotes[0].Proxy.Timeout = 10 * time.Second + return opts + }, + err: nil, // No error expected + }, + { + name: "valid proxy configuration without authentication", + setupOptions: func() *Options { + opts := &Options{} + opts.LeafNode.Remotes = []*RemoteLeafOpts{ + { + URLs: []*url.URL{{Scheme: wsSchemePrefix, Host: "127.0.0.1:7422"}}, + }, + } + opts.LeafNode.Remotes[0].Proxy.URL = "https://proxy.example.com:3128" + opts.LeafNode.Remotes[0].Proxy.Timeout = 30 * time.Second + return opts + }, + err: nil, // No error expected + }, + } + + checkErr := func(t *testing.T, err, expectedErr error) { + t.Helper() + switch { + case err == nil && expectedErr == nil: + // OK + case err != nil && expectedErr == nil: + t.Errorf("Unexpected error after validating options: %s", err) + case err == nil && expectedErr != nil: + t.Errorf("Expected %q error after validating invalid options but got nothing", expectedErr) + case err != nil && expectedErr != nil: + if !strings.Contains(err.Error(), expectedErr.Error()) { + t.Errorf("Expected error containing %q, got: %q", expectedErr.Error(), err.Error()) + } + } + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + opts := test.setupOptions() + err := validateLeafNode(opts) + checkErr(t, err, test.err) + }) + } +} diff --git a/server/opts.go b/server/opts.go index e041d432231..15307bbf7f7 100644 --- a/server/opts.go +++ b/server/opts.go @@ -241,6 +241,18 @@ type RemoteLeafOpts struct { NoMasking bool `json:"-"` } + // HTTP Proxy configuration for WebSocket connections + Proxy struct { + // URL of the HTTP proxy server (e.g., "http://proxy.example.com:8080") + URL string `json:"-"` + // Username for proxy authentication + Username string `json:"-"` + // Password for proxy authentication + Password string `json:"-"` + // Timeout for proxy connection + Timeout time.Duration `json:"-"` + } + tlsConfigOpts *TLSConfigOpts // If we are clustered and our local account has JetStream, if apps are accessing @@ -2990,6 +3002,48 @@ func parseRemoteLeafNodes(v any, errors *[]error, warnings *[]error) ([]*RemoteL remote.FirstInfoTimeout = parseDuration(k, tk, v, errors, warnings) case "disabled": remote.Disabled = v.(bool) + case "proxy": + proxyMap, ok := v.(map[string]any) + if !ok { + *errors = append(*errors, &configErr{tk, fmt.Sprintf("Expected proxy to be a map, got %T", v)}) + continue + } + // Capture the token for the "proxy" field itself, before the map iteration + proxyToken := tk + for pk, pv := range proxyMap { + tk, pv = unwrapValue(pv, <) + switch strings.ToLower(pk) { + case "url": + remote.Proxy.URL = pv.(string) + case "username": + remote.Proxy.Username = pv.(string) + case "password": + remote.Proxy.Password = pv.(string) + case "timeout": + remote.Proxy.Timeout = parseDuration("proxy timeout", tk, pv, errors, warnings) + default: + if !tk.IsUsedVariable() { + err := &unknownConfigFieldErr{ + field: pk, + configErr: configErr{ + token: tk, + }, + } + *errors = append(*errors, err) + continue + } + } + } + // Use the saved proxy token for validation errors, not the last field token + if warns, err := validateLeafNodeProxyOptions(remote); err != nil { + *errors = append(*errors, &configErr{proxyToken, err.Error()}) + continue + } else { + // Add any warnings about proxy configuration + for _, warn := range warns { + *warnings = append(*warnings, &configErr{proxyToken, warn}) + } + } default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ diff --git 
a/server/opts_test.go b/server/opts_test.go index 4f47cbaaf0c..2f3077fb826 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -3921,3 +3921,76 @@ func TestOptionsProxyRequired(t *testing.T) { require_NoError(t, err) checkUsersAndNkeys(o.LeafNode.Users, false, nil) } + +// TestNewServerFromConfigFunctionality tests the NewServerFromConfig() function +// to ensure it properly processes config files and creates servers correctly. +func TestNewServerFromConfigFunctionality(t *testing.T) { + // Test 1: Error handling - invalid configuration + confFileName := createConfFile(t, []byte(` + max_payload = 3000000000 + `)) + + opts1 := &Options{ + ConfigFile: confFileName, + } + + // Should fail due to oversized max_payload (same as TestLargeMaxPayload) + if _, err := NewServerFromConfig(opts1); err == nil { + t.Fatalf("Expected an error from too large of a max_payload entry") + } + + // Test 2: Config validation error - max_pending > max_payload + confFileName = createConfFile(t, []byte(` + max_payload = 100000 + max_pending = 50000 + `)) + + opts2 := &Options{ + ConfigFile: confFileName, + } + + // This should trigger validation error (same as TestLargeMaxPayload) + server, err := NewServerFromConfig(opts2) + if err == nil || !strings.Contains(err.Error(), "cannot be higher") { + if server != nil { + server.Shutdown() + } + t.Fatalf("Expected validation error, got: %v", err) + } +} + +// TestNewServerFromConfigVsLoadConfig tests that NewServerFromConfig produces +// equivalent results to the traditional LoadConfig approach. +func TestNewServerFromConfigVsLoadConfig(t *testing.T) { + confFileName := createConfFile(t, []byte(` + port = 4224 + max_payload = 4194304 + max_connections = 200 + ping_interval = "30s" + `)) + + // Method 1: Using LoadConfig (traditional approach) + opts1 := LoadConfig(confFileName) + + // Method 2: Using NewServerFromConfig (new approach for embedded servers) + opts2 := &Options{ConfigFile: confFileName} + + // Test 1: Both should be able to create servers successfully + server1, err := NewServer(opts1) + if err != nil { + t.Fatalf("Failed to create server with LoadConfig options: %v", err) + } + server1.Shutdown() + + server2, err := NewServerFromConfig(opts2) + if err != nil { + t.Fatalf("Failed to create server with NewServerFromConfig: %v", err) + } + server2.Shutdown() + + // Test 2: Both methods should produce equivalent results - normalize test environment fields + // LoadConfig sets these fields for testing, so we need to match them for fair comparison + opts2.NoSigs, opts2.NoLog = true, opts2.LogFile == _EMPTY_ + + checkOptionsEqual(t, opts1, opts2) +} diff --git a/server/server.go b/server/server.go index 2971fbe8eed..5c88609e2e4 100644 --- a/server/server.go +++ b/server/server.go @@ -697,6 +697,15 @@ func New(opts *Options) *Server { return s } +func NewServerFromConfig(opts *Options) (*Server, error) { + if opts.ConfigFile != _EMPTY_ && opts.configDigest == "" { + if err := opts.ProcessConfigFile(opts.ConfigFile); err != nil { + return nil, err + } + } + return NewServer(opts) +} + // NewServer will setup a new server struct after parsing the options. // Could return an error if options can not be validated. // The provided Options type should not be re-used afterwards. 
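For reviewers, a minimal sketch of how an embedded-server caller might use the new NewServerFromConfig helper. The import path is the module's real path; the config file name, readiness timeout, and error messages are placeholder choices and not part of this change.

	package main

	import (
		"log"
		"time"

		"github.com/nats-io/nats-server/v2/server"
	)

	func main() {
		// NewServerFromConfig processes opts.ConfigFile (when it has not already
		// been processed) before delegating to NewServer, so embedded callers no
		// longer have to call LoadConfig/ProcessConfigFile themselves.
		opts := &server.Options{ConfigFile: "server.conf"} // placeholder path
		s, err := server.NewServerFromConfig(opts)
		if err != nil {
			log.Fatal(err)
		}
		go s.Start()
		if !s.ReadyForConnections(5 * time.Second) {
			log.Fatal("embedded nats-server did not become ready")
		}
		defer s.Shutdown()
		// ... application code using the embedded server ...
	}
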
diff --git a/server/stream.go b/server/stream.go index 1bf64daba42..a3588dd844d 100644 --- a/server/stream.go +++ b/server/stream.go @@ -6461,7 +6461,8 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr rollback := func(seq uint64) { if isClustered { - // TODO(mvv): reset in-memory expected header maps + // Only need to move the clustered sequence back if the batch fails to commit. + // Other changes were staged but not applied, so this is the only thing we need to do. mset.clseq -= seq - 1 } mset.clMu.Unlock() @@ -6505,9 +6506,6 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr } // Reject unsupported headers. - if msgId := getMsgId(bhdr); msgId != _EMPTY_ { - return errorOnUnsupported(seq, JSMsgId) - } if getExpectedLastMsgId(hdr) != _EMPTY_ { return errorOnUnsupported(seq, JSExpectedLastMsgId) } @@ -6567,12 +6565,9 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr // Do a single multi proposal. This ensures we get to push all entries to the proposal queue in-order // and not interleaved with other proposals. diff.commit(mset) - if err := node.ProposeMulti(entries); err == nil { - mset.trackReplicationTraffic(node, sz, r) - } else { - // TODO(mvv): reset in-memory expected header maps - mset.clseq -= batchSeq - } + _ = node.ProposeMulti(entries) + // The proposal can fail, but we always account for trying. + mset.trackReplicationTraffic(node, sz, r) // Check to see if we are being overrun. // TODO(dlc) - Make this a limit where we drop messages to protect ourselves, but allow to be configured.
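To make the leafnode proxy feature concrete, here is a hedged sketch of configuring a spoke programmatically with the new Proxy options, the in-code equivalent of the proxy {} block the tests parse. The hub address, proxy host, and credentials are placeholders.

	package main

	import (
		"log"
		"net/url"
		"time"

		"github.com/nats-io/nats-server/v2/server"
	)

	func main() {
		opts := &server.Options{}
		// One remote that reaches the hub's WebSocket listener through an
		// HTTP CONNECT proxy. All endpoint values below are placeholders.
		remote := &server.RemoteLeafOpts{
			URLs: []*url.URL{{Scheme: "ws", Host: "hub.example.com:443"}},
		}
		remote.Proxy.URL = "http://proxy.internal:3128"
		remote.Proxy.Username = "proxyuser"
		remote.Proxy.Password = "proxypass"
		remote.Proxy.Timeout = 10 * time.Second
		// If the hub requires TLS, the remote must also carry TLS settings;
		// otherwise the new check in processLeafnodeInfo closes the connection
		// with a TLSHandshakeError.
		opts.LeafNode.Remotes = []*server.RemoteLeafOpts{remote}

		s, err := server.NewServer(opts)
		if err != nil {
			log.Fatal(err)
		}
		go s.Start()
		defer s.Shutdown()
	}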