From 2c3782fe576088f7b47a7faef03d677ca3dbfdde Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Mon, 25 Aug 2025 12:12:02 +0200 Subject: [PATCH] [FIXED] Hold consumer lock when reading o.cfg.PauseUntil Signed-off-by: Maurice van Veen --- server/jetstream_api.go | 4 ++++ server/jetstream_cluster.go | 2 ++ 2 files changed, 6 insertions(+) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 42c2b1b2ef2..05b88aa94b4 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -4608,7 +4608,9 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun if o := stream.lookupConsumer(consumerName); o != nil { // If the consumer already exists then don't allow updating the PauseUntil, just set // it back to whatever the current configured value is. + o.mu.RLock() req.Config.PauseUntil = o.cfg.PauseUntil + o.mu.RUnlock() } // Initialize/update asset version metadata. @@ -4629,9 +4631,11 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun resp.ConsumerInfo = setDynamicConsumerInfoMetadata(o.initialInfo()) s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) + o.mu.RLock() if o.cfg.PauseUntil != nil && !o.cfg.PauseUntil.IsZero() && time.Now().Before(*o.cfg.PauseUntil) { o.sendPauseAdvisoryLocked(&o.cfg) } + o.mu.RUnlock() } // Request for the list of all consumer names. diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 471132fc7c5..3abf587d3e1 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -5500,9 +5500,11 @@ func (js *jetStream) processConsumerLeaderChange(o *consumer, isLeader bool) err // Only send a pause advisory on consumer create if we're // actually paused. The timer would have been kicked by now // by the call to o.setLeader() above. + o.mu.RLock() if isLeader && o.cfg.PauseUntil != nil && !o.cfg.PauseUntil.IsZero() && time.Now().Before(*o.cfg.PauseUntil) { o.sendPauseAdvisoryLocked(&o.cfg) } + o.mu.RUnlock() return nil }