Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/cov.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ jobs:

- name: Coveralls
# Use commit hash here to avoid a re-tagging attack, as this is a third-party action
# Commit 3dfc5567390f6fa9267c0ee9c251e4c8c3f18949 = tag v2
uses: coverallsapp/github-action@643bc377ffa44ace6394b2b5d0d3950076de9f63
# Commit 648a8eb78e6d50909eff900e4ec85cab4524a45b = tag v2.3.6
uses: coverallsapp/github-action@648a8eb78e6d50909eff900e4ec85cab4524a45b
with:
github-token: ${{ secrets.github_token }}
file: src/github.com/nats-io/nats-server/coverage.lcov
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.22.0
toolchain go1.22.8

require (
github.com/klauspost/compress v1.17.11
github.com/klauspost/compress v1.18.0
github.com/minio/highwayhash v1.0.3
github.com/nats-io/jwt/v2 v2.7.3
github.com/nats-io/nats.go v1.39.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE=
Expand Down
2 changes: 1 addition & 1 deletion server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -2026,7 +2026,7 @@ func (a *Account) addServiceImportSub(si *serviceImport) error {
a.mu.Unlock()

cb := func(sub *subscription, c *client, acc *Account, subject, reply string, msg []byte) {
c.processServiceImport(si, acc, msg)
c.pa.delivered = c.processServiceImport(si, acc, msg)
}
sub, err := c.processSubEx([]byte(subject), nil, []byte(sid), cb, true, true, false)
if err != nil {
Expand Down
30 changes: 29 additions & 1 deletion server/accounts_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2018-2024 The NATS Authors
// Copyright 2018-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -3670,3 +3670,31 @@ func TestAccountServiceAndStreamExportDoubleDelivery(t *testing.T) {
time.Sleep(200 * time.Millisecond)
require_Equal(t, msgs.Load(), 1)
}

func TestAccountServiceImportNoResponders(t *testing.T) {
// Setup NATS server.
cf := createConfFile(t, []byte(`
port: -1
accounts: {
accExp: {
users: [{user: accExp, password: accExp}]
exports: [{service: "foo"}]
}
accImp: {
users: [{user: accImp, password: accImp}]
imports: [{service: {account: accExp, subject: "foo"}}]
}
}
`))

s, _ := RunServerWithConfig(cf)
defer s.Shutdown()

// Connect to the import account. We will not setup any responders, so a request should
// error out with ErrNoResponders.
nc := natsConnect(t, s.ClientURL(), nats.UserInfo("accImp", "accImp"))
defer nc.Close()

_, err := nc.Request("foo", []byte("request"), 250*time.Millisecond)
require_Error(t, err, nats.ErrNoResponders)
}
24 changes: 19 additions & 5 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3472,17 +3472,25 @@ func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, su
}
client.mu.Unlock()

// For service imports, track if we delivered.
didDeliver := true

// Internal account clients are for service imports and need the '\r\n'.
start := time.Now()
if client.kind == ACCOUNT {
sub.icb(sub, c, acc, string(subject), string(reply), msg)
// If we are a service import check to make sure we delivered the message somewhere.
if sub.si {
didDeliver = c.pa.delivered
}
} else {
sub.icb(sub, c, acc, string(subject), string(reply), msg[:msgSize])
}
if dur := time.Since(start); dur >= readLoopReportThreshold {
srv.Warnf("Internal subscription on %q took too long: %v", subject, dur)
}
return true

return didDeliver
}

// If we are a client and we detect that the consumer we are
Expand Down Expand Up @@ -4196,17 +4204,17 @@ var (

// processServiceImport is an internal callback when a subscription matches an imported service
// from another account. This includes response mappings as well.
func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byte) {
func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byte) bool {
// If we are a GW and this is not a direct serviceImport ignore.
isResponse := si.isRespServiceImport()
if (c.kind == GATEWAY || c.kind == ROUTER) && !isResponse {
return
return false
}
// Detect cycles and ignore (return) when we detect one.
if len(c.pa.psi) > 0 {
for i := len(c.pa.psi) - 1; i >= 0; i-- {
if psi := c.pa.psi[i]; psi.se == si.se {
return
return false
}
}
}
Expand All @@ -4227,7 +4235,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
// response service imports and rrMap entries which all will need to simply expire.
// TODO(dlc) - Come up with something better.
if shouldReturn || (checkJS && si.se != nil && si.se.acc == c.srv.SystemAccount()) {
return
return false
}

var nrr []byte
Expand Down Expand Up @@ -4375,6 +4383,10 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
c.in.rts = orts
c.pa = pacopy

// Before we undo didDeliver based on tracing and last mile, mark in the c.pa which informs us of no responders status.
// If we override due to tracing and traceOnly we do not want to send back a no responders.
c.pa.delivered = didDeliver

// Determine if we should remove this service import. This is for response service imports.
// We will remove if we did not deliver, or if we are a response service import and we are
// a singleton, or we have an EOF message.
Expand Down Expand Up @@ -4404,6 +4416,8 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
siAcc.removeRespServiceImport(rsi, reason)
}
}

return didDeliver
}

func (c *client) addSubToRouteTargets(sub *subscription) {
Expand Down
17 changes: 17 additions & 0 deletions server/config_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1817,6 +1817,23 @@ func TestConfigCheck(t *testing.T) {
errorLine: 9,
errorPos: 9,
},
{
name: "invalid duration for remote leafnode first info timeout",
config: `
leafnodes {
port: -1
remotes [
{
url: "nats://127.0.0.1:123"
first_info_timeout: abc
}
]
}
`,
err: fmt.Errorf("error parsing first_info_timeout: time: invalid duration %q", "abc"),
errorLine: 7,
errorPos: 8,
},
{
name: "show warnings on empty configs without values",
config: ``,
Expand Down
8 changes: 8 additions & 0 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5308,6 +5308,7 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
if ss, ok := mb.fss.Find(stringToBytes(subj)); ok && ss != nil {
ss.Msgs++
ss.Last = seq
ss.lastNeedsUpdate = false
} else {
mb.fss.Insert(stringToBytes(subj), SimpleState{Msgs: 1, First: seq, Last: seq})
}
Expand Down Expand Up @@ -6030,6 +6031,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
if ss, ok := mb.fss.Find(bsubj); ok && ss != nil {
ss.Msgs++
ss.Last = seq
ss.lastNeedsUpdate = false
} else {
mb.fss.Insert(bsubj, SimpleState{
Msgs: 1,
Expand Down Expand Up @@ -8057,8 +8059,11 @@ func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) {
}
if startSlot >= len(mb.cache.idx) {
ss.First = ss.Last
ss.firstNeedsUpdate = false
ss.lastNeedsUpdate = false
return
}

endSlot := int(ss.Last - mb.cache.fseq)
if endSlot < 0 {
endSlot = 0
Expand All @@ -8085,6 +8090,8 @@ func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) {
li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
ss.First = ss.Last
// Only need to reset ss.lastNeedsUpdate, ss.firstNeedsUpdate is already reset above.
ss.lastNeedsUpdate = false
return
}
buf := mb.cache.buf[li:]
Expand Down Expand Up @@ -8208,6 +8215,7 @@ func (mb *msgBlock) generatePerSubjectInfo() error {
if ss, ok := mb.fss.Find(stringToBytes(sm.subj)); ok && ss != nil {
ss.Msgs++
ss.Last = seq
ss.lastNeedsUpdate = false
} else {
mb.fss.Insert(stringToBytes(sm.subj), SimpleState{Msgs: 1, First: seq, Last: seq})
}
Expand Down
6 changes: 5 additions & 1 deletion server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7655,7 +7655,11 @@ func TestJetStreamClusterRecreateConsumerFromMetaSnapshot(t *testing.T) {
return err
} else if o := mset.lookupConsumer("CONSUMER"); o == nil {
return errors.New("consumer doesn't exist")
} else if ccrg := o.raftNode().Group(); consumerRg == _EMPTY_ {
} else if rn := o.raftNode(); rn == nil {
return errors.New("consumer raft node doesn't exist")
} else if ccrg := rn.Group(); ccrg == _EMPTY_ {
return errors.New("consumer raft group doesn't exist")
} else if consumerRg == _EMPTY_ {
consumerRg = ccrg
} else if consumerRg != ccrg {
return errors.New("consumer raft groups don't match")
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_cluster_2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7051,7 +7051,7 @@ func TestJetStreamClusterStreamDirectGetNotTooSoon(t *testing.T) {
defer nc.Close()

_, err = nc.Request(getSubj, nil, time.Second)
require_Error(t, err, nats.ErrTimeout)
require_Error(t, err, nats.ErrNoResponders)

// Now start all and make sure they all eventually have subs for direct access.
c.restartAll()
Expand Down
3 changes: 1 addition & 2 deletions server/jwt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6189,8 +6189,7 @@ func TestJWTAccountProtectedImport(t *testing.T) {

// ensure service fails
_, err = ncImp.Request(srvcSub, []byte("hello"), time.Second)
require_Error(t, err)
require_Contains(t, err.Error(), "timeout")
require_Error(t, err, nats.ErrNoResponders)
s.AccountResolver().Store(exportPub, exportJWTOn)
// ensure stream fails
err = ncExp.Publish(strmSub, []byte("hello"))
Expand Down
7 changes: 5 additions & 2 deletions server/leafnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,7 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
c.Noticef("Leafnode connection created%s %s", remoteSuffix, c.opts.Name)

var tlsFirst bool
var infoTimeout time.Duration
if remote != nil {
solicited = true
remote.Lock()
Expand All @@ -1006,6 +1007,7 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
c.leaf.isSpoke = true
}
tlsFirst = remote.TLSHandshakeFirst
infoTimeout = remote.FirstInfoTimeout
remote.Unlock()
c.acc = acc
} else {
Expand Down Expand Up @@ -1063,7 +1065,7 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
}
}
// We need to wait for the info, but not for too long.
c.nc.SetReadDeadline(time.Now().Add(DEFAULT_LEAFNODE_INFO_WAIT))
c.nc.SetReadDeadline(time.Now().Add(infoTimeout))
}

// We will process the INFO from the readloop and finish by
Expand Down Expand Up @@ -2897,6 +2899,7 @@ func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remot
compress := remote.Websocket.Compression
// By default the server will mask outbound frames, but it can be disabled with this option.
noMasking := remote.Websocket.NoMasking
infoTimeout := remote.FirstInfoTimeout
remote.RUnlock()
// Will do the client-side TLS handshake if needed.
tlsRequired, err := c.leafClientHandshakeIfNeeded(remote, opts)
Expand Down Expand Up @@ -2949,14 +2952,14 @@ func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remot
if noMasking {
req.Header.Add(wsNoMaskingHeader, wsNoMaskingValue)
}
c.nc.SetDeadline(time.Now().Add(infoTimeout))
if err := req.Write(c.nc); err != nil {
return nil, WriteError, err
}

var resp *http.Response

br := bufio.NewReaderSize(c.nc, MAX_CONTROL_LINE_SIZE)
c.nc.SetReadDeadline(time.Now().Add(DEFAULT_LEAFNODE_INFO_WAIT))
resp, err = http.ReadResponse(br, req)
if err == nil &&
(resp.StatusCode != 101 ||
Expand Down
Loading