diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 87ad2c60956..f110cf3f2a3 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4058,7 +4058,7 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) { js.mu.Unlock() // Need to stop the stream, we can't keep running with an old config. - acc, err := s.LookupAccount(accName) + acc, err := s.lookupOrFetchAccount(accName, isMember) if err != nil { return } @@ -4072,7 +4072,7 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) { } js.mu.Unlock() - acc, err := s.LookupAccount(accName) + acc, err := s.lookupOrFetchAccount(accName, isMember) if err != nil { ll := fmt.Sprintf("Account [%s] lookup for stream create failed: %v", accName, err) if isMember { @@ -4187,7 +4187,7 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) { js.mu.Unlock() // Need to stop the stream, we can't keep running with an old config. - acc, err := s.LookupAccount(accName) + acc, err := s.lookupOrFetchAccount(accName, isMember) if err != nil { return } @@ -4201,9 +4201,14 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) { } js.mu.Unlock() - acc, err := s.LookupAccount(accName) + acc, err := s.lookupOrFetchAccount(accName, isMember) if err != nil { - s.Warnf("Update Stream Account %s, error on lookup: %v", accName, err) + ll := fmt.Sprintf("Update Stream Account %s, error on lookup: %v", accName, err) + if isMember { + s.Warnf(ll) + } else { + s.Debugf(ll) + } return } @@ -4876,7 +4881,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { // Be conservative by protecting the whole stream, even if just one consumer is unsupported. // This ensures it's safe, even with Interest-based retention where it would otherwise // continue accepting but dropping messages. - acc, err := s.LookupAccount(accName) + acc, err := s.lookupOrFetchAccount(accName, isMember) if err != nil { return } @@ -4890,7 +4895,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { } js.mu.Unlock() - acc, err := s.LookupAccount(accName) + acc, err := s.lookupOrFetchAccount(accName, isMember) if err != nil { ll := fmt.Sprintf("Account [%s] lookup for consumer create failed: %v", accName, err) if isMember { @@ -5032,7 +5037,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state acc, err := s.LookupAccount(accName) if err != nil { - s.Warnf("JetStream cluster failed to lookup axccount %q: %v", accName, err) + s.Warnf("JetStream cluster failed to lookup account %q: %v", accName, err) return } diff --git a/server/jetstream_jwt_test.go b/server/jetstream_jwt_test.go index a365b9c269c..ce2a5ada195 100644 --- a/server/jetstream_jwt_test.go +++ b/server/jetstream_jwt_test.go @@ -2031,3 +2031,62 @@ func TestJetStreamJWTUpdateWithPreExistingStream(t *testing.T) { return nil }) } + +func TestJetStreamAccountResolverNoFetchIfNotMember(t *testing.T) { + _, spub := createKey(t) + sysClaim := jwt.NewAccountClaims(spub) + sysClaim.Name = "SYS" + sysJwt := encodeClaim(t, sysClaim, spub) + kp, _ := nkeys.CreateAccount() + aPub, _ := kp.PublicKey() + + templ := ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: {max_mem_store: 2GB, max_file_store: 2GB, store_dir: '%s'} + + leaf { + listen: 127.0.0.1:-1 + } + + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } +` + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/" { + w.Write([]byte("ok")) + } else if strings.HasSuffix(r.URL.Path, spub) { + w.Write([]byte(sysJwt)) + } else { + // Simulate some time being spent, but doesn't respond. + time.Sleep(250 * time.Millisecond) + } + })) + defer ts.Close() + + c := createJetStreamClusterWithTemplateAndModHook(t, templ, "R3S", 3, + func(serverName, clusterName, storeDir, conf string) string { + return conf + fmt.Sprintf(` + operator: %s + system_account: %s + resolver: URL("%s")`, ojwt, spub, ts.URL) + }) + defer c.shutdown() + + s := c.leader() + js := s.getJetStream() + ci := &ClientInfo{Cluster: "R3S", Account: aPub} + cfg := &StreamConfig{Name: "TEST", Subjects: []string{"foo"}} + sa := &streamAssignment{Client: ci, Config: cfg} + start := time.Now() + // Simulate some meta operations where this server is not a member. + // The server should not fetch the account from the resolver. + for range 5 { + js.processStreamAssignment(sa) + } + require_LessThan(t, time.Since(start), 100*time.Millisecond) +}