diff --git a/server/accounts.go b/server/accounts.go index a9df6e0365a..548f6943ec4 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -3794,7 +3794,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim // If JetStream is enabled for this server we will call into configJetStream for the account // regardless of enabled or disabled. It handles both cases. if jsEnabled { - if err := s.configJetStream(a); err != nil { + if err := s.configJetStream(a, nil); err != nil { s.Errorf("Error configuring jetstream for account [%s]: %v", tl, err.Error()) a.mu.Lock() // Absent reload of js server cfg, this is going to be broken until js is disabled @@ -4371,7 +4371,7 @@ func (dr *DirAccResolver) Start(s *Server) error { s.Warnf("DirResolver - Error checking for JetStream support for account %q: %v", pubKey, err) } } else if jsa == nil { - if err = s.configJetStream(acc); err != nil { + if err = s.configJetStream(acc, nil); err != nil { s.Errorf("DirResolver - Error configuring JetStream for account %q: %v", pubKey, err) } } diff --git a/server/events_test.go b/server/events_test.go index 8d4531e6bd8..68d2b83c55a 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -1238,7 +1238,7 @@ func TestAccountReqMonitoring(t *testing.T) { s.EnableJetStream(&JetStreamConfig{StoreDir: t.TempDir()}) unusedAcc, _ := createAccount(s) acc, akp := createAccount(s) - acc.EnableJetStream(nil) + acc.EnableJetStream(nil, nil) subsz := fmt.Sprintf(accDirectReqSubj, acc.Name, "SUBSZ") connz := fmt.Sprintf(accDirectReqSubj, acc.Name, "CONNZ") jsz := fmt.Sprintf(accDirectReqSubj, acc.Name, "JSZ") diff --git a/server/jetstream.go b/server/jetstream.go index 2b02fd99f8c..9584e59a91d 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -194,6 +194,11 @@ func (s *Server) EnableJetStream(config *JetStreamConfig) error { } s.Noticef("Starting JetStream") + start := time.Now() + defer func() { + s.Noticef("Took %s to start JetStream", time.Since(start)) + }() + if config == nil || config.MaxMemory <= 0 || config.MaxStore <= 0 { var storeDir, domain, uniqueTag string var maxStore, maxMem int64 @@ -686,6 +691,11 @@ func (s *Server) DisableJetStream() error { } func (s *Server) enableJetStreamAccounts() error { + // Reuse the same task workers across all accounts, so that we don't explode + // with a large number of goroutines on multi-account systems. + tq := parallelTaskQueue(0) + defer close(tq) + // If we have no configured accounts setup then setup imports on global account. if s.globalAccountOnly() { gacc := s.GlobalAccount() @@ -694,10 +704,10 @@ func (s *Server) enableJetStreamAccounts() error { gacc.jsLimits = defaultJSAccountTiers } gacc.mu.Unlock() - if err := s.configJetStream(gacc); err != nil { + if err := s.configJetStream(gacc, tq); err != nil { return err } - } else if err := s.configAllJetStreamAccounts(); err != nil { + } else if err := s.configAllJetStreamAccounts(tq); err != nil { return fmt.Errorf("Error enabling jetstream on configured accounts: %v", err) } return nil @@ -761,7 +771,7 @@ func (a *Account) enableJetStreamInfoServiceImportOnly() error { return a.enableAllJetStreamServiceImportsAndMappings() } -func (s *Server) configJetStream(acc *Account) error { +func (s *Server) configJetStream(acc *Account, tq chan<- func()) error { if acc == nil { return nil } @@ -778,7 +788,7 @@ func (s *Server) configJetStream(acc *Account) error { return err } } else { - if err := acc.EnableJetStream(jsLimits); err != nil { + if err := acc.EnableJetStream(jsLimits, tq); err != nil { return err } if s.gateway.enabled { @@ -799,7 +809,7 @@ func (s *Server) configJetStream(acc *Account) error { } // configAllJetStreamAccounts walk all configured accounts and turn on jetstream if requested. -func (s *Server) configAllJetStreamAccounts() error { +func (s *Server) configAllJetStreamAccounts(tq chan<- func()) error { // Check to see if system account has been enabled. We could arrive here via reload and // a non-default system account. s.checkJetStreamExports() @@ -839,7 +849,7 @@ func (s *Server) configAllJetStreamAccounts() error { // Process any jetstream enabled accounts here. These will be accounts we are // already aware of at startup etc. for _, acc := range jsAccounts { - if err := s.configJetStream(acc); err != nil { + if err := s.configJetStream(acc, tq); err != nil { return err } } @@ -852,7 +862,7 @@ func (s *Server) configAllJetStreamAccounts() error { // Only load up ones not already loaded since they are processed above. if _, ok := accounts.Load(accName); !ok { if acc, err := s.lookupAccount(accName); err != nil && acc != nil { - if err := s.configJetStream(acc); err != nil { + if err := s.configJetStream(acc, tq); err != nil { return err } } @@ -1100,7 +1110,7 @@ func (a *Account) assignJetStreamLimits(limits map[string]JetStreamAccountLimits // EnableJetStream will enable JetStream on this account with the defined limits. // This is a helper for JetStreamEnableAccount. -func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) error { +func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits, tq chan<- func()) error { a.mu.RLock() s := a.srv a.mu.RUnlock() @@ -1253,30 +1263,139 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro } } - // Collect consumers, do after all streams. - type ce struct { - mset *stream - odir string - } - var consumers []*ce - - // Collect any interest policy streams to check for - // https://github.com/nats-io/nats-server/issues/3612 - var ipstreams []*stream - // Remember if we should be encrypted and what cipher we think we should use. encrypted := s.getOpts().JetStreamKey != _EMPTY_ - plaintext := true sc := s.getOpts().JetStreamCipher + doConsumers := func(mset *stream, odir string) { + ofis, _ := os.ReadDir(odir) + if len(ofis) > 0 { + s.Noticef(" Recovering %d consumers for stream - '%s > %s'", len(ofis), mset.accName(), mset.name()) + } + for _, ofi := range ofis { + metafile := filepath.Join(odir, ofi.Name(), JetStreamMetaFile) + metasum := filepath.Join(odir, ofi.Name(), JetStreamMetaFileSum) + if _, err := os.Stat(metafile); os.IsNotExist(err) { + s.Warnf(" Missing consumer metafile %q", metafile) + continue + } + buf, err := os.ReadFile(metafile) + if err != nil { + s.Warnf(" Error reading consumer metafile %q: %v", metafile, err) + continue + } + if _, err := os.Stat(metasum); os.IsNotExist(err) { + s.Warnf(" Missing consumer checksum for %q", metasum) + continue + } + + // Check if we are encrypted. + if key, err := os.ReadFile(filepath.Join(odir, ofi.Name(), JetStreamMetaFileKey)); err == nil { + s.Debugf(" Consumer metafile is encrypted, reading encrypted keyfile") + // Decode the buffer before proceeding. + ctxName := mset.name() + tsep + ofi.Name() + nbuf, _, err := s.decryptMeta(sc, key, buf, a.Name, ctxName) + if err != nil { + s.Warnf(" Error decrypting our consumer metafile: %v", err) + continue + } + buf = nbuf + } + + var cfg FileConsumerInfo + decoder := json.NewDecoder(bytes.NewReader(buf)) + decoder.DisallowUnknownFields() + strictErr := decoder.Decode(&cfg) + if strictErr != nil { + cfg = FileConsumerInfo{} + if err := json.Unmarshal(buf, &cfg); err != nil { + s.Warnf(" Error unmarshalling consumer metafile %q: %v", metafile, err) + continue + } + } + if supported := supportsRequiredApiLevel(cfg.Metadata); !supported || strictErr != nil { + var offlineReason string + if !supported { + apiLevel := getRequiredApiLevel(cfg.Metadata) + if strictErr != nil { + offlineReason = fmt.Sprintf("unsupported - config error: %s", strings.TrimPrefix(strictErr.Error(), "json: ")) + } else { + offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel) + } + s.Warnf(" Detected unsupported consumer '%s > %s > %s': %s", a.Name, mset.name(), cfg.Name, offlineReason) + } else { + offlineReason = fmt.Sprintf("decoding error: %v", strictErr) + s.Warnf(" Error unmarshalling consumer metafile %q: %v", metafile, strictErr) + } + singleServerMode := !s.JetStreamIsClustered() && s.standAloneMode() + if singleServerMode { + if !mset.closed.Load() { + s.Warnf(" Stopping unsupported stream '%s > %s'", a.Name, mset.name()) + mset.mu.Lock() + mset.offlineReason = fmt.Sprintf("stopped - unsupported consumer %q", cfg.Name) + mset.mu.Unlock() + mset.stop(false, false) + } + + // Fake a consumer, so we can respond to API requests as single-server. + o := &consumer{ + mset: mset, + js: s.getJetStream(), + acc: a, + srv: s, + cfg: cfg.ConsumerConfig, + active: false, + stream: mset.name(), + name: cfg.Name, + dseq: 1, + sseq: 1, + created: time.Now().UTC(), + closed: true, + offlineReason: offlineReason, + } + if !cfg.Created.IsZero() { + o.created = cfg.Created + } + + mset.mu.Lock() + mset.setConsumer(o) + mset.mu.Unlock() + } + continue + } + + isEphemeral := !isDurableConsumer(&cfg.ConsumerConfig) + if isEphemeral { + // This is an ephemeral consumer and this could fail on restart until + // the consumer can reconnect. We will create it as a durable and switch it. + cfg.ConsumerConfig.Durable = ofi.Name() + } + obs, err := mset.addConsumerWithAssignment(&cfg.ConsumerConfig, _EMPTY_, nil, true, ActionCreateOrUpdate, false) + if err != nil { + s.Warnf(" Error adding consumer %q: %v", cfg.Name, err) + continue + } + if isEphemeral { + obs.switchToEphemeral() + } + if !cfg.Created.IsZero() { + obs.setCreatedTime(cfg.Created) + } + if err != nil { + s.Warnf(" Error restoring consumer %q state: %v", cfg.Name, err) + } + } + } + // Now recover the streams. fis, _ := os.ReadDir(sdir) - for _, fi := range fis { + doStream := func(fi os.DirEntry) error { + plaintext := true mdir := filepath.Join(sdir, fi.Name()) // Check for partially deleted streams. They are marked with "." prefix. if strings.HasPrefix(fi.Name(), tsep) { go os.RemoveAll(mdir) - continue + return nil } key := sha256.Sum256([]byte(fi.Name())) hh, err := highwayhash.New64(key[:]) @@ -1287,27 +1406,27 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro metasum := filepath.Join(mdir, JetStreamMetaFileSum) if _, err := os.Stat(metafile); os.IsNotExist(err) { s.Warnf(" Missing stream metafile for %q", metafile) - continue + return nil } buf, err := os.ReadFile(metafile) if err != nil { s.Warnf(" Error reading metafile %q: %v", metafile, err) - continue + return nil } if _, err := os.Stat(metasum); os.IsNotExist(err) { s.Warnf(" Missing stream checksum file %q", metasum) - continue + return nil } sum, err := os.ReadFile(metasum) if err != nil { s.Warnf(" Error reading Stream metafile checksum %q: %v", metasum, err) - continue + return nil } hh.Write(buf) checksum := hex.EncodeToString(hh.Sum(nil)) if checksum != string(sum) { s.Warnf(" Stream metafile %q: checksums do not match %q vs %q", metafile, sum, checksum) - continue + return nil } // Track if we are converting ciphers. @@ -1320,14 +1439,14 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro s.Debugf(" Stream metafile is encrypted, reading encrypted keyfile") if len(keyBuf) < minMetaKeySize { s.Warnf(" Bad stream encryption key length of %d", len(keyBuf)) - continue + return nil } // Decode the buffer before proceeding. var nbuf []byte nbuf, convertingCiphers, err = s.decryptMeta(sc, keyBuf, buf, a.Name, fi.Name()) if err != nil { s.Warnf(" Error decrypting our stream metafile: %v", err) - continue + return nil } buf = nbuf plaintext = false @@ -1341,7 +1460,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro cfg = FileStreamInfo{} if err := json.Unmarshal(buf, &cfg); err != nil { s.Warnf(" Error unmarshalling stream metafile %q: %v", metafile, err) - continue + return nil } } if supported := supportsRequiredApiLevel(cfg.Metadata); !supported || strictErr != nil { @@ -1384,13 +1503,16 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro // Now do the consumers. odir := filepath.Join(sdir, fi.Name(), consumerDir) - consumers = append(consumers, &ce{mset, odir}) + doConsumers(mset, odir) } - continue + return nil } if cfg.Template != _EMPTY_ { - if err := jsa.addStreamNameToTemplate(cfg.Template, cfg.Name); err != nil { + jsa.mu.Lock() + err := jsa.addStreamNameToTemplate(cfg.Template, cfg.Name) + jsa.mu.Unlock() + if err != nil { s.Warnf(" Error adding stream %q to template %q: %v", cfg.Name, cfg.Template, err) } } @@ -1415,7 +1537,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro } } if hadSubjErr { - continue + return nil } // The other possible bug is assigning subjects to mirrors, so check for that and patch as well. @@ -1449,7 +1571,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro s.Warnf(" Error replacing meta keyfile for stream %q: %v", cfg.Name, err) } } - continue + return nil } if !cfg.Created.IsZero() { mset.setCreatedTime(cfg.Created) @@ -1531,146 +1653,41 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro s.Noticef(" Restored %s messages for stream '%s > %s' in %v", comma(int64(state.Msgs)), mset.accName(), mset.name(), time.Since(rt).Round(time.Millisecond)) + // Now do the consumers. + odir := filepath.Join(sdir, fi.Name(), consumerDir) + doConsumers(mset, odir) + // Collect to check for dangling messages. // TODO(dlc) - Can be removed eventually. if cfg.StreamConfig.Retention == InterestPolicy { - ipstreams = append(ipstreams, mset) + mset.checkForOrphanMsgs() + mset.checkConsumerReplication() } - // Now do the consumers. - odir := filepath.Join(sdir, fi.Name(), consumerDir) - consumers = append(consumers, &ce{mset, odir}) + return nil } - for _, e := range consumers { - ofis, _ := os.ReadDir(e.odir) - if len(ofis) > 0 { - s.Noticef(" Recovering %d consumers for stream - '%s > %s'", len(ofis), e.mset.accName(), e.mset.name()) - } - for _, ofi := range ofis { - metafile := filepath.Join(e.odir, ofi.Name(), JetStreamMetaFile) - metasum := filepath.Join(e.odir, ofi.Name(), JetStreamMetaFileSum) - if _, err := os.Stat(metafile); os.IsNotExist(err) { - s.Warnf(" Missing consumer metafile %q", metafile) - continue - } - buf, err := os.ReadFile(metafile) - if err != nil { - s.Warnf(" Error reading consumer metafile %q: %v", metafile, err) - continue - } - if _, err := os.Stat(metasum); os.IsNotExist(err) { - s.Warnf(" Missing consumer checksum for %q", metasum) - continue - } - - // Check if we are encrypted. - if key, err := os.ReadFile(filepath.Join(e.odir, ofi.Name(), JetStreamMetaFileKey)); err == nil { - s.Debugf(" Consumer metafile is encrypted, reading encrypted keyfile") - // Decode the buffer before proceeding. - ctxName := e.mset.name() + tsep + ofi.Name() - nbuf, _, err := s.decryptMeta(sc, key, buf, a.Name, ctxName) - if err != nil { - s.Warnf(" Error decrypting our consumer metafile: %v", err) - continue - } - buf = nbuf - } - - var cfg FileConsumerInfo - decoder := json.NewDecoder(bytes.NewReader(buf)) - decoder.DisallowUnknownFields() - strictErr := decoder.Decode(&cfg) - if strictErr != nil { - cfg = FileConsumerInfo{} - if err := json.Unmarshal(buf, &cfg); err != nil { - s.Warnf(" Error unmarshalling consumer metafile %q: %v", metafile, err) - continue - } - } - if supported := supportsRequiredApiLevel(cfg.Metadata); !supported || strictErr != nil { - var offlineReason string - if !supported { - apiLevel := getRequiredApiLevel(cfg.Metadata) - if strictErr != nil { - offlineReason = fmt.Sprintf("unsupported - config error: %s", strings.TrimPrefix(strictErr.Error(), "json: ")) - } else { - offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel) - } - s.Warnf(" Detected unsupported consumer '%s > %s > %s': %s", a.Name, e.mset.name(), cfg.Name, offlineReason) - } else { - offlineReason = fmt.Sprintf("decoding error: %v", strictErr) - s.Warnf(" Error unmarshalling consumer metafile %q: %v", metafile, strictErr) - } - singleServerMode := !s.JetStreamIsClustered() && s.standAloneMode() - if singleServerMode { - if !e.mset.closed.Load() { - s.Warnf(" Stopping unsupported stream '%s > %s'", a.Name, e.mset.name()) - e.mset.mu.Lock() - e.mset.offlineReason = fmt.Sprintf("stopped - unsupported consumer %q", cfg.Name) - e.mset.mu.Unlock() - e.mset.stop(false, false) - } - - // Fake a consumer, so we can respond to API requests as single-server. - o := &consumer{ - mset: e.mset, - js: s.getJetStream(), - acc: a, - srv: s, - cfg: cfg.ConsumerConfig, - active: false, - stream: e.mset.name(), - name: cfg.Name, - dseq: 1, - sseq: 1, - created: time.Now().UTC(), - closed: true, - offlineReason: offlineReason, - } - if !cfg.Created.IsZero() { - o.created = cfg.Created - } - - e.mset.mu.Lock() - e.mset.setConsumer(o) - e.mset.mu.Unlock() - } - continue - } - - isEphemeral := !isDurableConsumer(&cfg.ConsumerConfig) - if isEphemeral { - // This is an ephemeral consumer and this could fail on restart until - // the consumer can reconnect. We will create it as a durable and switch it. - cfg.ConsumerConfig.Durable = ofi.Name() - } - obs, err := e.mset.addConsumerWithAssignment(&cfg.ConsumerConfig, _EMPTY_, nil, true, ActionCreateOrUpdate, false) - if err != nil { - s.Warnf(" Error adding consumer %q: %v", cfg.Name, err) - continue - } - if isEphemeral { - obs.switchToEphemeral() - } - if !cfg.Created.IsZero() { - obs.setCreatedTime(cfg.Created) - } - if err != nil { - s.Warnf(" Error restoring consumer %q state: %v", cfg.Name, err) + if tq != nil { + // If a parallelTaskQueue was provided then use that for concurrency. + var wg sync.WaitGroup + wg.Add(len(fis)) + for _, fi := range fis { + tq <- func() { + doStream(fi) + wg.Done() } } + wg.Wait() + } else { + // No parallelTaskQueue provided, do inline as before. + for _, fi := range fis { + doStream(fi) + } } // Make sure to cleanup any old remaining snapshots. os.RemoveAll(filepath.Join(jsa.storeDir, snapsDir)) - // Check interest policy streams for auto cleanup. - for _, mset := range ipstreams { - mset.checkForOrphanMsgs() - mset.checkConsumerReplication() - } - s.Debugf("JetStream state for account %q recovered", a.Name) return nil diff --git a/server/jetstream_benchmark_test.go b/server/jetstream_benchmark_test.go index bdd263da39d..c9dd7097e0d 100644 --- a/server/jetstream_benchmark_test.go +++ b/server/jetstream_benchmark_test.go @@ -19,8 +19,13 @@ import ( "encoding/json" "fmt" "math" + "math/bits" "math/rand" + "os" + "path/filepath" + "runtime" "strconv" + "strings" "sync" "sync/atomic" "testing" @@ -2184,6 +2189,61 @@ func BenchmarkJetStreamPublishConcurrent(b *testing.B) { } } +func BenchmarkJetStreamParallelStartup(b *testing.B) { + omp := runtime.GOMAXPROCS(-1) + streams, msgs, cardinality := omp, 100_000, 10_000 + + _, s, shutdown, nc, js := startJSClusterAndConnect(b, 1) + defer shutdown() + jsc := *s.JetStreamConfig() + sd := strings.TrimSuffix(jsc.StoreDir, "/jetstream") + + b.Logf("Building %d streams with %d messages, %d subjects...", streams, msgs, cardinality) + start := time.Now() + for i := range streams { + jsStreamCreate(b, nc, &StreamConfig{ + Name: fmt.Sprintf("stream_%d", i), + Subjects: []string{fmt.Sprintf("%d.>", i)}, + Storage: FileStorage, + }) + for n := range msgs { + subj := fmt.Sprintf("%d.%d", i, n%cardinality) + _, err := js.Publish(subj, nil) + require_NoError(b, err) + } + } + b.Logf("Streams built in %s", time.Since(start)) + + bench := func(b *testing.B) { + s.shutdownJetStream() + jsc.StoreDir = sd + require_NoError(b, filepath.Walk(jsc.StoreDir, func(path string, info os.FileInfo, err error) error { + require_NoError(b, err) + if info.Mode().IsRegular() && info.Name() == "index.db" { + return os.Truncate(path, 0) + } + return nil + })) + b.ResetTimer() + s.EnableJetStream(&jsc) + } + + // Try to step down GOMAXPROCS in common CPU core counts. + mp := 1 << (bits.Len(uint(omp)) - 1) + if omp > mp { + b.Run(fmt.Sprintf("GOMAXPROCS=%d", omp), func(b *testing.B) { + bench(b) + }) + } + for ; mp >= 1; mp >>= 1 { + b.Run(fmt.Sprintf("GOMAXPROCS=%d", mp), func(b *testing.B) { + runtime.GOMAXPROCS(mp) + defer runtime.GOMAXPROCS(omp) + bench(b) + }) + } +} + // Helper function to stand up a JS-enabled single server or cluster func startJSClusterAndConnect(b *testing.B, clusterSize int) (c *cluster, s *Server, shutdown func(), nc *nats.Conn, js nats.JetStreamContext) { b.Helper() diff --git a/server/jetstream_test.go b/server/jetstream_test.go index d5e831a87b0..d5d47fb55b2 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -152,7 +152,7 @@ func TestJetStreamEnableAndDisableAccount(t *testing.T) { } acc, _ := s.LookupOrRegisterAccount("$FOO") - if err := acc.EnableJetStream(nil); err != nil { + if err := acc.EnableJetStream(nil, nil); err != nil { t.Fatalf("Did not expect error on enabling account: %v", err) } if na := s.JetStreamNumAccounts(); na != 1 { @@ -171,7 +171,7 @@ func TestJetStreamEnableAndDisableAccount(t *testing.T) { } // Should get an error for trying to enable a non-registered account. acc = NewAccount("$BAZ") - if err := acc.EnableJetStream(nil); err == nil { + if err := acc.EnableJetStream(nil, nil); err == nil { t.Fatalf("Expected error on enabling account that was not registered") } } @@ -4869,20 +4869,20 @@ func TestJetStreamSystemLimits(t *testing.T) { } } - if err := facc.EnableJetStream(limits(24, 192)); err != nil { + if err := facc.EnableJetStream(limits(24, 192), nil); err != nil { t.Fatalf("Unexpected error: %v", err) } // Use up rest of our resources in memory - if err := bacc.EnableJetStream(limits(1000, 0)); err != nil { + if err := bacc.EnableJetStream(limits(1000, 0), nil); err != nil { t.Fatalf("Unexpected error: %v", err) } // Now ask for more memory. Should error. - if err := zacc.EnableJetStream(limits(1000, 0)); err == nil { + if err := zacc.EnableJetStream(limits(1000, 0), nil); err == nil { t.Fatalf("Expected an error when exhausting memory resource limits") } // Disk too. - if err := zacc.EnableJetStream(limits(0, 10000)); err == nil { + if err := zacc.EnableJetStream(limits(0, 10000), nil); err == nil { t.Fatalf("Expected an error when exhausting memory resource limits") } facc.DisableJetStream() @@ -4896,7 +4896,7 @@ func TestJetStreamSystemLimits(t *testing.T) { t.Fatalf("Expected reserved memory and store to be 0, got %v and %v", friendlyBytes(rm), friendlyBytes(rd)) } - if err := facc.EnableJetStream(limits(24, 192)); err != nil { + if err := facc.EnableJetStream(limits(24, 192), nil); err != nil { t.Fatalf("Unexpected error: %v", err) } // Test Adjust @@ -7409,7 +7409,7 @@ func TestJetStreamCanNotEnableOnSystemAccount(t *testing.T) { defer s.Shutdown() sa := s.SystemAccount() - if err := sa.EnableJetStream(nil); err == nil { + if err := sa.EnableJetStream(nil, nil); err == nil { t.Fatalf("Expected an error trying to enable on the system account") } } diff --git a/server/mqtt_test.go b/server/mqtt_test.go index e779096ea07..c08c75e94b3 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -1035,7 +1035,7 @@ func testMQTTEnableJSForAccount(t *testing.T, s *Server, accName string) { MaxStore: 1024 * 1024, }, } - if err := acc.EnableJetStream(limits); err != nil { + if err := acc.EnableJetStream(limits, nil); err != nil { t.Fatalf("Error enabling JS: %v", err) } } diff --git a/server/util.go b/server/util.go index dcfa1000d43..4e08e3f0cda 100644 --- a/server/util.go +++ b/server/util.go @@ -23,6 +23,7 @@ import ( "net" "net/url" "reflect" + "runtime" "strconv" "strings" "time" @@ -340,3 +341,25 @@ func generateInfoJSON(info *Info) []byte { pcs := [][]byte{[]byte("INFO"), b, []byte(CR_LF)} return bytes.Join(pcs, []byte(" ")) } + +// parallelTaskQueue starts a number of goroutines and returns a channel +// which functions can be sent to for queued parallel execution. The +// goroutines will stop running when the returned channel is closed and +// all queued tasks have completed. The passed in mp limits concurrency, +// or a value <= 0 will default to GOMAXPROCS. +func parallelTaskQueue(mp int) chan<- func() { + if rmp := runtime.GOMAXPROCS(-1); mp <= 0 { + mp = rmp + } else { + mp = max(rmp, mp) + } + tq := make(chan func(), mp) + for range mp { + go func() { + for fn := range tq { + fn() + } + }() + } + return tq +}