diff --git a/server/jetstream_benchmark_test.go b/server/jetstream_benchmark_test.go index 1381d7fa645..b1b67d7d858 100644 --- a/server/jetstream_benchmark_test.go +++ b/server/jetstream_benchmark_test.go @@ -403,6 +403,78 @@ func BenchmarkJetStreamConsume(b *testing.B) { } } +// BenchmarkJetStreamConsumeFilteredContiguous verifies the fix in +// https://github.com/nats-io/nats-server/pull/7015 and should +// capture future regressions. +func BenchmarkJetStreamConsumeFilteredContiguous(b *testing.B) { + clusterSizeCases := []struct { + clusterSize int // Single node or cluster + replicas int // Stream replicas + storage nats.StorageType // Stream storage + }{ + {1, 1, nats.MemoryStorage}, + {3, 3, nats.MemoryStorage}, + {1, 1, nats.FileStorage}, + {3, 3, nats.FileStorage}, + } + + for _, cs := range clusterSizeCases { + name := fmt.Sprintf( + "N=%d,R=%d,storage=%s", + cs.clusterSize, + cs.replicas, + cs.storage.String(), + ) + b.Run(name, func(b *testing.B) { + _, _, shutdown, nc, js := startJSClusterAndConnect(b, cs.clusterSize) + defer shutdown() + defer nc.Close() + + var msgs = b.N + payload := make([]byte, 1024) + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "test", + Subjects: []string{"foo"}, + Retention: nats.LimitsPolicy, + Storage: cs.storage, + Replicas: cs.replicas, + }) + require_NoError(b, err) + + for range msgs { + _, err = js.Publish("foo", payload) + require_NoError(b, err) + } + + // Subject filters deliberately vary from the stream, ensures that we hit + // the right paths in the filestore, rather than detecting 1:1 overlap. + _, err = js.AddConsumer("test", &nats.ConsumerConfig{ + Name: "test_consumer", + FilterSubjects: []string{"foo", "bar"}, + DeliverPolicy: nats.DeliverAllPolicy, + AckPolicy: nats.AckNonePolicy, + Replicas: cs.replicas, + MemoryStorage: true, + }) + require_NoError(b, err) + + ps, err := js.PullSubscribe(_EMPTY_, _EMPTY_, nats.Bind("test", "test_consumer")) + require_NoError(b, err) + + b.SetBytes(int64(len(payload))) + b.ReportAllocs() + b.ResetTimer() + for range msgs { + msgs, err := ps.Fetch(1) + require_NoError(b, err) + require_Len(b, len(msgs), 1) + } + b.StopTimer() + }) + } +} + func BenchmarkJetStreamConsumeWithFilters(b *testing.B) { const ( verbose = false