2 changes: 1 addition & 1 deletion .travis.yml
@@ -55,7 +55,7 @@ script: ./scripts/runTestsOnTravis.sh $TEST_SUITE
deploy:
provider: script
cleanup: true
script: curl -sL http://git.io/goreleaser | bash
script: curl -sfL https://goreleaser.com/static/run | VERSION=v1.26.2 bash
on:
tags: true
condition: ($TRAVIS_GO_VERSION =~ 1.22) && ($TEST_SUITE = "compile")
6 changes: 4 additions & 2 deletions server/client.go
@@ -2950,8 +2950,10 @@ func (c *client) addShadowSub(sub *subscription, ime *ime, enact bool) (*subscri
return nil, fmt.Errorf(errs)
}

// Update our route map here.
c.srv.updateRemoteSubscription(im.acc, &nsub, 1)
// Update our route map here, but only if we are not a leaf node, or if we are a hub leafnode.
if c.kind != LEAF || c.isHubLeafNode() {
c.srv.updateRemoteSubscription(im.acc, &nsub, 1)
}

return &nsub, nil
}
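For reference, a minimal self-contained sketch of the guard this hunk introduces. The type and field names below (`clientKind`, `kindLeaf`, `hub`) are illustrative stand-ins, not the server's actual API; the real check in the diff is `c.kind != LEAF || c.isHubLeafNode()`.

```go
package main

import "fmt"

// Illustrative stand-ins for the server's client kind and leafnode state.
type clientKind int

const (
	kindClient clientKind = iota
	kindLeaf
)

type client struct {
	kind clientKind
	hub  bool // assumption: true when this connection is the hub side of a leafnode link
}

// shouldUpdateRemoteSubs mirrors `c.kind != LEAF || c.isHubLeafNode()`:
// skip the remote subscription update only for spoke-side leafnode connections.
func shouldUpdateRemoteSubs(c *client) bool {
	return c.kind != kindLeaf || c.hub
}

func main() {
	fmt.Println(shouldUpdateRemoteSubs(&client{kind: kindClient}))          // true: regular client or route
	fmt.Println(shouldUpdateRemoteSubs(&client{kind: kindLeaf}))            // false: spoke leafnode, skip
	fmt.Println(shouldUpdateRemoteSubs(&client{kind: kindLeaf, hub: true})) // true: hub-side leafnode
}
```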
12 changes: 5 additions & 7 deletions server/consumer.go
@@ -711,7 +711,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
}

mset.mu.RLock()
s, jsa, tierName, cfg, acc := mset.srv, mset.jsa, mset.tier, mset.cfg, mset.acc
s, jsa, cfg, acc := mset.srv, mset.jsa, mset.cfg, mset.acc
retention := cfg.Retention
mset.mu.RUnlock()

@@ -726,21 +726,19 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
return nil, NewJSConsumerConfigRequiredError()
}

jsa.usageMu.RLock()
selectedLimits, limitsFound := jsa.limits[tierName]
jsa.usageMu.RUnlock()
if !limitsFound {
selectedLimits, _, _, _ := acc.selectLimits(config.replicas(&cfg))
if selectedLimits == nil {
return nil, NewJSNoLimitsError()
}

srvLim := &s.getOpts().JetStreamLimits
// Make sure we have sane defaults. Do so with the JS lock, otherwise a
// badly timed meta snapshot can result in a race condition.
mset.js.mu.Lock()
setConsumerConfigDefaults(config, &mset.cfg, srvLim, &selectedLimits)
setConsumerConfigDefaults(config, &mset.cfg, srvLim, selectedLimits)
mset.js.mu.Unlock()

if err := checkConsumerCfg(config, srvLim, &cfg, acc, &selectedLimits, isRecovering); err != nil {
if err := checkConsumerCfg(config, srvLim, &cfg, acc, selectedLimits, isRecovering); err != nil {
return nil, err
}
sampleFreq := 0
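The hunk above switches consumer limit selection from the stream's pre-computed tier to the consumer's own effective replica count. A minimal sketch of that idea follows, assuming a consumer without an explicit replica count inherits the stream's; the helper and type names here are illustrative, not the server's API, and the "R1"/"R3" naming matches the tier keys used in the tests below.

```go
package main

import "fmt"

type streamCfg struct{ Replicas int }
type consumerCfg struct{ Replicas int }

// effectiveReplicas assumes a consumer without an explicit replica count
// inherits the stream's replica count; an explicit count wins.
func effectiveReplicas(cc consumerCfg, sc streamCfg) int {
	if cc.Replicas == 0 {
		return sc.Replicas
	}
	return cc.Replicas
}

// tierName mirrors the replica-based tier naming used by the limits maps
// ("R1", "R3", ...); zero replicas are treated as one.
func tierName(replicas int) string {
	if replicas == 0 {
		replicas = 1
	}
	return fmt.Sprintf("R%d", replicas)
}

func main() {
	stream := streamCfg{Replicas: 3}
	fmt.Println(tierName(effectiveReplicas(consumerCfg{Replicas: 1}, stream))) // R1: explicit R1 consumer
	fmt.Println(tierName(effectiveReplicas(consumerCfg{}, stream)))            // R3: inherits from the stream
}
```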
47 changes: 31 additions & 16 deletions server/jetstream.go
@@ -1440,7 +1440,11 @@ func (a *Account) maxBytesLimits(cfg *StreamConfig) (bool, int64) {
return false, 0
}
jsa.usageMu.RLock()
selectedLimits, _, ok := jsa.selectLimits(cfg)
var replicas int
if cfg != nil {
replicas = cfg.Replicas
}
selectedLimits, _, ok := jsa.selectLimits(replicas)
jsa.usageMu.RUnlock()
if !ok {
return false, 0
@@ -1590,7 +1594,7 @@ func diffCheckedLimits(a, b map[string]JetStreamAccountLimits) map[string]JetStr
func (jsa *jsAccount) reservedStorage(tier string) (mem, store uint64) {
for _, mset := range jsa.streams {
cfg := &mset.cfg
if tier == _EMPTY_ || tier == tierName(cfg) && cfg.MaxBytes > 0 {
if tier == _EMPTY_ || tier == tierName(cfg.Replicas) && cfg.MaxBytes > 0 {
switch cfg.Storage {
case FileStorage:
store += uint64(cfg.MaxBytes)
@@ -1607,7 +1611,7 @@ func (jsa *jsAccount) reservedStorage(tier string) (mem, store uint64) {
func reservedStorage(sas map[string]*streamAssignment, tier string) (mem, store uint64) {
for _, sa := range sas {
cfg := sa.Config
if tier == _EMPTY_ || tier == tierName(cfg) && cfg.MaxBytes > 0 {
if tier == _EMPTY_ || tier == tierName(cfg.Replicas) && cfg.MaxBytes > 0 {
switch cfg.Storage {
case FileStorage:
store += uint64(cfg.MaxBytes)
@@ -1695,17 +1699,29 @@ func (a *Account) JetStreamUsage() JetStreamAccountStats {
stats.ReservedMemory, stats.ReservedStore = reservedStorage(sas, _EMPTY_)
}
for _, sa := range sas {
stats.Consumers += len(sa.consumers)
if !defaultTier {
tier := tierName(sa.Config)
u, ok := stats.Tiers[tier]
if defaultTier {
stats.Consumers += len(sa.consumers)
} else {
stats.Streams++
streamTier := tierName(sa.Config.Replicas)
su, ok := stats.Tiers[streamTier]
if !ok {
u = JetStreamTier{}
su = JetStreamTier{}
}
su.Streams++
stats.Tiers[streamTier] = su

// Now consumers, check each since could be different tiers.
for _, ca := range sa.consumers {
stats.Consumers++
consumerTier := tierName(ca.Config.replicas(sa.Config))
cu, ok := stats.Tiers[consumerTier]
if !ok {
cu = JetStreamTier{}
}
cu.Consumers++
stats.Tiers[consumerTier] = cu
}
u.Streams++
stats.Streams++
u.Consumers += len(sa.consumers)
stats.Tiers[tier] = u
}
}
} else {
@@ -2089,9 +2105,8 @@ func (js *jetStream) limitsExceeded(storeType StorageType) bool {
return js.wouldExceedLimits(storeType, 0)
}

func tierName(cfg *StreamConfig) string {
func tierName(replicas int) string {
// TODO (mh) this is where we could select based off a placement tag as well "qos:tier"
replicas := cfg.Replicas
if replicas == 0 {
replicas = 1
}
@@ -2111,11 +2126,11 @@ func (jsa *jsAccount) jetStreamAndClustered() (*jetStream, bool) {
}

// jsa.usageMu read lock should be held.
func (jsa *jsAccount) selectLimits(cfg *StreamConfig) (JetStreamAccountLimits, string, bool) {
func (jsa *jsAccount) selectLimits(replicas int) (JetStreamAccountLimits, string, bool) {
if selectedLimits, ok := jsa.limits[_EMPTY_]; ok {
return selectedLimits, _EMPTY_, true
}
tier := tierName(cfg)
tier := tierName(replicas)
if selectedLimits, ok := jsa.limits[tier]; ok {
return selectedLimits, tier, true
}
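The accounting change in JetStreamUsage above can be summarized with a small sketch: each stream is counted under its own tier, and each of its consumers under the tier implied by the consumer's replica count, so R1 consumers on an R3 stream no longer inflate the R3 tier. The types and field names below are simplified stand-ins, and the consumer-inherits-stream-replicas default is an assumption consistent with the new test.

```go
package main

import "fmt"

type tierStats struct{ Streams, Consumers int }

type consumer struct{ Replicas int }

type stream struct {
	Replicas  int
	Consumers []consumer
}

func tier(replicas int) string {
	if replicas == 0 {
		replicas = 1
	}
	return fmt.Sprintf("R%d", replicas)
}

// accountUsage mirrors the new per-tier loop: streams and consumers are
// tallied independently, each under its own tier.
func accountUsage(streams []stream) map[string]tierStats {
	tiers := map[string]tierStats{}
	for _, s := range streams {
		st := tiers[tier(s.Replicas)]
		st.Streams++
		tiers[tier(s.Replicas)] = st
		for _, c := range s.Consumers {
			replicas := c.Replicas
			if replicas == 0 {
				replicas = s.Replicas // assumption: consumers inherit stream replicas by default
			}
			ct := tiers[tier(replicas)]
			ct.Consumers++
			tiers[tier(replicas)] = ct
		}
	}
	return tiers
}

func main() {
	// An R3 stream with ten explicit R1 consumers, as in the new JWT test.
	s := stream{Replicas: 3}
	for i := 0; i < 10; i++ {
		s.Consumers = append(s.Consumers, consumer{Replicas: 1})
	}
	fmt.Printf("%+v\n", accountUsage([]stream{s})) // map[R1:{Streams:0 Consumers:10} R3:{Streams:1 Consumers:0}]
}
```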
6 changes: 5 additions & 1 deletion server/jetstream_api.go
@@ -3267,7 +3267,11 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, _ *Account,
}

func (acc *Account) jsNonClusteredStreamLimitsCheck(cfg *StreamConfig) *ApiError {
selectedLimits, tier, jsa, apiErr := acc.selectLimits(cfg)
var replicas int
if cfg != nil {
replicas = cfg.Replicas
}
selectedLimits, tier, jsa, apiErr := acc.selectLimits(replicas)
if apiErr != nil {
return apiErr
}
14 changes: 9 additions & 5 deletions server/jetstream_cluster.go
@@ -5986,7 +5986,7 @@ func (js *jetStream) createGroupForStream(ci *ClientInfo, cfg *StreamConfig) (*r
return nil, errs
}

func (acc *Account) selectLimits(cfg *StreamConfig) (*JetStreamAccountLimits, string, *jsAccount, *ApiError) {
func (acc *Account) selectLimits(replicas int) (*JetStreamAccountLimits, string, *jsAccount, *ApiError) {
// Grab our jetstream account info.
acc.mu.RLock()
jsa := acc.js
@@ -5997,7 +5997,7 @@ func (acc *Account) selectLimits(cfg *StreamConfig) (*JetStreamAccountLimits, st
}

jsa.usageMu.RLock()
selectedLimits, tierName, ok := jsa.selectLimits(cfg)
selectedLimits, tierName, ok := jsa.selectLimits(replicas)
jsa.usageMu.RUnlock()

if !ok {
@@ -6008,7 +6008,11 @@ func (acc *Account) selectLimits(cfg *StreamConfig) (*JetStreamAccountLimits, st

// Read lock needs to be held
func (js *jetStream) jsClusteredStreamLimitsCheck(acc *Account, cfg *StreamConfig) *ApiError {
selectedLimits, tier, _, apiErr := acc.selectLimits(cfg)
var replicas int
if cfg != nil {
replicas = cfg.Replicas
}
selectedLimits, tier, _, apiErr := acc.selectLimits(replicas)
if apiErr != nil {
return apiErr
}
@@ -7145,7 +7149,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}
selectedLimits, _, _, apiErr := acc.selectLimits(&streamCfg)
selectedLimits, _, _, apiErr := acc.selectLimits(cfg.replicas(&streamCfg))
if apiErr != nil {
resp.Error = apiErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
@@ -7202,7 +7206,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
// If the consumer name is specified and we think it already exists, then
// we're likely updating an existing consumer, so don't count it. Otherwise
// we will incorrectly return NewJSMaximumConsumersLimitError for an update.
if oname != "" && cn == oname && sa.consumers[oname] != nil {
if oname != _EMPTY_ && cn == oname && sa.consumers[oname] != nil {
continue
}
}
72 changes: 72 additions & 0 deletions server/jetstream_jwt_test.go
@@ -1460,3 +1460,75 @@ func TestJetStreamJWTHAStorageLimitsOnScaleAndUpdate(t *testing.T) {
require_Equal(t, r3.ReservedMemory, 22*1024) // TEST9
require_Equal(t, r3.ReservedStore, 5*1024*1024) // TEST1-TEST6
}

func TestJetStreamJWTClusteredTiersR3StreamWithR1ConsumersAndAccounting(t *testing.T) {
sysKp, syspub := createKey(t)
sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub)
newUser(t, sysKp)

accKp, aExpPub := createKey(t)
accClaim := jwt.NewAccountClaims(aExpPub)
accClaim.Name = "acc"
accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{
DiskStorage: 1100, Consumer: 10, Streams: 1}
accClaim.Limits.JetStreamTieredLimits["R3"] = jwt.JetStreamLimits{
DiskStorage: 1100, Consumer: 1, Streams: 1}
accJwt := encodeClaim(t, accClaim, aExpPub)
accCreds := newUser(t, accKp)
tmlp := `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
leaf {
listen: 127.0.0.1:-1
}
cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}
` + fmt.Sprintf(`
operator: %s
system_account: %s
resolver = MEMORY
resolver_preload = {
%s : %s
%s : %s
}
`, ojwt, syspub, syspub, sysJwt, aExpPub, accJwt)

c := createJetStreamClusterWithTemplate(t, tmlp, "cluster", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer(), nats.UserCredentials(accCreds))
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo.*"},
Replicas: 3,
})
require_NoError(t, err)

// Now make sure we can add in 10 R1 consumers.
for i := 1; i <= 10; i++ {
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Name: fmt.Sprintf("C-%d", i),
AckPolicy: nats.AckExplicitPolicy,
Replicas: 1,
})
require_NoError(t, err)
}

info, err := js.AccountInfo()
require_NoError(t, err)

// Make sure we account for these properly.
r1 := info.Tiers["R1"]
r3 := info.Tiers["R3"]

require_Equal(t, r1.Streams, 0)
require_Equal(t, r1.Consumers, 10)
require_Equal(t, r3.Streams, 1)
require_Equal(t, r3.Consumers, 0)
}
82 changes: 82 additions & 0 deletions server/jetstream_leafnode_test.go
@@ -1242,3 +1242,85 @@ func TestJetStreamLeafNodeSvcImportExportCycle(t *testing.T) {
_, err = js.Publish("foo", []byte("msg"))
require_NoError(t, err)
}

func TestJetStreamLeafNodeJSClusterMigrateRecovery(t *testing.T) {
tmpl := strings.Replace(jsClusterAccountsTempl, "store_dir:", "domain: hub, store_dir:", 1)
c := createJetStreamCluster(t, tmpl, "hub", _EMPTY_, 3, 12232, true)
defer c.shutdown()

tmpl = strings.Replace(jsClusterTemplWithLeafNode, "store_dir:", "domain: leaf, store_dir:", 1)
lnc := c.createLeafNodesWithTemplateAndStartPort(tmpl, "leaf", 3, 23913)
defer lnc.shutdown()

lnc.waitOnClusterReady()
for _, s := range lnc.servers {
s.setJetStreamMigrateOnRemoteLeaf()
}

nc, _ := jsClientConnect(t, lnc.randomServer())
defer nc.Close()

ljs, err := nc.JetStream(nats.Domain("leaf"))
require_NoError(t, err)

// Create an asset in the leafnode cluster.
si, err := ljs.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)
require_Equal(t, si.Cluster.Name, "leaf")
require_NotEqual(t, si.Cluster.Leader, noLeader)
require_Equal(t, len(si.Cluster.Replicas), 2)

// Count how many remotes each server in the leafnode cluster is
// supposed to have and then take them down.
remotes := map[*Server]int{}
for _, s := range lnc.servers {
remotes[s] += len(s.leafRemoteCfgs)
s.closeAndDisableLeafnodes()
checkLeafNodeConnectedCount(t, s, 0)
}

// The Raft nodes in the leafnode cluster now need some time to
// notice that they're no longer receiving AEs from a leader, as
// they should have been forced into observer mode. Check that
// this is the case.
time.Sleep(maxElectionTimeout)
for _, s := range lnc.servers {
s.rnMu.RLock()
for name, n := range s.raftNodes {
// We don't expect the metagroup to have turned into an
// observer but all other assets should have done.
if name == defaultMetaGroupName {
require_False(t, n.IsObserver())
} else {
require_True(t, n.IsObserver())
}
}
s.rnMu.RUnlock()
}

// Bring the leafnode connections back up.
for _, s := range lnc.servers {
s.reEnableLeafnodes()
checkLeafNodeConnectedCount(t, s, remotes[s])
}

// Wait for nodes to notice they are no longer in observer mode
// and to leave observer mode.
time.Sleep(maxElectionTimeout)
for _, s := range lnc.servers {
s.rnMu.RLock()
for _, n := range s.raftNodes {
require_False(t, n.IsObserver())
}
s.rnMu.RUnlock()
}

// Previously nodes would have left observer mode but then would
// have failed to elect a stream leader as they were stuck on a
// long election timer. Now this should work reliably.
lnc.waitOnStreamLeader(globalAccountName, "TEST")
}