diff --git a/server/filestore_test.go b/server/filestore_test.go index 936b3ef33fc..de25e7728f7 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -6755,67 +6755,6 @@ func TestFileStoreMsgBlockFirstAndLastSeqCorrupt(t *testing.T) { require_Equal(t, lseq, 10) } -func TestFileStoreMsgLoadNextMsgMulti(t *testing.T) { - fs, err := newFileStore( - FileStoreConfig{StoreDir: t.TempDir()}, - StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage}) - require_NoError(t, err) - defer fs.Stop() - - // Put 1k msgs in - for i := 0; i < 1000; i++ { - subj := fmt.Sprintf("foo.%d", i) - fs.StoreMsg(subj, nil, []byte("ZZZ")) - } - - var smv StoreMsg - // Do multi load next with 1 wc entry. - sl := NewSublistWithCache() - sl.Insert(&subscription{subject: []byte("foo.>")}) - for i, seq := 0, uint64(1); i < 1000; i++ { - sm, nseq, err := fs.LoadNextMsgMulti(sl, seq, &smv) - require_NoError(t, err) - require_True(t, sm.subj == fmt.Sprintf("foo.%d", i)) - require_Equal(t, nseq, seq) - seq++ - } - - // Now do multi load next with 1000 literal subjects. - sl = NewSublistWithCache() - for i := 0; i < 1000; i++ { - subj := fmt.Sprintf("foo.%d", i) - sl.Insert(&subscription{subject: []byte(subj)}) - } - for i, seq := 0, uint64(1); i < 1000; i++ { - sm, nseq, err := fs.LoadNextMsgMulti(sl, seq, &smv) - require_NoError(t, err) - require_True(t, sm.subj == fmt.Sprintf("foo.%d", i)) - require_Equal(t, nseq, seq) - seq++ - } - - // Check that we can pull out 3 individuals. - sl = NewSublistWithCache() - sl.Insert(&subscription{subject: []byte("foo.2")}) - sl.Insert(&subscription{subject: []byte("foo.222")}) - sl.Insert(&subscription{subject: []byte("foo.999")}) - sm, seq, err := fs.LoadNextMsgMulti(sl, 1, &smv) - require_NoError(t, err) - require_True(t, sm.subj == "foo.2") - require_Equal(t, seq, 3) - sm, seq, err = fs.LoadNextMsgMulti(sl, seq+1, &smv) - require_NoError(t, err) - require_True(t, sm.subj == "foo.222") - require_Equal(t, seq, 223) - sm, seq, err = fs.LoadNextMsgMulti(sl, seq+1, &smv) - require_NoError(t, err) - require_True(t, sm.subj == "foo.999") - require_Equal(t, seq, 1000) - _, seq, err = fs.LoadNextMsgMulti(sl, seq+1, &smv) - require_Error(t, err) - require_Equal(t, seq, 1000) -} - /////////////////////////////////////////////////////////////////////////// // Benchmarks /////////////////////////////////////////////////////////////////////////// diff --git a/server/memstore_test.go b/server/memstore_test.go index 2d35262b855..3c049233447 100644 --- a/server/memstore_test.go +++ b/server/memstore_test.go @@ -1051,67 +1051,3 @@ func TestMemStorePurgeExWithDeletedMsgs(t *testing.T) { require_Equal(t, state.LastSeq, 10) require_Equal(t, state.Msgs, 1) } - -func TestMemStoreMsgLoadNextMsgMulti(t *testing.T) { - cfg := &StreamConfig{ - Name: "zzz", - Subjects: []string{"foo.*"}, - Storage: MemoryStorage, - } - ms, err := newMemStore(cfg) - require_NoError(t, err) - defer ms.Stop() - - // Put 1k msgs in - for i := 0; i < 1000; i++ { - subj := fmt.Sprintf("foo.%d", i) - ms.StoreMsg(subj, nil, []byte("ZZZ")) - } - - var smv StoreMsg - // Do multi load next with 1 wc entry. - sl := NewSublistWithCache() - sl.Insert(&subscription{subject: []byte("foo.>")}) - for i, seq := 0, uint64(1); i < 1000; i++ { - sm, nseq, err := ms.LoadNextMsgMulti(sl, seq, &smv) - require_NoError(t, err) - require_True(t, sm.subj == fmt.Sprintf("foo.%d", i)) - require_Equal(t, nseq, seq) - seq++ - } - - // Now do multi load next with 1000 literal subjects. - sl = NewSublistWithCache() - for i := 0; i < 1000; i++ { - subj := fmt.Sprintf("foo.%d", i) - sl.Insert(&subscription{subject: []byte(subj)}) - } - for i, seq := 0, uint64(1); i < 1000; i++ { - sm, nseq, err := ms.LoadNextMsgMulti(sl, seq, &smv) - require_NoError(t, err) - require_True(t, sm.subj == fmt.Sprintf("foo.%d", i)) - require_Equal(t, nseq, seq) - seq++ - } - - // Check that we can pull out 3 individuals. - sl = NewSublistWithCache() - sl.Insert(&subscription{subject: []byte("foo.2")}) - sl.Insert(&subscription{subject: []byte("foo.222")}) - sl.Insert(&subscription{subject: []byte("foo.999")}) - sm, seq, err := ms.LoadNextMsgMulti(sl, 1, &smv) - require_NoError(t, err) - require_True(t, sm.subj == "foo.2") - require_Equal(t, seq, 3) - sm, seq, err = ms.LoadNextMsgMulti(sl, seq+1, &smv) - require_NoError(t, err) - require_True(t, sm.subj == "foo.222") - require_Equal(t, seq, 223) - sm, seq, err = ms.LoadNextMsgMulti(sl, seq+1, &smv) - require_NoError(t, err) - require_True(t, sm.subj == "foo.999") - require_Equal(t, seq, 1000) - _, seq, err = ms.LoadNextMsgMulti(sl, seq+1, &smv) - require_Error(t, err) - require_Equal(t, seq, 1000) -} diff --git a/server/store_test.go b/server/store_test.go new file mode 100644 index 00000000000..fdf4690a993 --- /dev/null +++ b/server/store_test.go @@ -0,0 +1,111 @@ +// Copyright 2012-2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !skip_store_tests +// +build !skip_store_tests + +package server + +import ( + "fmt" + "testing" +) + +func testAllStoreAllPermutations(t *testing.T, compressionAndEncryption bool, cfg StreamConfig, fn func(t *testing.T, fs StreamStore)) { + t.Run("Memory", func(t *testing.T) { + cfg.Storage = MemoryStorage + fs, err := newMemStore(&cfg) + require_NoError(t, err) + defer fs.Stop() + fn(t, fs) + }) + t.Run("File", func(t *testing.T) { + cfg.Storage = FileStorage + if compressionAndEncryption { + testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { + fs, err := newFileStore(fcfg, cfg) + require_NoError(t, err) + defer fs.Stop() + fn(t, fs) + }) + } else { + fs, err := newFileStore(FileStoreConfig{ + StoreDir: t.TempDir(), + }, cfg) + require_NoError(t, err) + defer fs.Stop() + fn(t, fs) + } + }) +} + +func TestStoreMsgLoadNextMsgMulti(t *testing.T) { + testAllStoreAllPermutations( + t, false, + StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}}, + func(t *testing.T, fs StreamStore) { + // Put 1k msgs in + for i := 0; i < 1000; i++ { + subj := fmt.Sprintf("foo.%d", i) + fs.StoreMsg(subj, nil, []byte("ZZZ")) + } + + var smv StoreMsg + // Do multi load next with 1 wc entry. + sl := NewSublistWithCache() + sl.Insert(&subscription{subject: []byte("foo.>")}) + for i, seq := 0, uint64(1); i < 1000; i++ { + sm, nseq, err := fs.LoadNextMsgMulti(sl, seq, &smv) + require_NoError(t, err) + require_Equal(t, sm.subj, fmt.Sprintf("foo.%d", i)) + require_Equal(t, nseq, seq) + seq++ + } + + // Now do multi load next with 1000 literal subjects. + sl = NewSublistWithCache() + for i := 0; i < 1000; i++ { + subj := fmt.Sprintf("foo.%d", i) + sl.Insert(&subscription{subject: []byte(subj)}) + } + for i, seq := 0, uint64(1); i < 1000; i++ { + sm, nseq, err := fs.LoadNextMsgMulti(sl, seq, &smv) + require_NoError(t, err) + require_Equal(t, sm.subj, fmt.Sprintf("foo.%d", i)) + require_Equal(t, nseq, seq) + seq++ + } + + // Check that we can pull out 3 individuals. + sl = NewSublistWithCache() + sl.Insert(&subscription{subject: []byte("foo.2")}) + sl.Insert(&subscription{subject: []byte("foo.222")}) + sl.Insert(&subscription{subject: []byte("foo.999")}) + sm, seq, err := fs.LoadNextMsgMulti(sl, 1, &smv) + require_NoError(t, err) + require_Equal(t, sm.subj, "foo.2") + require_Equal(t, seq, 3) + sm, seq, err = fs.LoadNextMsgMulti(sl, seq+1, &smv) + require_NoError(t, err) + require_Equal(t, sm.subj, "foo.222") + require_Equal(t, seq, 223) + sm, seq, err = fs.LoadNextMsgMulti(sl, seq+1, &smv) + require_NoError(t, err) + require_Equal(t, sm.subj, "foo.999") + require_Equal(t, seq, 1000) + _, seq, err = fs.LoadNextMsgMulti(sl, seq+1, &smv) + require_Error(t, err) + require_Equal(t, seq, 1000) + }, + ) +}