From 67a0316dc913d7c08faff810ef804b74ec1d05b1 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Tue, 3 Mar 2026 15:35:16 +0000 Subject: [PATCH] Separate info requests into their own JS API queue Signed-off-by: Neil Twigg --- server/events.go | 10 +++- server/jetstream.go | 38 +++++++------ server/jetstream_api.go | 91 +++++++++++++++++++++--------- server/jetstream_cluster_4_test.go | 5 +- server/monitor.go | 28 ++++++--- server/opts.go | 10 ++++ server/opts_test.go | 1 + server/server.go | 3 +- 8 files changed, 130 insertions(+), 56 deletions(-) diff --git a/server/events.go b/server/events.go index f9175e8136c..b17921b4103 100644 --- a/server/events.go +++ b/server/events.go @@ -1053,8 +1053,14 @@ func (s *Server) sendStatsz(subj string) { Size: mg.ClusterSize(), } } - if ipq := s.jsAPIRoutedReqs; ipq != nil && jStat.Meta != nil { - jStat.Meta.Pending = ipq.len() + if jStat.Meta != nil { + if ipq := s.jsAPIRoutedReqs; ipq != nil { + jStat.Meta.PendingRequests = ipq.len() + } + if ipq := s.jsAPIRoutedInfoReqs; ipq != nil { + jStat.Meta.PendingInfos = ipq.len() + } + jStat.Meta.Pending = jStat.Meta.PendingRequests + jStat.Meta.PendingInfos } } jStat.Limits = &s.getOpts().JetStreamLimits diff --git a/server/jetstream.go b/server/jetstream.go index 345ad9a7816..373467a5693 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -31,6 +31,7 @@ import ( "time" "github.com/minio/highwayhash" + "github.com/nats-io/nats-server/v2/server/gsl" "github.com/nats-io/nats-server/v2/server/sysmem" "github.com/nats-io/nats-server/v2/server/tpm" "github.com/nats-io/nkeys" @@ -102,22 +103,24 @@ type JetStreamAPIStats struct { // This is for internal accounting for JetStream for this server. type jetStream struct { // These are here first because of atomics on 32bit systems. 
- apiInflight int64 - apiTotal int64 - apiErrors int64 - memReserved int64 - storeReserved int64 - memUsed int64 - storeUsed int64 - queueLimit int64 - clustered int32 - mu sync.RWMutex - srv *Server - config JetStreamConfig - cluster *jetStreamCluster - accounts map[string]*jsAccount - apiSubs *Sublist - started time.Time + apiInflight int64 + apiTotal int64 + apiErrors int64 + memReserved int64 + storeReserved int64 + memUsed int64 + storeUsed int64 + queueLimit int64 + infoQueueLimit int64 + clustered int32 + mu sync.RWMutex + srv *Server + config JetStreamConfig + cluster *jetStreamCluster + accounts map[string]*jsAccount + apiSubs *Sublist + infoSubs *gsl.SimpleSublist // Subjects for info-specific queue. + started time.Time // System level request to purge a stream move accountPurge *subscription @@ -412,7 +415,7 @@ func (s *Server) initJetStreamEncryption() (err error) { // enableJetStream will start up the JetStream subsystem. func (s *Server) enableJetStream(cfg JetStreamConfig) error { - js := &jetStream{srv: s, config: cfg, accounts: make(map[string]*jsAccount), apiSubs: NewSublistNoCache()} + js := &jetStream{srv: s, config: cfg, accounts: make(map[string]*jsAccount), apiSubs: NewSublistNoCache(), infoSubs: gsl.NewSimpleSublist()} s.gcbMu.Lock() if s.gcbOutMax = s.getOpts().JetStreamMaxCatchup; s.gcbOutMax == 0 { s.gcbOutMax = defaultMaxTotalCatchupOutBytes @@ -421,6 +424,7 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error { // TODO: Not currently reloadable. atomic.StoreInt64(&js.queueLimit, s.getOpts().JetStreamRequestQueueLimit) + atomic.StoreInt64(&js.infoQueueLimit, s.getOpts().JetStreamInfoQueueLimit) s.js.Store(js) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index e7b27cc4f5d..3eddd82ac7f 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -859,11 +859,19 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub // Copy the state. 
Note the JSAPI only uses the hdr index to piece apart the // header from the msg body. No other references are needed. // Check pending and warn if getting backed up. - pending, _ := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa}) - limit := atomic.LoadInt64(&js.queueLimit) + var queue *ipQueue[*jsAPIRoutedReq] + var limit int64 + if js.infoSubs.HasInterest(subject) { + queue = s.jsAPIRoutedInfoReqs + limit = atomic.LoadInt64(&js.infoQueueLimit) + } else { + queue = s.jsAPIRoutedReqs + limit = atomic.LoadInt64(&js.queueLimit) + } + pending, _ := queue.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa}) if pending >= int(limit) { - s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending) - drained := int64(s.jsAPIRoutedReqs.drain()) + s.rateLimitFormatWarnf("%s limit reached, dropping %d requests", queue.name, pending) + drained := int64(queue.drain()) atomic.AddInt64(&js.apiInflight, -drained) s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{ @@ -883,29 +891,45 @@ func (s *Server) processJSAPIRoutedRequests() { defer s.grWG.Done() s.mu.RLock() - queue := s.jsAPIRoutedReqs + queue, infoqueue := s.jsAPIRoutedReqs, s.jsAPIRoutedInfoReqs client := &client{srv: s, kind: JETSTREAM} s.mu.RUnlock() js := s.getJetStream() + processFromQueue := func(ipq *ipQueue[*jsAPIRoutedReq]) { + // Only pop one item at a time here, otherwise if the system is recovering + // from queue buildup, then one worker will pull off all the tasks and the + // others will be starved of work. 
+ if r, ok := ipq.popOne(); ok && r != nil { + client.pa = r.pa + start := time.Now() + r.jsub.icb(r.sub, client, r.acc, r.subject, r.reply, r.msg) + if dur := time.Since(start); dur >= readLoopReportThreshold { + s.Warnf("Internal subscription on %q took too long: %v", r.subject, dur) + } + atomic.AddInt64(&js.apiInflight, -1) + } + } + for { + // First select case is prioritizing queue, we will only fall through + // to the second select case that considers infoqueue if queue is empty. + // This effectively means infos are deprioritized. select { case <-queue.ch: - // Only pop one item at a time here, otherwise if the system is recovering - // from queue buildup, then one worker will pull off all the tasks and the - // others will be starved of work. - for r, ok := queue.popOne(); ok && r != nil; r, ok = queue.popOne() { - client.pa = r.pa - start := time.Now() - r.jsub.icb(r.sub, client, r.acc, r.subject, r.reply, r.msg) - if dur := time.Since(start); dur >= readLoopReportThreshold { - s.Warnf("Internal subscription on %q took too long: %v", r.subject, dur) - } - atomic.AddInt64(&js.apiInflight, -1) - } + processFromQueue(queue) case <-s.quitCh: return + default: + select { + case <-infoqueue.ch: + processFromQueue(infoqueue) + case <-queue.ch: + processFromQueue(queue) + case <-s.quitCh: + return + } } } } @@ -924,7 +948,8 @@ func (s *Server) setJetStreamExportSubs() error { if mp > maxProcs { mp = maxProcs } - s.jsAPIRoutedReqs = newIPQueue[*jsAPIRoutedReq](s, "Routed JS API Requests") + s.jsAPIRoutedReqs = newIPQueue[*jsAPIRoutedReq](s, "JetStream API queue") + s.jsAPIRoutedInfoReqs = newIPQueue[*jsAPIRoutedReq](s, "JetStream API info queue") for i := 0; i < mp; i++ { s.startGoRoutine(s.processJSAPIRoutedRequests) } @@ -940,16 +965,13 @@ func (s *Server) setJetStreamExportSubs() error { } // API handles themselves. + // infopairs are deprioritized compared to pairs in processJSAPIRoutedRequests. 
pairs := []struct { subject string handler msgHandler }{ - {JSApiAccountInfo, s.jsAccountInfoRequest}, {JSApiStreamCreate, s.jsStreamCreateRequest}, {JSApiStreamUpdate, s.jsStreamUpdateRequest}, - {JSApiStreams, s.jsStreamNamesRequest}, - {JSApiStreamList, s.jsStreamListRequest}, - {JSApiStreamInfo, s.jsStreamInfoRequest}, {JSApiStreamDelete, s.jsStreamDeleteRequest}, {JSApiStreamPurge, s.jsStreamPurgeRequest}, {JSApiStreamSnapshot, s.jsStreamSnapshotRequest}, @@ -962,23 +984,40 @@ func (s *Server) setJetStreamExportSubs() error { {JSApiConsumerCreateEx, s.jsConsumerCreateRequest}, {JSApiConsumerCreate, s.jsConsumerCreateRequest}, {JSApiDurableCreate, s.jsConsumerCreateRequest}, - {JSApiConsumers, s.jsConsumerNamesRequest}, - {JSApiConsumerList, s.jsConsumerListRequest}, - {JSApiConsumerInfo, s.jsConsumerInfoRequest}, {JSApiConsumerDelete, s.jsConsumerDeleteRequest}, {JSApiConsumerPause, s.jsConsumerPauseRequest}, {JSApiConsumerUnpin, s.jsConsumerUnpinRequest}, } + infopairs := []struct { + subject string + handler msgHandler + }{ + {JSApiAccountInfo, s.jsAccountInfoRequest}, + {JSApiStreams, s.jsStreamNamesRequest}, + {JSApiStreamList, s.jsStreamListRequest}, + {JSApiStreamInfo, s.jsStreamInfoRequest}, + {JSApiConsumers, s.jsConsumerNamesRequest}, + {JSApiConsumerList, s.jsConsumerListRequest}, + {JSApiConsumerInfo, s.jsConsumerInfoRequest}, + } js.mu.Lock() defer js.mu.Unlock() - for _, p := range pairs { + // As well as populating js.apiSubs for the dispatch function to use, we + // will also populate js.infoSubs, so that the dispatch function can + // quickly decide whether or not the request is an info request. 
{ sub := &subscription{subject: []byte(p.subject), icb: p.handler} if err := js.apiSubs.Insert(sub); err != nil { return err } } + for _, p := range infopairs { + if err := js.infoSubs.Insert(p.subject, struct{}{}); err != nil { + return err + } + } return nil } diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index e74f3966743..79474e9edca 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -2977,10 +2977,13 @@ func TestJetStreamClusterAPILimitDefault(t *testing.T) { for _, s := range c.servers { s.optsMu.RLock() lim := s.opts.JetStreamRequestQueueLimit + ilim := s.opts.JetStreamInfoQueueLimit s.optsMu.RUnlock() require_Equal(t, lim, JSDefaultRequestQueueLimit) + require_Equal(t, ilim, JSDefaultRequestQueueLimit) require_Equal(t, atomic.LoadInt64(&s.getJetStream().queueLimit), JSDefaultRequestQueueLimit) + require_Equal(t, atomic.LoadInt64(&s.getJetStream().infoQueueLimit), JSDefaultRequestQueueLimit) } } @@ -5384,7 +5387,7 @@ func TestJetStreamClusterRoutedAPIRecoverPerformance(t *testing.T) { require_NoError(t, nc.PublishMsg(msg)) } checkFor(t, 5*time.Second, 25*time.Millisecond, func() error { - if queued := leader.jsAPIRoutedReqs.len(); queued != count { + if queued := leader.jsAPIRoutedInfoReqs.len(); queued != count { return fmt.Errorf("expected %d queued requests, got %d", count, queued) } return nil diff --git a/server/monitor.go b/server/monitor.go index c6a932be73b..fb76acacbcc 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1570,8 +1570,12 @@ func (s *Server) updateJszVarz(js *jetStream, v *JetStreamVarz, doConfig bool) { v.Meta.Replicas = ci.Replicas } if ipq := s.jsAPIRoutedReqs; ipq != nil { - v.Meta.Pending = ipq.len() + v.Meta.PendingRequests = ipq.len() } + if ipq := s.jsAPIRoutedInfoReqs; ipq != nil { + v.Meta.PendingInfos = ipq.len() + } + v.Meta.Pending = v.Meta.PendingRequests + v.Meta.PendingInfos } } } @@ -3010,13 +3014,15 @@ type MetaSnapshotStats struct 
{ // MetaClusterInfo shows information about the meta group. type MetaClusterInfo struct { - Name string `json:"name,omitempty"` // Name is the name of the cluster - Leader string `json:"leader,omitempty"` // Leader is the server name of the cluster leader - Peer string `json:"peer,omitempty"` // Peer is unique ID of the leader - Replicas []*PeerInfo `json:"replicas,omitempty"` // Replicas is a list of known peers - Size int `json:"cluster_size"` // Size is the known size of the cluster - Pending int `json:"pending"` // Pending is how many RAFT messages are not yet processed - Snapshot *MetaSnapshotStats `json:"snapshot"` // Snapshot contains meta snapshot statistics + Name string `json:"name,omitempty"` // Name is the name of the cluster + Leader string `json:"leader,omitempty"` // Leader is the server name of the cluster leader + Peer string `json:"peer,omitempty"` // Peer is unique ID of the leader + Replicas []*PeerInfo `json:"replicas,omitempty"` // Replicas is a list of known peers + Size int `json:"cluster_size"` // Size is the known size of the cluster + Pending int `json:"pending"` // Pending is how many RAFT messages are not yet processed + PendingRequests int `json:"pending_requests"` // PendingRequests is how many CRUD operations are queued for processing + PendingInfos int `json:"pending_infos"` // PendingInfos is how many info operations are queued for processing + Snapshot *MetaSnapshotStats `json:"snapshot"` // Snapshot contains meta snapshot statistics } // JSInfo has detailed information on JetStream. 
@@ -3239,8 +3245,12 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) { jsi.Meta.Replicas = ci.Replicas } if ipq := s.jsAPIRoutedReqs; ipq != nil { - jsi.Meta.Pending = ipq.len() + jsi.Meta.PendingRequests = ipq.len() + } + if ipq := s.jsAPIRoutedInfoReqs; ipq != nil { + jsi.Meta.PendingInfos = ipq.len() } + jsi.Meta.Pending = jsi.Meta.PendingRequests + jsi.Meta.PendingInfos // Add meta snapshot stats jsi.Meta.Snapshot = &MetaSnapshotStats{ PendingEntries: entries, diff --git a/server/opts.go b/server/opts.go index eac71885e29..d42f4b1eded 100644 --- a/server/opts.go +++ b/server/opts.go @@ -387,6 +387,7 @@ type Options struct { JetStreamTpm JSTpmOpts JetStreamMaxCatchup int64 JetStreamRequestQueueLimit int64 + JetStreamInfoQueueLimit int64 JetStreamMetaCompact uint64 JetStreamMetaCompactSize uint64 JetStreamMetaCompactSync bool @@ -2641,6 +2642,12 @@ func parseJetStream(v any, opts *Options, errors *[]error, warnings *[]error) er return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)} } opts.JetStreamRequestQueueLimit = lim + case "info_queue_limit": + lim, ok := mv.(int64) + if !ok { + return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)} + } + opts.JetStreamInfoQueueLimit = lim case "meta_compact": thres, ok := mv.(int64) if !ok || thres < 0 { @@ -6006,6 +6013,9 @@ func setBaselineOptions(opts *Options) { if opts.JetStreamRequestQueueLimit <= 0 { opts.JetStreamRequestQueueLimit = JSDefaultRequestQueueLimit } + if opts.JetStreamInfoQueueLimit <= 0 { + opts.JetStreamInfoQueueLimit = opts.JetStreamRequestQueueLimit + } } func getDefaultAuthTimeout(tls *tls.Config, tlsTimeout float64) float64 { diff --git a/server/opts_test.go b/server/opts_test.go index 942448a90ad..6e8bdf80dd3 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -76,6 +76,7 @@ func TestDefaultOptions(t *testing.T) { JetStreamMaxStore: -1, SyncInterval: 2 * time.Minute, JetStreamRequestQueueLimit: 
JSDefaultRequestQueueLimit, + JetStreamInfoQueueLimit: JSDefaultRequestQueueLimit, } opts := &Options{} diff --git a/server/server.go b/server/server.go index 707f79f6958..0fce7f51c96 100644 --- a/server/server.go +++ b/server/server.go @@ -367,7 +367,8 @@ type Server struct { syncOutSem chan struct{} // Queue to process JS API requests that come from routes (or gateways) - jsAPIRoutedReqs *ipQueue[*jsAPIRoutedReq] + jsAPIRoutedReqs *ipQueue[*jsAPIRoutedReq] + jsAPIRoutedInfoReqs *ipQueue[*jsAPIRoutedReq] // Delayed API responses. delayedAPIResponses *ipQueue[*delayedAPIResponse]