diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 36ef47660ae..860e37a4a2d 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -3437,8 +3437,10 @@ func (s *Server) jsConsumerUnpinRequest(sub *subscription, c *client, _ *Account } // First check if the stream and consumer is there. + js.mu.RLock() sa := js.streamAssignment(acc.Name, stream) if sa == nil { + js.mu.RUnlock() resp.Error = NewJSStreamNotFoundError(Unless(err)) s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return @@ -3446,10 +3448,12 @@ func (s *Server) jsConsumerUnpinRequest(sub *subscription, c *client, _ *Account ca, ok := sa.consumers[consumer] if !ok || ca == nil { + js.mu.RUnlock() resp.Error = NewJSConsumerNotFoundError() s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } + js.mu.RUnlock() // Then check if we are the leader. mset, err := acc.lookupStream(stream) @@ -4886,8 +4890,10 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account consumer := consumerNameFromSubject(subject) if isClustered { + js.mu.RLock() sa := js.streamAssignment(acc.Name, stream) if sa == nil { + js.mu.RUnlock() resp.Error = NewJSStreamNotFoundError(Unless(err)) s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return @@ -4895,12 +4901,14 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account ca, ok := sa.consumers[consumer] if !ok || ca == nil { + js.mu.RUnlock() resp.Error = NewJSConsumerNotFoundError() s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } nca := *ca + js.mu.RUnlock() pauseUTC := req.PauseUntil.UTC() if !pauseUTC.IsZero() { nca.Config.PauseUntil = &pauseUTC diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 15cf5809d44..83021cf98e2 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4246,8 +4246,10 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { return } + js.mu.Lock() sa := js.streamAssignment(accName, stream) if sa == nil { + js.mu.Unlock() s.Debugf("Consumer create failed, could not locate stream '%s > %s'", accName, stream) return } @@ -4259,7 +4261,6 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { var wasExisting bool // Check if we have an existing consumer assignment. - js.mu.Lock() if sa.consumers == nil { sa.consumers = make(map[string]*consumerAssignment) } else if oca := sa.consumers[ca.Name]; oca != nil { diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index c4df4e68bda..21a13769e25 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -2075,11 +2075,13 @@ func TestJetStreamClusterMaxConsumersMultipleConcurrentRequests(t *testing.T) { metaLeader := c.leader() mjs := metaLeader.getJetStream() + mjs.mu.RLock() sa := mjs.streamAssignment(globalAccountName, "MAXCC") require_NotNil(t, sa) for _, ca := range sa.consumers { require_False(t, ca.pending) } + mjs.mu.RUnlock() } func TestJetStreamClusterAccountMaxStreamsAndConsumersMultipleConcurrentRequests(t *testing.T) {