From 4e467efb1c7375033afd3a8d89d78cb12d938a70 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 9 Jan 2025 15:02:33 +0000 Subject: [PATCH 1/2] Use `popOne` in JS API routed request workers Signed-off-by: Neil Twigg --- server/jetstream_api.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 072e3e21b0b..3d7a965093b 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -922,8 +922,10 @@ func (s *Server) processJSAPIRoutedRequests() { for { select { case <-queue.ch: - reqs := queue.pop() - for _, r := range reqs { + // Only pop one item at a time here, otherwise if the system is recovering + // from queue buildup, then one worker will pull off all the tasks and the + // others will be starved of work. + for r, ok := queue.popOne(); ok && r != nil; r, ok = queue.popOne() { client.pa = r.pa start := time.Now() r.jsub.icb(r.sub, client, r.acc, r.subject, r.reply, r.msg) @@ -932,7 +934,6 @@ func (s *Server) processJSAPIRoutedRequests() { } atomic.AddInt64(&js.apiInflight, -1) } - queue.recycle(&reqs) case <-s.quitCh: return } From f1e52bd8974b54e80c1965fa4c1a1ac0c98ec215 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 9 Jan 2025 16:00:04 +0000 Subject: [PATCH 2/2] Add `TestJetStreamClusterRoutedAPIRecoverPerformance` Signed-off-by: Neil Twigg --- server/jetstream_cluster_4_test.go | 66 ++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 2e33ccfa1a0..ce97f7cd5ab 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -5265,3 +5265,69 @@ func TestJetStreamClusterOnlyPublishAdvisoriesWhenInterest(t *testing.T) { // it should succeed. require_True(t, s1.publishAdvisory(s1.GlobalAccount(), subj, "test")) } + +func TestJetStreamClusterRoutedAPIRecoverPerformance(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, _ := jsClientConnect(t, c.randomNonLeader()) + defer nc.Close() + + // We only run 16 JetStream API workers. + mp := runtime.GOMAXPROCS(0) + if mp > 16 { + mp = 16 + } + + leader := c.leader() + ljs := leader.js.Load() + + // Take the JS lock, which allows the JS API queue to build up. + ljs.mu.Lock() + defer ljs.mu.Unlock() + + count := JSDefaultRequestQueueLimit - 1 + ch := make(chan *nats.Msg, count) + + inbox := nc.NewRespInbox() + _, err := nc.ChanSubscribe(inbox, ch) + require_NoError(t, err) + + // To ensure a fair starting line, we need to submit as many tasks as + // there are JS workers whilst holding the JS lock. This will ensure that + // each JS API worker is properly wedged. + msg := &nats.Msg{ + Subject: fmt.Sprintf(JSApiConsumerInfoT, "Doesnt", "Exist"), + Reply: "no_one_here", + } + for i := 0; i < mp; i++ { + require_NoError(t, nc.PublishMsg(msg)) + } + + // Then we want to submit a fixed number of tasks, big enough to fill + // the queue, so that we can measure them. + msg = &nats.Msg{ + Subject: fmt.Sprintf(JSApiConsumerInfoT, "Doesnt", "Exist"), + Reply: inbox, + } + for i := 0; i < count; i++ { + require_NoError(t, nc.PublishMsg(msg)) + } + checkFor(t, 5*time.Second, 25*time.Millisecond, func() error { + if queued := leader.jsAPIRoutedReqs.len(); queued != count { + return fmt.Errorf("expected %d queued requests, got %d", count, queued) + } + return nil + }) + + // Now we're going to release the lock and start timing. The workers + // will now race to clear the queues and we'll wait to see how long + // it takes for them all to respond. + start := time.Now() + ljs.mu.Unlock() + for i := 0; i < count; i++ { + <-ch + } + ljs.mu.Lock() + t.Logf("Took %s to clear %d items", time.Since(start), count) +}