Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/nats-io/nkeys v0.4.8
github.com/nats-io/nuid v1.0.1
go.uber.org/automaxprocs v1.6.0
golang.org/x/crypto v0.30.0
golang.org/x/crypto v0.31.0
golang.org/x/sys v0.28.0
golang.org/x/time v0.8.0
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMT
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
golang.org/x/crypto v0.30.0 h1:RwoQn3GkWiMkzlX562cLB7OxWvjH1L8xutO2WoJcRoY=
golang.org/x/crypto v0.30.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
Expand Down
35 changes: 15 additions & 20 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@ type client struct {
last time.Time
lastIn time.Time

repliesSincePrune uint16
lastReplyPrune time.Time

headers bool

rtt time.Duration
Expand Down Expand Up @@ -420,6 +423,7 @@ const (
pruneSize = 32
routeTargetInit = 8
replyPermLimit = 4096
replyPruneTime = time.Second
)

// Represent read cache booleans with a bitmask
Expand Down Expand Up @@ -3526,9 +3530,11 @@ func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, su

// If we are tracking dynamic publish permissions that track reply subjects,
// do that accounting here. We only look at client.replies which will be non-nil.
if client.replies != nil && len(reply) > 0 {
// Only reply subject permissions if the client is not already allowed to publish to the reply subject.
if client.replies != nil && len(reply) > 0 && !client.pubAllowedFullCheck(string(reply), true, true) {
client.replies[string(reply)] = &resp{time.Now(), 0}
if len(client.replies) > replyPermLimit {
client.repliesSincePrune++
if client.repliesSincePrune > replyPermLimit || time.Since(client.lastReplyPrune) > replyPruneTime {
client.pruneReplyPerms()
}
}
Expand Down Expand Up @@ -3652,6 +3658,9 @@ func (c *client) pruneReplyPerms() {
delete(c.replies, k)
}
}

c.repliesSincePrune = 0
c.lastReplyPrune = now
}

// pruneDenyCache will prune the deny cache via randomly
Expand Down Expand Up @@ -3720,7 +3729,7 @@ func (c *client) pubAllowedFullCheck(subject string, fullCheck, hasLock bool) bo
allowed = np == 0
}

// If we are currently not allowed but we are tracking reply subjects
// If we are tracking reply subjects
// dynamically, check to see if we are allowed here but avoid pcache.
// We need to acquire the lock though.
if !allowed && fullCheck && c.perms.resp != nil {
Expand Down Expand Up @@ -4160,9 +4169,8 @@ func getHeader(key string, hdr []byte) []byte {

// For bytes.HasPrefix below.
var (
jsRequestNextPreB = []byte(jsRequestNextPre)
jsDirectGetPreB = []byte(jsDirectGetPre)
jsConsumerInfoPreB = []byte(JSApiConsumerInfoPre)
jsRequestNextPreB = []byte(jsRequestNextPre)
jsDirectGetPreB = []byte(jsDirectGetPre)
)

// processServiceImport is an internal callback when a subscription matches an imported service
Expand All @@ -4182,16 +4190,12 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
}
}

var checkJS, checkConsumerInfo bool

acc.mu.RLock()
var checkJS bool
shouldReturn := si.invalid || acc.sl == nil
if !shouldReturn && !isResponse && si.to == jsAllAPI {
if bytes.HasPrefix(c.pa.subject, jsDirectGetPreB) || bytes.HasPrefix(c.pa.subject, jsRequestNextPreB) {
checkJS = true
} else if len(c.pa.psi) == 0 && bytes.HasPrefix(c.pa.subject, jsConsumerInfoPreB) {
// Only check if we are clustered and expecting a reply.
checkConsumerInfo = len(c.pa.reply) > 0 && c.srv.JetStreamIsClustered()
}
}
siAcc := si.acc
Expand All @@ -4205,15 +4209,6 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
return
}

// Here we will do a fast check for consumer info only to check if it does not exists. This will spread the
// load to all servers with connected clients since service imports are processed at point of entry.
// Only call for clustered setups.
if checkConsumerInfo && si.se != nil && si.se.acc == c.srv.SystemAccount() {
if c.srv.jsConsumerProcessMissing(c, acc) {
return
}
}

var nrr []byte
var rsi *serviceImport

Expand Down
17 changes: 7 additions & 10 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2667,23 +2667,20 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
TimeStamp: time.Now().UTC(),
}

// If we are replicated and we are not the leader or we are filtered, we need to pull certain data from our store.
isLeader := o.isLeader()
if rg != nil && rg.node != nil && o.store != nil && (!isLeader || o.isFiltered()) {
// If we are replicated, we need to pull certain data from our store.
if rg != nil && rg.node != nil && o.store != nil {
state, err := o.store.BorrowState()
if err != nil {
o.mu.Unlock()
return nil
}
if !isLeader {
info.Delivered.Consumer, info.Delivered.Stream = state.Delivered.Consumer, state.Delivered.Stream
info.AckFloor.Consumer, info.AckFloor.Stream = state.AckFloor.Consumer, state.AckFloor.Stream
// If we are the leader we could have o.sseq that is skipped ahead.
// To maintain consistency in reporting (e.g. jsz) we always take the state for our delivered/ackfloor stream sequence.
info.Delivered.Consumer, info.Delivered.Stream = state.Delivered.Consumer, state.Delivered.Stream
info.AckFloor.Consumer, info.AckFloor.Stream = state.AckFloor.Consumer, state.AckFloor.Stream
if !o.isLeader() {
info.NumAckPending = len(state.Pending)
info.NumRedelivered = len(state.Redelivered)
} else {
// Since we are filtered and we are the leader we could have o.sseq that is skipped ahead.
// To maintain consistency in reporting (e.g. jsz) we take the state for our delivered stream sequence.
info.Delivered.Stream = state.Delivered.Stream
}
}

Expand Down
Loading