Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 30 additions & 12 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -11764,29 +11764,47 @@ func (alg StoreCompression) Decompress(buf []byte) ([]byte, error) {
// sets O_SYNC on the open file if SyncAlways is set. The dios semaphore is
// handled automatically by this function, so don't wrap calls to it in dios.
func (fs *fileStore) writeFileWithOptionalSync(name string, data []byte, perm fs.FileMode) error {
if fs.fcfg.SyncAlways {
return writeFileWithSync(name, data, perm)
}
<-dios
defer func() {
dios <- struct{}{}
}()
return os.WriteFile(name, data, perm)
return writeAtomically(name, data, perm, fs.fcfg.SyncAlways)
}

func writeFileWithSync(name string, data []byte, perm fs.FileMode) error {
return writeAtomically(name, data, perm, true)
}

func writeAtomically(name string, data []byte, perm fs.FileMode, sync bool) error {
tmp := name + ".tmp"
flags := os.O_CREATE | os.O_WRONLY | os.O_TRUNC
if sync {
flags = flags | os.O_SYNC
Comment thread
neilalexander marked this conversation as resolved.
}
<-dios
defer func() {
dios <- struct{}{}
}()
flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC | os.O_SYNC
f, err := os.OpenFile(name, flags, perm)
f, err := os.OpenFile(tmp, flags, perm)
if err != nil {
return err
}
if _, err = f.Write(data); err != nil {
if _, err := f.Write(data); err != nil {
_ = f.Close()
_ = os.Remove(tmp)
return err
}
return f.Close()
if err := f.Close(); err != nil {
_ = os.Remove(tmp)
return err
}
if err := os.Rename(tmp, name); err != nil {
_ = os.Remove(tmp)
return err
}
if sync {
// To ensure that the file rename was persisted on all filesystems,
// also try to flush the directory metadata.
if d, err := os.Open(filepath.Dir(name)); err == nil {
_ = d.Sync()
_ = d.Close()
}
}
return nil
}
54 changes: 30 additions & 24 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4182,6 +4182,7 @@ func TestFileStoreEncrypted(t *testing.T) {
err = o.Update(state)
require_NoError(t, err)

o.Stop()
fs.Stop()
fs, err = newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, created, prf(&fcfg), nil)
require_NoError(t, err)
Expand Down Expand Up @@ -4982,36 +4983,41 @@ func TestFileStoreSubjectsTotals(t *testing.T) {

func TestFileStoreConsumerStoreEncodeAfterRestart(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
fs, err := newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), prf(&fcfg), nil)
require_NoError(t, err)
defer fs.Stop()
state := &ConsumerState{}

o, err := fs.ConsumerStore("o22", &ConsumerConfig{AckPolicy: AckExplicit})
require_NoError(t, err)
func() { // for defers
fs, err := newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), prf(&fcfg), nil)
require_NoError(t, err)
defer fs.Stop()

state := &ConsumerState{}
state.Delivered.Consumer = 22
state.Delivered.Stream = 22
state.AckFloor.Consumer = 11
state.AckFloor.Stream = 11
err = o.Update(state)
require_NoError(t, err)
o, err := fs.ConsumerStore("o22", &ConsumerConfig{AckPolicy: AckExplicit})
require_NoError(t, err)
defer o.Stop()

fs.Stop()
state.Delivered.Consumer = 22
state.Delivered.Stream = 22
state.AckFloor.Consumer = 11
state.AckFloor.Stream = 11
err = o.Update(state)
require_NoError(t, err)
}()

fs, err = newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), prf(&fcfg), nil)
require_NoError(t, err)
defer fs.Stop()
func() { // for defers
fs, err := newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), prf(&fcfg), nil)
require_NoError(t, err)
defer fs.Stop()

o, err = fs.ConsumerStore("o22", &ConsumerConfig{AckPolicy: AckExplicit})
require_NoError(t, err)
o, err := fs.ConsumerStore("o22", &ConsumerConfig{AckPolicy: AckExplicit})
require_NoError(t, err)
defer o.Stop()

if o.(*consumerFileStore).state.Delivered != state.Delivered {
t.Fatalf("Consumer state is wrong %+v vs %+v", o.(*consumerFileStore).state, state)
}
if o.(*consumerFileStore).state.AckFloor != state.AckFloor {
t.Fatalf("Consumer state is wrong %+v vs %+v", o.(*consumerFileStore).state, state)
}
if o.(*consumerFileStore).state.Delivered != state.Delivered {
t.Fatalf("Consumer state is wrong %+v vs %+v", o.(*consumerFileStore).state, state)
}
if o.(*consumerFileStore).state.AckFloor != state.AckFloor {
t.Fatalf("Consumer state is wrong %+v vs %+v", o.(*consumerFileStore).state, state)
}
}()
})
}

Expand Down
Loading