diff --git a/server/accounts.go b/server/accounts.go index 44afc2377ae..72f6a8c64e5 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -3801,13 +3801,18 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim } } - for i, c := range clients { + count := 0 + for _, c := range clients { a.mu.RLock() - exceeded := a.mconns != jwt.NoLimit && i >= int(a.mconns) + exceeded := a.mconns != jwt.NoLimit && count >= int(a.mconns) a.mu.RUnlock() - if exceeded { - c.maxAccountConnExceeded() - continue + // Only kick non-internal clients. + if !isInternalClient(c.kind) { + if exceeded { + c.maxAccountConnExceeded() + continue + } + count++ } c.mu.Lock() c.applyAccountLimits() diff --git a/server/client.go b/server/client.go index 82b8caf45e7..64c0c3b9429 100644 --- a/server/client.go +++ b/server/client.go @@ -56,6 +56,11 @@ const ( ACCOUNT ) +// Internal clients. kind should be SYSTEM, JETSTREAM or ACCOUNT +func isInternalClient(kind int) bool { + return kind == SYSTEM || kind == JETSTREAM || kind == ACCOUNT +} + // Extended type of a CLIENT connection. This is returned by c.clientType() // and indicate what type of client connection we are dealing with. // If invoked on a non CLIENT connection, NON_CLIENT type is returned. @@ -2782,7 +2787,7 @@ func (c *client) processSubEx(subject, queue, bsid []byte, cb msgHandler, noForw // This check does not apply to SYSTEM or JETSTREAM or ACCOUNT clients (because they don't have a `nc`...) // When a connection is closed though, we set c.subs to nil. So check for the map to not be nil. - if (c.isClosed() && (kind != SYSTEM && kind != JETSTREAM && kind != ACCOUNT)) || (c.subs == nil) { + if (c.isClosed() && !isInternalClient(kind)) || (c.subs == nil) { c.mu.Unlock() return nil, ErrConnectionClosed } @@ -4680,9 +4685,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, // Check for JetStream encoded reply subjects. // For now these will only be on $JS.ACK prefixed reply subjects. var remapped bool - if len(creply) > 0 && - c.kind != CLIENT && c.kind != SYSTEM && c.kind != JETSTREAM && c.kind != ACCOUNT && - bytes.HasPrefix(creply, []byte(jsAckPre)) { + if len(creply) > 0 && c.kind != CLIENT && !isInternalClient(c.kind) && bytes.HasPrefix(creply, []byte(jsAckPre)) { // We need to rewrite the subject and the reply. // But, we must be careful that the stream name, consumer name, and subject can contain '@' characters. // JS ACK contains at least 8 dots, find the first @ after this prefix. diff --git a/server/jwt_test.go b/server/jwt_test.go index 97f224b2e99..3e1de983f6c 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -7221,3 +7221,63 @@ func TestJWTUpdateAccountClaimsStreamAndServiceImportDeadlock(t *testing.T) { }) } } + +func TestJWTJetStreamClientsExcludedForMaxConnsUpdate(t *testing.T) { + sysKp, syspub := createKey(t) + sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub) + sysCreds := newUser(t, sysKp) + + accKp, accPub := createKey(t) + accClaim := jwt.NewAccountClaims(accPub) + accClaim.Name = "acc" + accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{ + DiskStorage: 1100, MemoryStorage: 0, Consumer: 2, Streams: 2} + accClaim.Limits.Conn = 5 + accJwt1 := encodeClaim(t, accClaim, accPub) + accCreds := newUser(t, accKp) + + storeDir := t.TempDir() + + dirSrv := t.TempDir() + cf := createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:-1 + server_name: s1 + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + leaf { + listen: 127.0.0.1:-1 + } + operator: %s + system_account: %s + resolver: { + type: full + dir: '%s' + } + `, storeDir, ojwt, syspub, dirSrv))) + + s, _ := RunServerWithConfig(cf) + defer s.Shutdown() + + updateJwt(t, s.ClientURL(), sysCreds, sysJwt, 1) + updateJwt(t, s.ClientURL(), sysCreds, accJwt1, 1) + + nc, js := jsClientConnectURL(t, s.ClientURL(), nats.UserCredentials(accCreds)) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Replicas: 1, Subjects: []string{"foo"}}) + require_NoError(t, err) + + _, err = js.Publish("foo", nil) + require_NoError(t, err) + + accClaim.Limits.Conn = 1 + accJwt1 = encodeClaim(t, accClaim, accPub) + updateJwt(t, s.ClientURL(), sysCreds, accJwt1, 1) + + // Manually reconnect. + nc.Close() + nc, js = jsClientConnectURL(t, s.ClientURL(), nats.UserCredentials(accCreds)) + defer nc.Close() + + _, err = js.Publish("foo", nil) + require_NoError(t, err) +} diff --git a/server/server.go b/server/server.go index 8b688468771..ce878d74d36 100644 --- a/server/server.go +++ b/server/server.go @@ -1837,9 +1837,9 @@ func (s *Server) createInternalAccountClient() *client { return s.createInternalClient(ACCOUNT) } -// Internal clients. kind should be SYSTEM or JETSTREAM +// Internal clients. kind should be SYSTEM, JETSTREAM or ACCOUNT func (s *Server) createInternalClient(kind int) *client { - if kind != SYSTEM && kind != JETSTREAM && kind != ACCOUNT { + if !isInternalClient(kind) { return nil } now := time.Now()