Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -1053,8 +1053,14 @@ func (s *Server) sendStatsz(subj string) {
Size: mg.ClusterSize(),
}
}
if ipq := s.jsAPIRoutedReqs; ipq != nil && jStat.Meta != nil {
jStat.Meta.Pending = ipq.len()
if jStat.Meta != nil {
if ipq := s.jsAPIRoutedReqs; ipq != nil {
jStat.Meta.PendingRequests = ipq.len()
}
if ipq := s.jsAPIRoutedInfoReqs; ipq != nil {
jStat.Meta.PendingInfos = ipq.len()
}
jStat.Meta.Pending = jStat.Meta.PendingRequests + jStat.Meta.PendingInfos
}
}
jStat.Limits = &s.getOpts().JetStreamLimits
Expand Down
38 changes: 21 additions & 17 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"time"

"github.com/minio/highwayhash"
"github.com/nats-io/nats-server/v2/server/gsl"
"github.com/nats-io/nats-server/v2/server/sysmem"
"github.com/nats-io/nats-server/v2/server/tpm"
"github.com/nats-io/nkeys"
Expand Down Expand Up @@ -102,22 +103,24 @@ type JetStreamAPIStats struct {
// This is for internal accounting for JetStream for this server.
type jetStream struct {
// These are here first because of atomics on 32bit systems.
apiInflight int64
apiTotal int64
apiErrors int64
memReserved int64
storeReserved int64
memUsed int64
storeUsed int64
queueLimit int64
clustered int32
mu sync.RWMutex
srv *Server
config JetStreamConfig
cluster *jetStreamCluster
accounts map[string]*jsAccount
apiSubs *Sublist
started time.Time
apiInflight int64
apiTotal int64
apiErrors int64
memReserved int64
storeReserved int64
memUsed int64
storeUsed int64
queueLimit int64
infoQueueLimit int64
clustered int32
mu sync.RWMutex
srv *Server
config JetStreamConfig
cluster *jetStreamCluster
accounts map[string]*jsAccount
apiSubs *Sublist
infoSubs *gsl.SimpleSublist // Subjects for info-specific queue.
started time.Time

// System level request to purge a stream move
accountPurge *subscription
Expand Down Expand Up @@ -412,7 +415,7 @@ func (s *Server) initJetStreamEncryption() (err error) {

// enableJetStream will start up the JetStream subsystem.
func (s *Server) enableJetStream(cfg JetStreamConfig) error {
js := &jetStream{srv: s, config: cfg, accounts: make(map[string]*jsAccount), apiSubs: NewSublistNoCache()}
js := &jetStream{srv: s, config: cfg, accounts: make(map[string]*jsAccount), apiSubs: NewSublistNoCache(), infoSubs: gsl.NewSimpleSublist()}
s.gcbMu.Lock()
if s.gcbOutMax = s.getOpts().JetStreamMaxCatchup; s.gcbOutMax == 0 {
s.gcbOutMax = defaultMaxTotalCatchupOutBytes
Expand All @@ -421,6 +424,7 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error {

// TODO: Not currently reloadable.
atomic.StoreInt64(&js.queueLimit, s.getOpts().JetStreamRequestQueueLimit)
atomic.StoreInt64(&js.infoQueueLimit, s.getOpts().JetStreamInfoQueueLimit)

s.js.Store(js)

Expand Down
91 changes: 65 additions & 26 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,11 +859,19 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
// Copy the state. Note the JSAPI only uses the hdr index to piece apart the
// header from the msg body. No other references are needed.
// Check pending and warn if getting backed up.
pending, _ := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
limit := atomic.LoadInt64(&js.queueLimit)
var queue *ipQueue[*jsAPIRoutedReq]
var limit int64
if js.infoSubs.HasInterest(subject) {
queue = s.jsAPIRoutedInfoReqs
Comment thread
neilalexander marked this conversation as resolved.
limit = atomic.LoadInt64(&js.infoQueueLimit)
} else {
queue = s.jsAPIRoutedReqs
limit = atomic.LoadInt64(&js.queueLimit)
}
pending, _ := queue.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
if pending >= int(limit) {
s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending)
drained := int64(s.jsAPIRoutedReqs.drain())
s.rateLimitFormatWarnf("%s limit reached, dropping %d requests", queue.name, pending)
drained := int64(queue.drain())
atomic.AddInt64(&js.apiInflight, -drained)

s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
Expand All @@ -883,29 +891,45 @@ func (s *Server) processJSAPIRoutedRequests() {
defer s.grWG.Done()

s.mu.RLock()
queue := s.jsAPIRoutedReqs
queue, infoqueue := s.jsAPIRoutedReqs, s.jsAPIRoutedInfoReqs
client := &client{srv: s, kind: JETSTREAM}
s.mu.RUnlock()

js := s.getJetStream()

processFromQueue := func(ipq *ipQueue[*jsAPIRoutedReq]) {
// Only pop one item at a time here, otherwise if the system is recovering
// from queue buildup, then one worker will pull off all the tasks and the
// others will be starved of work.
if r, ok := ipq.popOne(); ok && r != nil {
client.pa = r.pa
start := time.Now()
r.jsub.icb(r.sub, client, r.acc, r.subject, r.reply, r.msg)
if dur := time.Since(start); dur >= readLoopReportThreshold {
s.Warnf("Internal subscription on %q took too long: %v", r.subject, dur)
}
atomic.AddInt64(&js.apiInflight, -1)
}
}

for {
// First select case is prioritizing queue, we will only fall through
// to the second select case that considers infoqueue if queue is empty.
// This effectively means infos are deprioritized.
select {
case <-queue.ch:
// Only pop one item at a time here, otherwise if the system is recovering
// from queue buildup, then one worker will pull off all the tasks and the
// others will be starved of work.
for r, ok := queue.popOne(); ok && r != nil; r, ok = queue.popOne() {
client.pa = r.pa
start := time.Now()
r.jsub.icb(r.sub, client, r.acc, r.subject, r.reply, r.msg)
if dur := time.Since(start); dur >= readLoopReportThreshold {
s.Warnf("Internal subscription on %q took too long: %v", r.subject, dur)
}
atomic.AddInt64(&js.apiInflight, -1)
}
processFromQueue(queue)
case <-s.quitCh:
return
default:
select {
case <-infoqueue.ch:
processFromQueue(infoqueue)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like info queue processing could starve under continuous load? Is this a concern here?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could, but would require all CPU cores to be locked on queue rather than infoqueue simultaneously.

case <-queue.ch:
processFromQueue(queue)
case <-s.quitCh:
return
}
}
}
}
Expand All @@ -924,7 +948,8 @@ func (s *Server) setJetStreamExportSubs() error {
if mp > maxProcs {
mp = maxProcs
}
s.jsAPIRoutedReqs = newIPQueue[*jsAPIRoutedReq](s, "Routed JS API Requests")
s.jsAPIRoutedReqs = newIPQueue[*jsAPIRoutedReq](s, "JetStream API queue")
s.jsAPIRoutedInfoReqs = newIPQueue[*jsAPIRoutedReq](s, "JetStream API info queue")
for i := 0; i < mp; i++ {
s.startGoRoutine(s.processJSAPIRoutedRequests)
}
Expand All @@ -940,16 +965,13 @@ func (s *Server) setJetStreamExportSubs() error {
}

// API handles themselves.
// infopairs are deprioritized compared to pairs in processJSAPIRoutedRequests.
pairs := []struct {
subject string
handler msgHandler
}{
{JSApiAccountInfo, s.jsAccountInfoRequest},
{JSApiStreamCreate, s.jsStreamCreateRequest},
{JSApiStreamUpdate, s.jsStreamUpdateRequest},
{JSApiStreams, s.jsStreamNamesRequest},
{JSApiStreamList, s.jsStreamListRequest},
{JSApiStreamInfo, s.jsStreamInfoRequest},
{JSApiStreamDelete, s.jsStreamDeleteRequest},
{JSApiStreamPurge, s.jsStreamPurgeRequest},
{JSApiStreamSnapshot, s.jsStreamSnapshotRequest},
Expand All @@ -962,23 +984,40 @@ func (s *Server) setJetStreamExportSubs() error {
{JSApiConsumerCreateEx, s.jsConsumerCreateRequest},
{JSApiConsumerCreate, s.jsConsumerCreateRequest},
{JSApiDurableCreate, s.jsConsumerCreateRequest},
{JSApiConsumers, s.jsConsumerNamesRequest},
{JSApiConsumerList, s.jsConsumerListRequest},
{JSApiConsumerInfo, s.jsConsumerInfoRequest},
{JSApiConsumerDelete, s.jsConsumerDeleteRequest},
{JSApiConsumerPause, s.jsConsumerPauseRequest},
{JSApiConsumerUnpin, s.jsConsumerUnpinRequest},
}
infopairs := []struct {
subject string
handler msgHandler
}{
{JSApiAccountInfo, s.jsAccountInfoRequest},
{JSApiStreams, s.jsStreamNamesRequest},
{JSApiStreamList, s.jsStreamListRequest},
{JSApiStreamInfo, s.jsStreamInfoRequest},
{JSApiConsumers, s.jsConsumerNamesRequest},
{JSApiConsumerList, s.jsConsumerListRequest},
{JSApiConsumerInfo, s.jsConsumerInfoRequest},
}

js.mu.Lock()
defer js.mu.Unlock()

for _, p := range pairs {
// As well as populating js.apiSubs for the dispatch function to use, we
// will also populate js.infoSubs, so that the dispatch function can
// decide quickly whether or not the request is an info request or not.
for _, p := range append(infopairs, pairs...) {
sub := &subscription{subject: []byte(p.subject), icb: p.handler}
if err := js.apiSubs.Insert(sub); err != nil {
return err
}
}
for _, p := range infopairs {
Comment thread
MauriceVanVeen marked this conversation as resolved.
if err := js.infoSubs.Insert(p.subject, struct{}{}); err != nil {
return err
}
}

return nil
}
Expand Down
5 changes: 4 additions & 1 deletion server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2977,10 +2977,13 @@ func TestJetStreamClusterAPILimitDefault(t *testing.T) {
for _, s := range c.servers {
s.optsMu.RLock()
lim := s.opts.JetStreamRequestQueueLimit
ilim := s.opts.JetStreamInfoQueueLimit
s.optsMu.RUnlock()

require_Equal(t, lim, JSDefaultRequestQueueLimit)
require_Equal(t, ilim, JSDefaultRequestQueueLimit)
require_Equal(t, atomic.LoadInt64(&s.getJetStream().queueLimit), JSDefaultRequestQueueLimit)
require_Equal(t, atomic.LoadInt64(&s.getJetStream().infoQueueLimit), JSDefaultRequestQueueLimit)
}
}

Expand Down Expand Up @@ -5384,7 +5387,7 @@ func TestJetStreamClusterRoutedAPIRecoverPerformance(t *testing.T) {
require_NoError(t, nc.PublishMsg(msg))
}
checkFor(t, 5*time.Second, 25*time.Millisecond, func() error {
if queued := leader.jsAPIRoutedReqs.len(); queued != count {
if queued := leader.jsAPIRoutedInfoReqs.len(); queued != count {
return fmt.Errorf("expected %d queued requests, got %d", count, queued)
}
return nil
Expand Down
28 changes: 19 additions & 9 deletions server/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1570,8 +1570,12 @@ func (s *Server) updateJszVarz(js *jetStream, v *JetStreamVarz, doConfig bool) {
v.Meta.Replicas = ci.Replicas
}
if ipq := s.jsAPIRoutedReqs; ipq != nil {
v.Meta.Pending = ipq.len()
v.Meta.PendingRequests = ipq.len()
}
if ipq := s.jsAPIRoutedInfoReqs; ipq != nil {
v.Meta.PendingInfos = ipq.len()
}
v.Meta.Pending = v.Meta.PendingRequests + v.Meta.PendingInfos
}
}
}
Expand Down Expand Up @@ -3010,13 +3014,15 @@ type MetaSnapshotStats struct {

// MetaClusterInfo shows information about the meta group.
type MetaClusterInfo struct {
Name string `json:"name,omitempty"` // Name is the name of the cluster
Leader string `json:"leader,omitempty"` // Leader is the server name of the cluster leader
Peer string `json:"peer,omitempty"` // Peer is unique ID of the leader
Replicas []*PeerInfo `json:"replicas,omitempty"` // Replicas is a list of known peers
Size int `json:"cluster_size"` // Size is the known size of the cluster
Pending int `json:"pending"` // Pending is how many RAFT messages are not yet processed
Snapshot *MetaSnapshotStats `json:"snapshot"` // Snapshot contains meta snapshot statistics
Name string `json:"name,omitempty"` // Name is the name of the cluster
Leader string `json:"leader,omitempty"` // Leader is the server name of the cluster leader
Peer string `json:"peer,omitempty"` // Peer is unique ID of the leader
Replicas []*PeerInfo `json:"replicas,omitempty"` // Replicas is a list of known peers
Size int `json:"cluster_size"` // Size is the known size of the cluster
Pending int `json:"pending"` // Pending is how many RAFT messages are not yet processed
PendingRequests int `json:"pending_requests"` // PendingRequests is how many CRUD operations are queued for processing
PendingInfos int `json:"pending_infos"` // PendingInfos is how many info operations are queued for processing
Snapshot *MetaSnapshotStats `json:"snapshot"` // Snapshot contains meta snapshot statistics
}

// JSInfo has detailed information on JetStream.
Expand Down Expand Up @@ -3239,8 +3245,12 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
jsi.Meta.Replicas = ci.Replicas
}
if ipq := s.jsAPIRoutedReqs; ipq != nil {
jsi.Meta.Pending = ipq.len()
jsi.Meta.PendingRequests = ipq.len()
}
if ipq := s.jsAPIRoutedInfoReqs; ipq != nil {
jsi.Meta.PendingInfos = ipq.len()
}
jsi.Meta.Pending = jsi.Meta.PendingRequests + jsi.Meta.PendingInfos
// Add meta snapshot stats
jsi.Meta.Snapshot = &MetaSnapshotStats{
PendingEntries: entries,
Expand Down
10 changes: 10 additions & 0 deletions server/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ type Options struct {
JetStreamTpm JSTpmOpts
JetStreamMaxCatchup int64
JetStreamRequestQueueLimit int64
JetStreamInfoQueueLimit int64
JetStreamMetaCompact uint64
JetStreamMetaCompactSize uint64
JetStreamMetaCompactSync bool
Expand Down Expand Up @@ -2641,6 +2642,12 @@ func parseJetStream(v any, opts *Options, errors *[]error, warnings *[]error) er
return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)}
}
opts.JetStreamRequestQueueLimit = lim
case "info_queue_limit":
lim, ok := mv.(int64)
if !ok {
return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)}
}
opts.JetStreamInfoQueueLimit = lim
case "meta_compact":
thres, ok := mv.(int64)
if !ok || thres < 0 {
Expand Down Expand Up @@ -6006,6 +6013,9 @@ func setBaselineOptions(opts *Options) {
if opts.JetStreamRequestQueueLimit <= 0 {
opts.JetStreamRequestQueueLimit = JSDefaultRequestQueueLimit
}
if opts.JetStreamInfoQueueLimit <= 0 {
opts.JetStreamInfoQueueLimit = opts.JetStreamRequestQueueLimit
}
}

func getDefaultAuthTimeout(tls *tls.Config, tlsTimeout float64) float64 {
Expand Down
1 change: 1 addition & 0 deletions server/opts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func TestDefaultOptions(t *testing.T) {
JetStreamMaxStore: -1,
SyncInterval: 2 * time.Minute,
JetStreamRequestQueueLimit: JSDefaultRequestQueueLimit,
JetStreamInfoQueueLimit: JSDefaultRequestQueueLimit,
}

opts := &Options{}
Expand Down
3 changes: 2 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,8 @@ type Server struct {
syncOutSem chan struct{}

// Queue to process JS API requests that come from routes (or gateways)
jsAPIRoutedReqs *ipQueue[*jsAPIRoutedReq]
jsAPIRoutedReqs *ipQueue[*jsAPIRoutedReq]
jsAPIRoutedInfoReqs *ipQueue[*jsAPIRoutedReq]

// Delayed API responses.
delayedAPIResponses *ipQueue[*delayedAPIResponse]
Expand Down