diff --git a/server/jetstream_benchmark_test.go b/server/jetstream_benchmark_test.go index cd75089ab89..bdd263da39d 100644 --- a/server/jetstream_benchmark_test.go +++ b/server/jetstream_benchmark_test.go @@ -18,7 +18,9 @@ package server import ( "encoding/json" "fmt" + "math" "math/rand" + "strconv" "sync" "sync/atomic" "testing" @@ -991,6 +993,90 @@ func BenchmarkJetStreamPublish(b *testing.B) { } } +func BenchmarkJetStreamMetaSnapshot(b *testing.B) { + c := createJetStreamClusterExplicit(b, "R3S", 3) + defer c.shutdown() + + setup := func(reqLevel string) *jetStream { + ml := c.leader() + acc, js := ml.globalAccount(), ml.getJetStream() + n := js.getMetaGroup() + + // Create all streams and consumers. + numStreams := 200 + numConsumers := 500 + ci := &ClientInfo{Cluster: "R3S", Account: globalAccountName} + js.mu.Lock() + metadata := map[string]string{JSRequiredLevelMetadataKey: reqLevel} + for i := 0; i < numStreams; i++ { + scfg := &StreamConfig{ + Name: fmt.Sprintf("STREAM-%d", i), + Subjects: []string{fmt.Sprintf("SUBJECT-%d", i)}, + Storage: MemoryStorage, + Metadata: metadata, + } + cfg, _ := ml.checkStreamCfg(scfg, acc, false) + rg, _ := js.createGroupForStream(ci, &cfg) + sa := &streamAssignment{Group: rg, Sync: syncSubjForStream(), Config: &cfg, Client: ci, Created: time.Now().UTC()} + n.Propose(encodeAddStreamAssignment(sa)) + + for j := 0; j < numConsumers; j++ { + ccfg := &ConsumerConfig{ + Durable: fmt.Sprintf("CONSUMER-%d", j), + MemoryStorage: true, + Metadata: metadata, + } + selectedLimits, _, _, _ := acc.selectLimits(ccfg.replicas(&cfg)) + srvLim := &ml.getOpts().JetStreamLimits + setConsumerConfigDefaults(ccfg, &cfg, srvLim, selectedLimits, false) + rg = js.cluster.createGroupForConsumer(ccfg, sa) + ca := &consumerAssignment{Group: rg, Stream: cfg.Name, Name: ccfg.Durable, Config: ccfg, Client: ci, Created: time.Now().UTC()} + n.Propose(encodeAddConsumerAssignment(ca)) + } + } + js.mu.Unlock() + + // Wait for all servers to have created all assets. + checkFor(b, 20*time.Second, 200*time.Millisecond, func() error { + for _, s := range c.servers { + sjs := s.getJetStream() + sjs.mu.RLock() + streams := sjs.cluster.streams[globalAccountName] + if len(streams) != numStreams { + sjs.mu.RUnlock() + return fmt.Errorf("expected %d streams, got %d", numStreams, len(streams)) + } + for _, sa := range streams { + if nc := len(sa.consumers); nc != numConsumers { + sjs.mu.RUnlock() + return fmt.Errorf("expected %d consumers, got %d", numConsumers, nc) + } + } + sjs.mu.RUnlock() + } + return nil + }) + return js + } + + for _, t := range []struct { + title string + reqLevel string + }{ + {title: "Default", reqLevel: "0"}, + {title: "AllUnsupported", reqLevel: strconv.Itoa(math.MaxInt)}, + } { + b.Run(t.title, func(b *testing.B) { + js := setup(t.reqLevel) + b.ResetTimer() + for range b.N { + js.metaSnapshot() + } + b.StopTimer() + }) + } +} + func BenchmarkJetStreamCounters(b *testing.B) { const ( verbose = false