Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 35 additions & 33 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"fmt"
"hash"
"io"
"io/fs"
"math"
"net"
"os"
Expand Down Expand Up @@ -765,9 +766,7 @@ func (fs *fileStore) setupAEK() error {
if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) {
return err
}
<-dios
err = os.WriteFile(keyFile, encrypted, defaultFilePerms)
dios <- struct{}{}
err = fs.writeFileWithOptionalSync(keyFile, encrypted, defaultFilePerms)
if err != nil {
return err
}
Expand Down Expand Up @@ -803,19 +802,15 @@ func (fs *fileStore) writeStreamMeta() error {
b = fs.aek.Seal(nonce, nonce, b, nil)
}

<-dios
err = os.WriteFile(meta, b, defaultFilePerms)
dios <- struct{}{}
err = fs.writeFileWithOptionalSync(meta, b, defaultFilePerms)
if err != nil {
return err
}
fs.hh.Reset()
fs.hh.Write(b)
checksum := hex.EncodeToString(fs.hh.Sum(nil))
sum := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileSum)
<-dios
err = os.WriteFile(sum, []byte(checksum), defaultFilePerms)
dios <- struct{}{}
err = fs.writeFileWithOptionalSync(sum, []byte(checksum), defaultFilePerms)
if err != nil {
return err
}
Expand Down Expand Up @@ -1206,9 +1201,7 @@ func (mb *msgBlock) convertCipher() error {
// the old keyfile back.
if err := fs.genEncryptionKeysForBlock(mb); err != nil {
keyFile := filepath.Join(mdir, fmt.Sprintf(keyScan, mb.index))
<-dios
os.WriteFile(keyFile, ekey, defaultFilePerms)
dios <- struct{}{}
fs.writeFileWithOptionalSync(keyFile, ekey, defaultFilePerms)
return err
}
mb.bek.XORKeyStream(buf, buf)
Expand Down Expand Up @@ -3401,9 +3394,7 @@ func (fs *fileStore) genEncryptionKeysForBlock(mb *msgBlock) error {
if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) {
return err
}
<-dios
err = os.WriteFile(keyFile, encrypted, defaultFilePerms)
dios <- struct{}{}
err = fs.writeFileWithOptionalSync(keyFile, encrypted, defaultFilePerms)
if err != nil {
return err
}
Expand Down Expand Up @@ -8695,9 +8686,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt
if err != nil {
return nil, err
}
<-dios
err = os.WriteFile(o.ifn, state, defaultFilePerms)
dios <- struct{}{}
err = fs.writeFileWithOptionalSync(o.ifn, state, defaultFilePerms)
if err != nil {
if didCreate {
os.RemoveAll(odir)
Expand Down Expand Up @@ -9171,9 +9160,7 @@ func (o *consumerFileStore) writeState(buf []byte) error {
o.mu.Unlock()

// Lock not held here but we do limit number of outstanding calls that could block OS threads.
<-dios
err := os.WriteFile(ifn, buf, defaultFilePerms)
dios <- struct{}{}
err := o.fs.writeFileWithOptionalSync(ifn, buf, defaultFilePerms)

o.mu.Lock()
if err != nil {
Expand Down Expand Up @@ -9212,9 +9199,7 @@ func (cfs *consumerFileStore) writeConsumerMeta() error {
if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) {
return err
}
<-dios
err = os.WriteFile(keyFile, encrypted, defaultFilePerms)
dios <- struct{}{}
err = cfs.fs.writeFileWithOptionalSync(keyFile, encrypted, defaultFilePerms)
if err != nil {
return err
}
Expand All @@ -9235,9 +9220,7 @@ func (cfs *consumerFileStore) writeConsumerMeta() error {
b = cfs.aek.Seal(nonce, nonce, b, nil)
}

<-dios
err = os.WriteFile(meta, b, defaultFilePerms)
dios <- struct{}{}
err = cfs.fs.writeFileWithOptionalSync(meta, b, defaultFilePerms)
if err != nil {
return err
}
Expand All @@ -9246,9 +9229,7 @@ func (cfs *consumerFileStore) writeConsumerMeta() error {
checksum := hex.EncodeToString(cfs.hh.Sum(nil))
sum := filepath.Join(cfs.odir, JetStreamMetaFileSum)

<-dios
err = os.WriteFile(sum, []byte(checksum), defaultFilePerms)
dios <- struct{}{}
err = cfs.fs.writeFileWithOptionalSync(sum, []byte(checksum), defaultFilePerms)
if err != nil {
return err
}
Expand Down Expand Up @@ -9543,9 +9524,7 @@ func (o *consumerFileStore) Stop() error {

if len(buf) > 0 {
o.waitOnFlusher()
<-dios
err = os.WriteFile(ifn, buf, defaultFilePerms)
dios <- struct{}{}
err = o.fs.writeFileWithOptionalSync(ifn, buf, defaultFilePerms)
}
return err
}
Expand Down Expand Up @@ -9779,3 +9758,26 @@ func (alg StoreCompression) Decompress(buf []byte) ([]byte, error) {

return output, reader.Close()
}

// writeFileWithOptionalSync is equivalent to os.WriteFile() but optionally
// sets O_SYNC on the open file if SyncAlways is set. The dios semaphore is
// handled automatically by this function, so don't wrap calls to it in dios.
func (fs *fileStore) writeFileWithOptionalSync(name string, data []byte, perm fs.FileMode) error {
<-dios
defer func() {
dios <- struct{}{}
}()
flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
if fs.fcfg.SyncAlways {
flags |= os.O_SYNC
}
f, err := os.OpenFile(name, flags, perm)
if err != nil {
return err
}
if _, err = f.Write(data); err != nil {
_ = f.Close()
return err
}
return f.Close()
}
26 changes: 26 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7754,3 +7754,29 @@ func Benchmark_FileStoreLoadNextMsgVerySparseMsgsLargeTail(b *testing.B) {
require_Error(b, err, ErrStoreEOF)
}
}

func Benchmark_FileStoreCreateConsumerStores(b *testing.B) {
for _, syncAlways := range []bool{true, false} {
b.Run(fmt.Sprintf("%v", syncAlways), func(b *testing.B) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: b.TempDir(), SyncAlways: syncAlways},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage})
require_NoError(b, err)
defer fs.Stop()

oconfig := ConsumerConfig{
DeliverSubject: "d",
FilterSubject: "foo",
AckPolicy: AckAll,
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
oname := fmt.Sprintf("obs22_%d", i)
ofs, err := fs.ConsumerStore(oname, &oconfig)
require_NoError(b, err)
require_NoError(b, ofs.Stop())
}
})
}
}