Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4279,8 +4279,9 @@ func getHeader(key string, hdr []byte) []byte {

// For bytes.HasPrefix below.
var (
jsRequestNextPreB = []byte(jsRequestNextPre)
jsDirectGetPreB = []byte(jsDirectGetPre)
jsRequestNextPreB = []byte(jsRequestNextPre)
jsDirectGetPreB = []byte(jsDirectGetPre)
jsConsumerInfoPreB = []byte(JSApiConsumerInfoPre)
)

// processServiceImport is an internal callback when a subscription matches an imported service
Expand All @@ -4300,12 +4301,16 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
}
}

var checkJS, checkConsumerInfo bool

acc.mu.RLock()
var checkJS bool
shouldReturn := si.invalid || acc.sl == nil
if !shouldReturn && !isResponse && si.to == jsAllAPI {
if bytes.HasPrefix(c.pa.subject, jsDirectGetPreB) || bytes.HasPrefix(c.pa.subject, jsRequestNextPreB) {
checkJS = true
} else if len(c.pa.psi) == 0 && bytes.HasPrefix(c.pa.subject, jsConsumerInfoPreB) {
// Only check if we are clustered and expecting a reply.
checkConsumerInfo = len(c.pa.reply) > 0 && c.srv.JetStreamIsClustered()
}
}
siAcc := si.acc
Expand All @@ -4320,6 +4325,15 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
return
}

// Here we will do a fast check for consumer info only to check if it does not exists. This will spread the
// load to all servers with connected clients since service imports are processed at point of entry.
// Only call for clustered setups.
if checkConsumerInfo && si.se != nil && si.se.acc == c.srv.SystemAccount() {
if c.srv.jsConsumerProcessMissing(c, acc) {
return
}
}

mt, traceOnly := c.isMsgTraceEnabled()

var nrr []byte
Expand Down
4 changes: 4 additions & 0 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,8 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error {
if err := s.enableJetStreamClustering(); err != nil {
return err
}
// Set our atomic bool to clustered.
s.jsClustered.Store(true)
}

// Mark when we are up and running.
Expand Down Expand Up @@ -1020,6 +1022,8 @@ func (s *Server) shutdownJetStream() {
cc.c = nil
}
cc.meta = nil
// Set our atomic bool to false.
s.jsClustered.Store(false)
}
js.mu.Unlock()

Expand Down
63 changes: 61 additions & 2 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,9 @@ const (

// JSApiConsumerInfo is for obtaining general information about a consumer.
// Will return JSON response.
JSApiConsumerInfo = "$JS.API.CONSUMER.INFO.*.*"
JSApiConsumerInfoT = "$JS.API.CONSUMER.INFO.%s.%s"
JSApiConsumerInfoPre = "$JS.API.CONSUMER.INFO."
JSApiConsumerInfo = "$JS.API.CONSUMER.INFO.*.*"
JSApiConsumerInfoT = "$JS.API.CONSUMER.INFO.%s.%s"

// JSApiConsumerDelete is the endpoint to delete consumers.
// Will return JSON response.
Expand Down Expand Up @@ -1033,6 +1034,15 @@ func (s *Server) sendAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply
s.sendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response)
}

// Use the account acc to send actual result from non-system account.
func (s *Server) sendAPIErrResponseFromAccount(ci *ClientInfo, acc *Account, subject, reply, request, response string) {
acc.trackAPIErr()
if reply != _EMPTY_ {
s.sendInternalAccountMsg(acc, reply, response)
}
s.sendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response)
}

const errRespDelay = 500 * time.Millisecond

type delayedAPIResponse struct {
Expand Down Expand Up @@ -4591,6 +4601,55 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account,
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
}

// This will be a quick check on point of entry for a consumer that does
// not exist. If that is the case we will return the response and return
// true which will shortcut the service import to alleviate pressure on
// the JS API queues.
func (s *Server) jsConsumerProcessMissing(c *client, acc *Account) bool {
subject := bytesToString(c.pa.subject)
streamName, consumerName := streamNameFromSubject(subject), consumerNameFromSubject(subject)

// Check to make sure the consumer is assigned.
// All JS servers will have the meta information.
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil {
return false
}
js.mu.RLock()
sa, ca := js.assignments(acc.Name, streamName, consumerName)
js.mu.RUnlock()

// If we have a consumer assignment return false here and let normally processing takeover.
if ca != nil {
return false
}

// We can't find the consumer, so mimic what would be the errors below.
var resp = JSApiConsumerInfoResponse{ApiResponse: ApiResponse{Type: JSApiConsumerInfoResponseType}}

// Need to make subject and reply real here for queued response processing.
subject = string(c.pa.subject)
reply := string(c.pa.reply)

ci := c.getClientInfo(true)

if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
s.sendAPIErrResponseFromAccount(ci, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
}
} else if sa == nil {
resp.Error = NewJSStreamNotFoundError()
s.sendAPIErrResponseFromAccount(ci, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
} else {
// If we are here the consumer is not present.
resp.Error = NewJSConsumerNotFoundError()
s.sendAPIErrResponseFromAccount(ci, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
}

return true
}

// Request for information about an consumer.
func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
if c == nil || !s.JetStreamEnabled() {
Expand Down
15 changes: 10 additions & 5 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,7 @@ func (s *Server) getJetStreamCluster() (*jetStream, *jetStreamCluster) {
}

func (s *Server) JetStreamIsClustered() bool {
js := s.getJetStream()
if js == nil {
return false
}
return js.isClustered()
return s.jsClustered.Load()
}

func (s *Server) JetStreamIsLeader() bool {
Expand Down Expand Up @@ -4731,6 +4727,15 @@ func (js *jetStream) consumerAssignment(account, stream, consumer string) *consu
return nil
}

// Return both the stream and consumer assignments.
Comment thread
derekcollison marked this conversation as resolved.
// Lock should be held.
func (js *jetStream) assignments(account, stream, consumer string) (*streamAssignment, *consumerAssignment) {
if sa := js.streamAssignment(account, stream); sa != nil {
return sa, sa.consumers[consumer]
}
return nil, nil
}

// consumerAssigned informs us if this server has this consumer assigned.
func (jsa *jsAccount) consumerAssigned(stream, consumer string) bool {
jsa.mu.RLock()
Expand Down
15 changes: 9 additions & 6 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3412,7 +3412,6 @@ func TestJetStreamClusterExtendedAccountInfo(t *testing.T) {
// Go client will lag so use direct for now.
getAccountInfo := func() *nats.AccountInfo {
t.Helper()

info, err := js.AccountInfo()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
Expand All @@ -3437,10 +3436,13 @@ func TestJetStreamClusterExtendedAccountInfo(t *testing.T) {
js.ConsumerInfo("TEST-2", "NO-CONSUMER")
js.ConsumerInfo("TEST-3", "NO-CONSUMER")

ai = getAccountInfo()
if ai.API.Errors != 4 {
t.Fatalf("Expected 4 API calls to be errors, got %d", ai.API.Errors)
}
checkFor(t, 2*time.Second, 250*time.Millisecond, func() error {
ai = getAccountInfo()
if ai.API.Errors != 4 {
return fmt.Errorf("Expected 4 API calls to be errors, got %d", ai.API.Errors)
}
return nil
})
}

func TestJetStreamClusterPeerRemovalAPI(t *testing.T) {
Expand Down Expand Up @@ -4319,7 +4321,8 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) {
if err := js.DeleteConsumer("NO-Q", "dlc"); !notAvailableErr(err) {
t.Fatalf("Expected an 'unavailable' error, got %v", err)
}
if _, err := js.ConsumerInfo("NO-Q", "dlc"); !notAvailableErr(err) {
// Since we did not create the consumer our bypass will respond from the local server.
if _, err := js.ConsumerInfo("NO-Q", "dlc"); err != nats.ErrConsumerNotFound {
t.Fatalf("Expected an 'unavailable' error, got %v", err)
}
// Listers
Expand Down
57 changes: 57 additions & 0 deletions server/norace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11252,3 +11252,60 @@ func TestNoRaceJetStreamClusterLargeMetaSnapshotTiming(t *testing.T) {
require_NoError(t, n.InstallSnapshot(snap))
t.Logf("Took %v to snap meta with size of %v\n", time.Since(start), friendlyBytes(len(snap)))
}

func TestNoRaceJetStreamClusterInfoOnMissingConsumers(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3F", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

// Create a stream just so the consumer info processing misses on the consumer only.
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)

done := make(chan bool)
pending := make(chan int, 1)

// Check to make sure we never have any pending on the API queue.
go func() {
ml := c.leader()
for {
select {
case <-done:
return
case <-time.After(100 * time.Millisecond):
qlen := ml.jsAPIRoutedReqs.len() + int(ml.jsAPIRoutedReqs.inProgress())
if qlen > 0 {
pending <- qlen
return
}
}
}
}()

wg := sync.WaitGroup{}
wg.Add(500)
for i := 0; i < 500; i++ {
go func() {
defer wg.Done()
s := c.randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()
// Check for non-existent consumers.
for c := 0; c < 1000; c++ {
_, err := js.ConsumerInfo("TEST", fmt.Sprintf("C-%d", c))
require_Error(t, err)
}
}()
}
wg.Wait()
close(done)
if len(pending) > 0 {
t.Fatalf("Saw API pending of %d, expected always 0", <-pending)
}
}
14 changes: 7 additions & 7 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,10 @@ type Server struct {
listenerErr error
gacc *Account
sys *internal
sysAcc atomic.Pointer[Account]
js atomic.Pointer[jetStream]
isMetaLeader atomic.Bool
jsClustered atomic.Bool
accounts sync.Map
tmpAccounts sync.Map // Temporarily stores accounts that are being built
activeAccounts int32
Expand Down Expand Up @@ -1349,6 +1351,7 @@ func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error)
if err == nil && s.sys != nil && acc != s.sys.account {
// sys.account.clients (including internal client)/respmap/etc... are transferred separately
s.sys.account = acc
s.sysAcc.Store(acc)
}
if err != nil {
return awcsti, fmt.Errorf("error resolving system account: %v", err)
Expand Down Expand Up @@ -1704,13 +1707,7 @@ func (s *Server) SetSystemAccount(accName string) error {

// SystemAccount returns the system account if set.
func (s *Server) SystemAccount() *Account {
var sacc *Account
s.mu.RLock()
if s.sys != nil {
sacc = s.sys.account
}
s.mu.RUnlock()
return sacc
return s.sysAcc.Load()
}

// GlobalAccount returns the global account.
Expand Down Expand Up @@ -1782,6 +1779,9 @@ func (s *Server) setSystemAccount(acc *Account) error {
s.sys.wg.Add(1)
s.mu.Unlock()

// Store in atomic for fast lookup.
s.sysAcc.Store(acc)

// Register with the account.
s.sys.client.registerWithAccount(acc)

Expand Down