Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2151,6 +2151,10 @@ func (jsa *jsAccount) storageTotals() (uint64, uint64) {
}

func (jsa *jsAccount) limitsExceeded(storeType StorageType, tierName string, replicas int) (bool, *ApiError) {
return jsa.wouldExceedLimits(storeType, tierName, replicas, _EMPTY_, nil, nil)
}

func (jsa *jsAccount) wouldExceedLimits(storeType StorageType, tierName string, replicas int, subj string, hdr, msg []byte) (bool, *ApiError) {
jsa.usageMu.RLock()
defer jsa.usageMu.RUnlock()

Expand All @@ -2164,24 +2168,31 @@ func (jsa *jsAccount) limitsExceeded(storeType StorageType, tierName string, rep
return false, nil
}
r := int64(replicas)
if r < 1 || tierName == _EMPTY_ {
// Make sure replicas is correct.
if r < 1 {
r = 1
}
// This is for limits. If we have no tier, consider all to be flat, vs tiers like R3 where we want to scale limit by replication.
lr := r
if tierName == _EMPTY_ {
lr = 1
}

// Since tiers are flat we need to scale limit up by replicas when checking.
if storeType == MemoryStorage {
totalMem := inUse.total.mem
if selectedLimits.MemoryMaxStreamBytes > 0 && totalMem > selectedLimits.MemoryMaxStreamBytes*r {
totalMem := inUse.total.mem + (int64(memStoreMsgSize(subj, hdr, msg)) * r)
if selectedLimits.MemoryMaxStreamBytes > 0 && totalMem > selectedLimits.MemoryMaxStreamBytes*lr {
return true, nil
}
if selectedLimits.MaxMemory >= 0 && totalMem > selectedLimits.MaxMemory*r {
if selectedLimits.MaxMemory >= 0 && totalMem > selectedLimits.MaxMemory*lr {
return true, nil
}
} else {
totalStore := inUse.total.store
if selectedLimits.StoreMaxStreamBytes > 0 && totalStore > selectedLimits.StoreMaxStreamBytes*r {
totalStore := inUse.total.store + (int64(fileStoreMsgSize(subj, hdr, msg)) * r)
if selectedLimits.StoreMaxStreamBytes > 0 && totalStore > selectedLimits.StoreMaxStreamBytes*lr {
return true, nil
}
if selectedLimits.MaxStore >= 0 && totalStore > selectedLimits.MaxStore*r {
if selectedLimits.MaxStore >= 0 && totalStore > selectedLimits.MaxStore*lr {
return true, nil
}
}
Expand Down
46 changes: 5 additions & 41 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7501,7 +7501,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
mset.mu.RLock()
canRespond := !mset.cfg.NoAck && len(reply) > 0
name, stype, store := mset.cfg.Name, mset.cfg.Storage, mset.store
s, js, jsa, st, r, tierName, outq, node := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, int64(mset.cfg.Replicas), mset.tier, mset.outq, mset.node
s, js, jsa, st, r, tierName, outq, node := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.tier, mset.outq, mset.node
maxMsgSize, lseq, clfs := int(mset.cfg.MaxMsgSize), mset.lseq, mset.clfs
isLeader, isSealed := mset.isLeader(), mset.cfg.Sealed
mset.mu.RUnlock()
Expand Down Expand Up @@ -7554,50 +7554,14 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
}

// Check here pre-emptively if we have exceeded our account limits.
var exceeded bool
jsa.usageMu.Lock()
jsaLimits, ok := jsa.limits[tierName]
if !ok {
jsa.usageMu.Unlock()
err := fmt.Errorf("no JetStream resource limits found account: %q", jsa.acc().Name)
s.RateLimitWarnf(err.Error())
if canRespond {
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}}
resp.Error = NewJSNoLimitsError()
response, _ = json.Marshal(resp)
outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, response, nil, 0))
if exceeded, err := jsa.wouldExceedLimits(st, tierName, r, subject, hdr, msg); exceeded {
if err == nil {
err = NewJSAccountResourcesExceededError()
}
return err
}
t, ok := jsa.usage[tierName]
if !ok {
t = &jsaStorage{}
jsa.usage[tierName] = t
}
// Make sure replicas is correct.
if r < 1 {
r = 1
}
// This is for limits. If we have no tier, consider all to be flat, vs tiers like R3 where we want to scale limit by replication.
lr := r
if tierName == _EMPTY_ {
lr = 1
}
// Tiers are flat, meaning the limit for R3 will be 100GB, not 300GB, so compare to total but adjust limits.
if st == MemoryStorage && jsaLimits.MaxMemory > 0 {
exceeded = t.total.mem+(int64(memStoreMsgSize(subject, hdr, msg))*r) > (jsaLimits.MaxMemory * lr)
} else if jsaLimits.MaxStore > 0 {
exceeded = t.total.store+(int64(fileStoreMsgSize(subject, hdr, msg))*r) > (jsaLimits.MaxStore * lr)
}
jsa.usageMu.Unlock()

// If we have exceeded our account limits go ahead and return.
if exceeded {
err := fmt.Errorf("JetStream resource limits exceeded for account: %q", jsa.acc().Name)
s.RateLimitWarnf(err.Error())
if canRespond {
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}}
resp.Error = NewJSAccountResourcesExceededError()
resp.Error = err
response, _ = json.Marshal(resp)
outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, response, nil, 0))
}
Expand Down
Loading