Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -4058,7 +4058,7 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) {
js.mu.Unlock()

// Need to stop the stream, we can't keep running with an old config.
acc, err := s.LookupAccount(accName)
acc, err := s.lookupOrFetchAccount(accName, isMember)
if err != nil {
return
}
Expand All @@ -4072,7 +4072,7 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) {
}
js.mu.Unlock()

acc, err := s.LookupAccount(accName)
acc, err := s.lookupOrFetchAccount(accName, isMember)
if err != nil {
ll := fmt.Sprintf("Account [%s] lookup for stream create failed: %v", accName, err)
if isMember {
Expand Down Expand Up @@ -4187,7 +4187,7 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) {
js.mu.Unlock()

// Need to stop the stream, we can't keep running with an old config.
acc, err := s.LookupAccount(accName)
acc, err := s.lookupOrFetchAccount(accName, isMember)
if err != nil {
return
}
Expand All @@ -4201,9 +4201,14 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) {
}
js.mu.Unlock()

acc, err := s.LookupAccount(accName)
acc, err := s.lookupOrFetchAccount(accName, isMember)
if err != nil {
s.Warnf("Update Stream Account %s, error on lookup: %v", accName, err)
ll := fmt.Sprintf("Update Stream Account %s, error on lookup: %v", accName, err)
if isMember {
s.Warnf(ll)
} else {
s.Debugf(ll)
}
return
}

Expand Down Expand Up @@ -4876,7 +4881,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
// Be conservative by protecting the whole stream, even if just one consumer is unsupported.
// This ensures it's safe, even with Interest-based retention where it would otherwise
// continue accepting but dropping messages.
acc, err := s.LookupAccount(accName)
acc, err := s.lookupOrFetchAccount(accName, isMember)
if err != nil {
return
}
Expand All @@ -4890,7 +4895,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
}
js.mu.Unlock()

acc, err := s.LookupAccount(accName)
acc, err := s.lookupOrFetchAccount(accName, isMember)
if err != nil {
ll := fmt.Sprintf("Account [%s] lookup for consumer create failed: %v", accName, err)
if isMember {
Expand Down Expand Up @@ -5032,7 +5037,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state

acc, err := s.LookupAccount(accName)
if err != nil {
s.Warnf("JetStream cluster failed to lookup axccount %q: %v", accName, err)
s.Warnf("JetStream cluster failed to lookup account %q: %v", accName, err)
return
}

Expand Down
59 changes: 59 additions & 0 deletions server/jetstream_jwt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2031,3 +2031,62 @@ func TestJetStreamJWTUpdateWithPreExistingStream(t *testing.T) {
return nil
})
}

func TestJetStreamAccountResolverNoFetchIfNotMember(t *testing.T) {
_, spub := createKey(t)
sysClaim := jwt.NewAccountClaims(spub)
sysClaim.Name = "SYS"
sysJwt := encodeClaim(t, sysClaim, spub)
kp, _ := nkeys.CreateAccount()
aPub, _ := kp.PublicKey()

templ := `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 2GB, max_file_store: 2GB, store_dir: '%s'}

leaf {
listen: 127.0.0.1:-1
}

cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}
`

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/" {
w.Write([]byte("ok"))
} else if strings.HasSuffix(r.URL.Path, spub) {
w.Write([]byte(sysJwt))
} else {
// Simulate some time being spent, but doesn't respond.
time.Sleep(250 * time.Millisecond)
}
}))
defer ts.Close()

c := createJetStreamClusterWithTemplateAndModHook(t, templ, "R3S", 3,
func(serverName, clusterName, storeDir, conf string) string {
return conf + fmt.Sprintf(`
operator: %s
system_account: %s
resolver: URL("%s")`, ojwt, spub, ts.URL)
})
defer c.shutdown()

s := c.leader()
js := s.getJetStream()
ci := &ClientInfo{Cluster: "R3S", Account: aPub}
cfg := &StreamConfig{Name: "TEST", Subjects: []string{"foo"}}
sa := &streamAssignment{Client: ci, Config: cfg}
start := time.Now()
// Simulate some meta operations where this server is not a member.
// The server should not fetch the account from the resolver.
for range 5 {
js.processStreamAssignment(sa)
}
require_LessThan(t, time.Since(start), 100*time.Millisecond)
}