Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
9ed66a6
Tweak cache expiry in `firstMatching` or `firstMatchingMulti`
neilalexander Oct 14, 2025
55bb241
Improved warning for Observer mode
roeschter Oct 14, 2025
78855d6
(2.12) [FIXED] Atomic batch: check correct header for unsupported error
MauriceVanVeen Oct 15, 2025
44f4057
[FIXED] Message Tracing: Hop header set properly per gateway
kozlovic Oct 18, 2025
aaf5af7
[IMPROVED] Interest desync after consumer create/update
MauriceVanVeen Oct 17, 2025
288652c
[FIXED] LeafNode proxy validation
kozlovic Oct 18, 2025
f30d860
(2.12) [FIXED] Leaf node token auth
MauriceVanVeen Oct 21, 2025
c9d0654
[IMPROVED] Filestore MaxBytes/Msgs update performance
MauriceVanVeen Oct 21, 2025
0a03552
(2.12) [FIXED] DirectGet batch deadlock with parallel write
MauriceVanVeen Oct 22, 2025
40e3cb5
[FIXED] Header search without alloc & bug fixes
MauriceVanVeen Oct 23, 2025
f1364b9
[FIXED] Consumer send 404 No Messages on EOS
MauriceVanVeen Oct 23, 2025
df48a14
(2.14) [FIXED] Consumer send 404 No Messages on EOS after delivering …
MauriceVanVeen Oct 23, 2025
5d8652b
Update Go dependencies
neilalexander Oct 27, 2025
d9a249c
Update GHA dependencies
neilalexander Oct 27, 2025
e017635
add expvarz
alexbozhenko Oct 24, 2025
9c39cba
[FIXED] NRG: Always only report leader after noop-entry
MauriceVanVeen Oct 22, 2025
0d53d5b
[FIXED] Gateway RS+/- blocks on account fetch
MauriceVanVeen Oct 20, 2025
d74cc25
Add `meta_compact` option to control JetStream meta group compaction/…
neilalexander Oct 28, 2025
c14e666
[FIXED] Default to allowing binary stream snapshots
MauriceVanVeen Oct 28, 2025
f07c0a7
[TEST] Add assert.Unreachable when using legacy JSON stream snapshot
MauriceVanVeen Oct 29, 2025
2b62e92
Don't snapshot on monitor goroutine or Raft node quit signal unless s…
neilalexander Oct 30, 2025
cbfc330
Parallelisation when enabling JetStream
neilalexander Oct 27, 2025
98eced2
Add `BenchmarkJetStreamParallelStartup`
neilalexander Oct 30, 2025
b786da5
[FIXED] Inconsistent index race condition
MauriceVanVeen Oct 30, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ jobs:

- name: Install cosign
# Use commit hash here to avoid a re-tagging attack, as this is a third-party action
# Commit d7543c93d881b35a8faa02e8e3605f69b7a1ce62 = tag v3.10.0
uses: sigstore/cosign-installer@d7543c93d881b35a8faa02e8e3605f69b7a1ce62
# Commit faadad0cce49287aee09b3a48701e75088a2c6ad = tag v4.0.0
uses: sigstore/cosign-installer@faadad0cce49287aee09b3a48701e75088a2c6ad

- name: Install syft
# Use commit hash here to avoid a re-tagging attack, as this is a third-party action
# Commit f8bdd1d8ac5e901a77a92f111440fdb1b593736b = tag v0.20.6
uses: anchore/sbom-action/download-syft@f8bdd1d8ac5e901a77a92f111440fdb1b593736b
# Commit 8e94d75ddd33f69f691467e42275782e4bfefe84 = tag v0.20.9
uses: anchore/sbom-action/download-syft@8e94d75ddd33f69f691467e42275782e4bfefe84
with:
syft-version: "v1.27.1"

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ toolchain go1.24.9
require (
github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op
github.com/google/go-tpm v0.9.6
github.com/klauspost/compress v1.18.0
github.com/klauspost/compress v1.18.1
github.com/minio/highwayhash v1.0.3
github.com/nats-io/jwt/v2 v2.8.0
github.com/nats-io/nats.go v1.46.1
github.com/nats-io/nats.go v1.47.0
github.com/nats-io/nkeys v0.4.11
github.com/nats-io/nuid v1.0.1
go.uber.org/automaxprocs v1.6.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-tpm v0.9.6 h1:Ku42PT4LmjDu1H5C5ISWLlpI1mj+Zq7sPGKoRw2XROA=
github.com/google/go-tpm v0.9.6/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co=
github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0=
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g=
github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats.go v1.46.1 h1:bqQ2ZcxVd2lpYI97xYASeRTY3I5boe/IVmuUDPitHfo=
github.com/nats-io/nats.go v1.46.1/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nats.go v1.47.0 h1:YQdADw6J/UfGUd2Oy6tn4Hq6YHxCaJrVKayxxFqYrgM=
github.com/nats-io/nats.go v1.47.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand Down
4 changes: 2 additions & 2 deletions server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -3794,7 +3794,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
// If JetStream is enabled for this server we will call into configJetStream for the account
// regardless of enabled or disabled. It handles both cases.
if jsEnabled {
if err := s.configJetStream(a); err != nil {
if err := s.configJetStream(a, nil); err != nil {
s.Errorf("Error configuring jetstream for account [%s]: %v", tl, err.Error())
a.mu.Lock()
// Absent reload of js server cfg, this is going to be broken until js is disabled
Expand Down Expand Up @@ -4371,7 +4371,7 @@ func (dr *DirAccResolver) Start(s *Server) error {
s.Warnf("DirResolver - Error checking for JetStream support for account %q: %v", pubKey, err)
}
} else if jsa == nil {
if err = s.configJetStream(acc); err != nil {
if err = s.configJetStream(acc, nil); err != nil {
s.Errorf("DirResolver - Error configuring JetStream for account %q: %v", pubKey, err)
}
}
Expand Down
3 changes: 2 additions & 1 deletion server/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -1123,7 +1123,8 @@ func (s *Server) processClientOrLeafAuthentication(c *client, opts *Options) (au
return ok
}

if c.kind == CLIENT {
// Check for the use of simple auth.
if c.kind == CLIENT || c.kind == LEAF {
if proxyRequired = opts.ProxyRequired; proxyRequired && !trustedProxy {
return setProxyAuthError(ErrAuthProxyRequired)
}
Expand Down
63 changes: 46 additions & 17 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4345,7 +4345,7 @@ func (c *client) setupResponseServiceImport(acc *Account, si *serviceImport, tra

// Will remove a header if present.
func removeHeaderIfPresent(hdr []byte, key string) []byte {
start := bytes.Index(hdr, []byte(key+":"))
start := getHeaderKeyIndex(key, hdr)
// key can't be first and we want to check that it is preceded by a '\n'
if start < 1 || hdr[start-1] != '\n' {
return hdr
Expand Down Expand Up @@ -4463,22 +4463,13 @@ func sliceHeader(key string, hdr []byte) []byte {
if len(hdr) == 0 {
return nil
}
index := bytes.Index(hdr, stringToBytes(key+":"))
hdrLen := len(hdr)
// Check that we have enough characters, this will handle the -1 case of the key not
// being found and will also handle not having enough characters for trailing CRLF.
if index < 2 {
return nil
}
// There should be a terminating CRLF.
if index >= hdrLen-1 || hdr[index-1] != '\n' || hdr[index-2] != '\r' {
index := getHeaderKeyIndex(key, hdr)
if index == -1 {
return nil
}
// The key should be immediately followed by a : separator.
// Skip over the key and the : separator.
index += len(key) + 1
if index >= hdrLen || hdr[index-1] != ':' {
return nil
}
hdrLen := len(hdr)
// Skip over whitespace before the value.
for index < hdrLen && hdr[index] == ' ' {
index++
Expand All @@ -4494,11 +4485,49 @@ func sliceHeader(key string, hdr []byte) []byte {
return hdr[start:index:index]
}

// getHeaderKeyIndex returns an index into the header slice for the given key.
// Returns -1 if not found.
func getHeaderKeyIndex(key string, hdr []byte) int {
if len(hdr) == 0 {
return -1
}
bkey := stringToBytes(key)
keyLen, hdrLen := len(key), len(hdr)
var offset int
for {
index := bytes.Index(hdr[offset:], bkey)
// Check that we have enough characters, this will handle the -1 case of the key not
// being found and will also handle not having enough characters for trailing CRLF.
if index < 2 {
return -1
}
index += offset
// There should be a terminating CRLF.
if index >= hdrLen-1 || hdr[index-1] != '\n' || hdr[index-2] != '\r' {
offset = index + keyLen
continue
}
// The key should be immediately followed by a : separator.
if index+keyLen >= hdrLen {
return -1
}
if hdr[index+keyLen] != ':' {
offset = index + keyLen
continue
}
return index
}
}

func setHeader(key, val string, hdr []byte) []byte {
prefix := []byte(key + ": ")
start := bytes.Index(hdr, prefix)
start := getHeaderKeyIndex(key, hdr)
if start >= 0 {
valStart := start + len(prefix)
valStart := start + len(key) + 1
// Preserve single whitespace if used.
hdrLen := len(hdr)
if valStart < hdrLen && hdr[valStart] == ' ' {
valStart++
}
valEnd := bytes.Index(hdr[valStart:], []byte("\r"))
if valEnd < 0 {
return hdr // malformed headers
Expand Down
101 changes: 100 additions & 1 deletion server/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3195,7 +3195,7 @@ func TestSliceHeader(t *testing.T) {
require_True(t, bytes.Equal(sliced, copied))
}

func TestSliceHeaderOrdering(t *testing.T) {
func TestSliceHeaderOrderingPrefix(t *testing.T) {
hdr := []byte("NATS/1.0\r\n\r\n")

// These headers share the same prefix, the longer subject
Expand All @@ -3215,6 +3215,105 @@ func TestSliceHeaderOrdering(t *testing.T) {
require_True(t, bytes.Equal(sliced, copied))
}

func TestSliceHeaderOrderingSuffix(t *testing.T) {
hdr := []byte("NATS/1.0\r\n\r\n")

// These headers share the same suffix, the longer subject
// must not invalidate the existence of the shorter one.
hdr = genHeader(hdr, "Previous-Nats-Msg-Id", "user")
hdr = genHeader(hdr, "Nats-Msg-Id", "control")

sliced := sliceHeader("Nats-Msg-Id", hdr)
copied := getHeader("Nats-Msg-Id", hdr)

require_NotNil(t, sliced)
require_NotNil(t, copied)
require_True(t, bytes.Equal(sliced, copied))
require_Equal(t, string(copied), "control")
}

func TestRemoveHeaderIfPresentOrderingPrefix(t *testing.T) {
hdr := []byte("NATS/1.0\r\n\r\n")

// These headers share the same prefix, the longer subject
// must not invalidate the existence of the shorter one.
hdr = genHeader(hdr, JSExpectedLastSubjSeqSubj, "foo")
hdr = genHeader(hdr, JSExpectedLastSubjSeq, "24")

hdr = removeHeaderIfPresent(hdr, JSExpectedLastSubjSeq)
ehdr := genHeader(nil, JSExpectedLastSubjSeqSubj, "foo")
require_True(t, bytes.Equal(hdr, ehdr))
}

func TestRemoveHeaderIfPresentOrderingSuffix(t *testing.T) {
hdr := []byte("NATS/1.0\r\n\r\n")

// These headers share the same suffix, the longer subject
// must not invalidate the existence of the shorter one.
hdr = genHeader(hdr, "Previous-Nats-Msg-Id", "user")
hdr = genHeader(hdr, "Nats-Msg-Id", "control")

hdr = removeHeaderIfPresent(hdr, "Nats-Msg-Id")
ehdr := genHeader(nil, "Previous-Nats-Msg-Id", "user")
require_True(t, bytes.Equal(hdr, ehdr))
}

func TestSetHeaderOrderingPrefix(t *testing.T) {
for _, space := range []bool{true, false} {
title := "Normal"
if !space {
title = "Trimmed"
}
t.Run(title, func(t *testing.T) {
hdr := []byte("NATS/1.0\r\n\r\n")

// These headers share the same prefix, the longer subject
// must not invalidate the existence of the shorter one.
hdr = genHeader(hdr, JSExpectedLastSubjSeqSubj, "foo")
hdr = genHeader(hdr, JSExpectedLastSubjSeq, "24")
if !space {
hdr = bytes.ReplaceAll(hdr, []byte(" "), nil)
}

hdr = setHeader(JSExpectedLastSubjSeq, "12", hdr)
ehdr := genHeader(nil, JSExpectedLastSubjSeqSubj, "foo")
ehdr = genHeader(ehdr, JSExpectedLastSubjSeq, "12")
if !space {
ehdr = bytes.ReplaceAll(ehdr, []byte(" "), nil)
}
require_True(t, bytes.Equal(hdr, ehdr))
})
}
}

func TestSetHeaderOrderingSuffix(t *testing.T) {
for _, space := range []bool{true, false} {
title := "Normal"
if !space {
title = "Trimmed"
}
t.Run(title, func(t *testing.T) {
hdr := []byte("NATS/1.0\r\n\r\n")

// These headers share the same suffix, the longer subject
// must not invalidate the existence of the shorter one.
hdr = genHeader(hdr, "Previous-Nats-Msg-Id", "user")
hdr = genHeader(hdr, "Nats-Msg-Id", "control")
if !space {
hdr = bytes.ReplaceAll(hdr, []byte(" "), nil)
}

hdr = setHeader("Nats-Msg-Id", "other", hdr)
ehdr := genHeader(nil, "Previous-Nats-Msg-Id", "user")
ehdr = genHeader(ehdr, "Nats-Msg-Id", "other")
if !space {
ehdr = bytes.ReplaceAll(ehdr, []byte(" "), nil)
}
require_True(t, bytes.Equal(hdr, ehdr))
})
}
}

func TestInProcessAllowedConnectionType(t *testing.T) {
tmpl := `
listen: "127.0.0.1:-1"
Expand Down
22 changes: 18 additions & 4 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4519,7 +4519,8 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) {
var pre *waitingRequest
for wr := wq.head; wr != nil; {
// Check expiration.
if (eos && wr.noWait && wr.d > 0) || (!wr.expires.IsZero() && now.After(wr.expires)) {
expires := !wr.expires.IsZero() && now.After(wr.expires)
if (eos && wr.noWait) || expires {
rdWait := o.replicateDeliveries()
if rdWait {
// Check if we need to send the timeout after pending replicated deliveries, or can do so immediately.
Expand All @@ -4528,13 +4529,26 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) {
} else {
wd.pn, wd.pb = wr.n, wr.b
}
// If we still need to wait for replicated deliveries, remove from waiting list.
if rdWait {
wr = remove(pre, wr)
continue
}
}
if !rdWait {
// Normally it's a timeout.
if expires {
hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
wr = remove(pre, wr)
continue
} else if wr.expires.IsZero() || wr.d > 0 {
// But if we're NoWait without expiry, we've reached the end of the stream, and we've not delivered any messages.
// Return no messages instead, which is the same as if we'd rejected the pull request initially.
hdr := fmt.Appendf(nil, "NATS/1.0 404 No Messages\r\n\r\n")
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
wr = remove(pre, wr)
continue
}
wr = remove(pre, wr)
continue
}
// Now check interest.
interest := wr.acc.sl.HasInterest(wr.interest)
Expand Down
48 changes: 24 additions & 24 deletions server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -1735,18 +1735,18 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su
node := getHash(si.Name)
accountNRG := si.AccountNRG()
oldInfo, _ := s.nodeToInfo.Swap(node, nodeInfo{
si.Name,
si.Version,
si.Cluster,
si.Domain,
si.ID,
si.Tags,
cfg,
stats,
false,
si.JetStreamEnabled(),
si.BinaryStreamSnapshot(),
accountNRG,
name: si.Name,
version: si.Version,
cluster: si.Cluster,
domain: si.Domain,
id: si.ID,
tags: si.Tags,
cfg: cfg,
stats: stats,
offline: false,
js: si.JetStreamEnabled(),
binarySnapshots: si.BinaryStreamSnapshot(),
accountNRG: accountNRG,
})
if oldInfo == nil || accountNRG != oldInfo.(nodeInfo).accountNRG {
// One of the servers we received statsz from changed its mind about
Expand Down Expand Up @@ -1789,18 +1789,18 @@ func (s *Server) processNewServer(si *ServerInfo) {
// Only update if non-existent
if _, ok := s.nodeToInfo.Load(node); !ok {
s.nodeToInfo.Store(node, nodeInfo{
si.Name,
si.Version,
si.Cluster,
si.Domain,
si.ID,
si.Tags,
nil,
nil,
false,
si.JetStreamEnabled(),
si.BinaryStreamSnapshot(),
si.AccountNRG(),
name: si.Name,
version: si.Version,
cluster: si.Cluster,
domain: si.Domain,
id: si.ID,
tags: si.Tags,
cfg: nil,
stats: nil,
offline: false,
js: si.JetStreamEnabled(),
binarySnapshots: si.BinaryStreamSnapshot(),
accountNRG: si.AccountNRG(),
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1238,7 +1238,7 @@ func TestAccountReqMonitoring(t *testing.T) {
s.EnableJetStream(&JetStreamConfig{StoreDir: t.TempDir()})
unusedAcc, _ := createAccount(s)
acc, akp := createAccount(s)
acc.EnableJetStream(nil)
acc.EnableJetStream(nil, nil)
subsz := fmt.Sprintf(accDirectReqSubj, acc.Name, "SUBSZ")
connz := fmt.Sprintf(accDirectReqSubj, acc.Name, "CONNZ")
jsz := fmt.Sprintf(accDirectReqSubj, acc.Name, "JSZ")
Expand Down
Loading
Loading