Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
}

mset.mu.RLock()
s, jsa, tierName, cfg, acc := mset.srv, mset.jsa, mset.tier, mset.cfg, mset.acc
s, jsa, cfg, acc := mset.srv, mset.jsa, mset.cfg, mset.acc
retention := cfg.Retention
mset.mu.RUnlock()

Expand All @@ -732,21 +732,19 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
return nil, NewJSConsumerConfigRequiredError()
}

jsa.usageMu.RLock()
selectedLimits, limitsFound := jsa.limits[tierName]
jsa.usageMu.RUnlock()
if !limitsFound {
selectedLimits, _, _, _ := acc.selectLimits(config.replicas(&cfg))
if selectedLimits == nil {
return nil, NewJSNoLimitsError()
}

srvLim := &s.getOpts().JetStreamLimits
// Make sure we have sane defaults. Do so with the JS lock, otherwise a
// badly timed meta snapshot can result in a race condition.
mset.js.mu.Lock()
setConsumerConfigDefaults(config, &mset.cfg, srvLim, &selectedLimits)
setConsumerConfigDefaults(config, &mset.cfg, srvLim, selectedLimits)
mset.js.mu.Unlock()

if err := checkConsumerCfg(config, srvLim, &cfg, acc, &selectedLimits, isRecovering); err != nil {
if err := checkConsumerCfg(config, srvLim, &cfg, acc, selectedLimits, isRecovering); err != nil {
return nil, err
}
sampleFreq := 0
Expand Down
47 changes: 31 additions & 16 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1483,7 +1483,11 @@ func (a *Account) maxBytesLimits(cfg *StreamConfig) (bool, int64) {
return false, 0
}
jsa.usageMu.RLock()
selectedLimits, _, ok := jsa.selectLimits(cfg)
var replicas int
if cfg != nil {
replicas = cfg.Replicas
}
selectedLimits, _, ok := jsa.selectLimits(replicas)
jsa.usageMu.RUnlock()
if !ok {
return false, 0
Expand Down Expand Up @@ -1633,7 +1637,7 @@ func diffCheckedLimits(a, b map[string]JetStreamAccountLimits) map[string]JetStr
func (jsa *jsAccount) reservedStorage(tier string) (mem, store uint64) {
for _, mset := range jsa.streams {
cfg := &mset.cfg
if tier == _EMPTY_ || tier == tierName(cfg) && cfg.MaxBytes > 0 {
if tier == _EMPTY_ || tier == tierName(cfg.Replicas) && cfg.MaxBytes > 0 {
switch cfg.Storage {
case FileStorage:
store += uint64(cfg.MaxBytes)
Expand All @@ -1650,7 +1654,7 @@ func (jsa *jsAccount) reservedStorage(tier string) (mem, store uint64) {
func reservedStorage(sas map[string]*streamAssignment, tier string) (mem, store uint64) {
for _, sa := range sas {
cfg := sa.Config
if tier == _EMPTY_ || tier == tierName(cfg) && cfg.MaxBytes > 0 {
if tier == _EMPTY_ || tier == tierName(cfg.Replicas) && cfg.MaxBytes > 0 {
switch cfg.Storage {
case FileStorage:
store += uint64(cfg.MaxBytes)
Expand Down Expand Up @@ -1738,17 +1742,29 @@ func (a *Account) JetStreamUsage() JetStreamAccountStats {
stats.ReservedMemory, stats.ReservedStore = reservedStorage(sas, _EMPTY_)
}
for _, sa := range sas {
stats.Consumers += len(sa.consumers)
if !defaultTier {
tier := tierName(sa.Config)
u, ok := stats.Tiers[tier]
if defaultTier {
stats.Consumers += len(sa.consumers)
} else {
stats.Streams++
streamTier := tierName(sa.Config.Replicas)
su, ok := stats.Tiers[streamTier]
if !ok {
u = JetStreamTier{}
su = JetStreamTier{}
}
su.Streams++
stats.Tiers[streamTier] = su

// Now consumers, check each since could be different tiers.
for _, ca := range sa.consumers {
stats.Consumers++
consumerTier := tierName(ca.Config.replicas(sa.Config))
cu, ok := stats.Tiers[consumerTier]
if !ok {
cu = JetStreamTier{}
}
cu.Consumers++
stats.Tiers[consumerTier] = cu
}
u.Streams++
stats.Streams++
u.Consumers += len(sa.consumers)
stats.Tiers[tier] = u
}
}
} else {
Expand Down Expand Up @@ -2132,9 +2148,8 @@ func (js *jetStream) limitsExceeded(storeType StorageType) bool {
return js.wouldExceedLimits(storeType, 0)
}

func tierName(cfg *StreamConfig) string {
func tierName(replicas int) string {
// TODO (mh) this is where we could select based off a placement tag as well "qos:tier"
replicas := cfg.Replicas
if replicas == 0 {
replicas = 1
}
Expand All @@ -2154,11 +2169,11 @@ func (jsa *jsAccount) jetStreamAndClustered() (*jetStream, bool) {
}

// jsa.usageMu read lock should be held.
func (jsa *jsAccount) selectLimits(cfg *StreamConfig) (JetStreamAccountLimits, string, bool) {
func (jsa *jsAccount) selectLimits(replicas int) (JetStreamAccountLimits, string, bool) {
if selectedLimits, ok := jsa.limits[_EMPTY_]; ok {
return selectedLimits, _EMPTY_, true
}
tier := tierName(cfg)
tier := tierName(replicas)
if selectedLimits, ok := jsa.limits[tier]; ok {
return selectedLimits, tier, true
}
Expand Down
6 changes: 5 additions & 1 deletion server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3319,7 +3319,11 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, _ *Account,
}

func (acc *Account) jsNonClusteredStreamLimitsCheck(cfg *StreamConfig) *ApiError {
selectedLimits, tier, jsa, apiErr := acc.selectLimits(cfg)
var replicas int
if cfg != nil {
replicas = cfg.Replicas
}
selectedLimits, tier, jsa, apiErr := acc.selectLimits(replicas)
if apiErr != nil {
return apiErr
}
Expand Down
14 changes: 9 additions & 5 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6000,7 +6000,7 @@ func (js *jetStream) createGroupForStream(ci *ClientInfo, cfg *StreamConfig) (*r
return nil, errs
}

func (acc *Account) selectLimits(cfg *StreamConfig) (*JetStreamAccountLimits, string, *jsAccount, *ApiError) {
func (acc *Account) selectLimits(replicas int) (*JetStreamAccountLimits, string, *jsAccount, *ApiError) {
// Grab our jetstream account info.
acc.mu.RLock()
jsa := acc.js
Expand All @@ -6011,7 +6011,7 @@ func (acc *Account) selectLimits(cfg *StreamConfig) (*JetStreamAccountLimits, st
}

jsa.usageMu.RLock()
selectedLimits, tierName, ok := jsa.selectLimits(cfg)
selectedLimits, tierName, ok := jsa.selectLimits(replicas)
jsa.usageMu.RUnlock()

if !ok {
Expand All @@ -6022,7 +6022,11 @@ func (acc *Account) selectLimits(cfg *StreamConfig) (*JetStreamAccountLimits, st

// Read lock needs to be held
func (js *jetStream) jsClusteredStreamLimitsCheck(acc *Account, cfg *StreamConfig) *ApiError {
selectedLimits, tier, _, apiErr := acc.selectLimits(cfg)
var replicas int
if cfg != nil {
replicas = cfg.Replicas
}
selectedLimits, tier, _, apiErr := acc.selectLimits(replicas)
if apiErr != nil {
return apiErr
}
Expand Down Expand Up @@ -7159,7 +7163,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}
selectedLimits, _, _, apiErr := acc.selectLimits(&streamCfg)
selectedLimits, _, _, apiErr := acc.selectLimits(cfg.replicas(&streamCfg))
if apiErr != nil {
resp.Error = apiErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
Expand Down Expand Up @@ -7216,7 +7220,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
// If the consumer name is specified and we think it already exists, then
// we're likely updating an existing consumer, so don't count it. Otherwise
// we will incorrectly return NewJSMaximumConsumersLimitError for an update.
if oname != "" && cn == oname && sa.consumers[oname] != nil {
if oname != _EMPTY_ && cn == oname && sa.consumers[oname] != nil {
continue
}
}
Expand Down
72 changes: 72 additions & 0 deletions server/jetstream_jwt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1460,3 +1460,75 @@ func TestJetStreamJWTHAStorageLimitsOnScaleAndUpdate(t *testing.T) {
require_Equal(t, r3.ReservedMemory, 22*1024) // TEST9
require_Equal(t, r3.ReservedStore, 5*1024*1024) // TEST1-TEST6
}

// TestJetStreamJWTClusteredTiersR3StreamWithR1ConsumersAndAccounting verifies
// that consumers are accounted against the tier matching their own replica
// count rather than their parent stream's tier: an R3 stream's R1 consumers
// must count toward the "R1" tier stats/limits, not "R3".
func TestJetStreamJWTClusteredTiersR3StreamWithR1ConsumersAndAccounting(t *testing.T) {
// System account keypair and signed JWT, preloaded into the memory resolver below.
sysKp, syspub := createKey(t)
sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub)
newUser(t, sysKp)

// Test account with distinct per-tier limits: R1 allows 10 consumers,
// R3 allows only 1. If consumer accounting wrongly used the stream's tier
// (R3), creating 10 R1 consumers below would fail after the first.
accKp, aExpPub := createKey(t)
accClaim := jwt.NewAccountClaims(aExpPub)
accClaim.Name = "acc"
accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{
DiskStorage: 1100, Consumer: 10, Streams: 1}
accClaim.Limits.JetStreamTieredLimits["R3"] = jwt.JetStreamLimits{
DiskStorage: 1100, Consumer: 1, Streams: 1}
accJwt := encodeClaim(t, accClaim, aExpPub)
accCreds := newUser(t, accKp)
// Cluster server config template; operator/system-account/resolver section is
// appended with the JWTs generated above so accounts resolve from memory.
tmlp := `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
leaf {
listen: 127.0.0.1:-1
}
cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}
` + fmt.Sprintf(`
operator: %s
system_account: %s
resolver = MEMORY
resolver_preload = {
%s : %s
%s : %s
}
`, ojwt, syspub, syspub, sysJwt, aExpPub, accJwt)

// Three-node cluster so an R3 stream can actually be placed.
c := createJetStreamClusterWithTemplate(t, tmlp, "cluster", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer(), nats.UserCredentials(accCreds))
defer nc.Close()

// R3 stream — should consume the single allowed stream in the R3 tier.
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo.*"},
Replicas: 3,
})
require_NoError(t, err)

// Now make sure we can add in 10 R1 consumers.
// These are attached to the R3 stream but must be limited/accounted as R1.
for i := 1; i <= 10; i++ {
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Name: fmt.Sprintf("C-%d", i),
AckPolicy: nats.AckExplicitPolicy,
Replicas: 1,
})
require_NoError(t, err)
}

info, err := js.AccountInfo()
require_NoError(t, err)

// Make sure we account for these properly.
r1 := info.Tiers["R1"]
r3 := info.Tiers["R3"]

// Stream lives in R3; all 10 consumers must be counted in R1.
require_Equal(t, r1.Streams, 0)
require_Equal(t, r1.Consumers, 10)
require_Equal(t, r3.Streams, 1)
require_Equal(t, r3.Consumers, 0)
}
8 changes: 4 additions & 4 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
}
}
jsa.usageMu.RLock()
selected, tier, hasTier := jsa.selectLimits(&cfg)
selected, tier, hasTier := jsa.selectLimits(cfg.Replicas)
jsa.usageMu.RUnlock()
reserved := int64(0)
if !isClustered {
Expand Down Expand Up @@ -1672,9 +1672,9 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, s *Server) (*Str
jsa.mu.RLock()
acc := jsa.account
jsa.usageMu.RLock()
selected, tier, hasTier := jsa.selectLimits(&cfg)
selected, tier, hasTier := jsa.selectLimits(cfg.Replicas)
if !hasTier && old.Replicas != cfg.Replicas {
selected, tier, hasTier = jsa.selectLimits(old)
selected, tier, hasTier = jsa.selectLimits(old.Replicas)
}
jsa.usageMu.RUnlock()
reserved := int64(0)
Expand Down Expand Up @@ -1909,7 +1909,7 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)

js := mset.js

if targetTier := tierName(cfg); mset.tier != targetTier {
if targetTier := tierName(cfg.Replicas); mset.tier != targetTier {
// In cases such as R1->R3, only one update is needed
jsa.usageMu.RLock()
_, ok := jsa.limits[targetTier]
Expand Down