diff --git a/server/filestore.go b/server/filestore.go index 270556d12bb..9f7f32f4a58 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -11764,29 +11764,47 @@ func (alg StoreCompression) Decompress(buf []byte) ([]byte, error) { // sets O_SYNC on the open file if SyncAlways is set. The dios semaphore is // handled automatically by this function, so don't wrap calls to it in dios. func (fs *fileStore) writeFileWithOptionalSync(name string, data []byte, perm fs.FileMode) error { - if fs.fcfg.SyncAlways { - return writeFileWithSync(name, data, perm) - } - <-dios - defer func() { - dios <- struct{}{} - }() - return os.WriteFile(name, data, perm) + return writeAtomically(name, data, perm, fs.fcfg.SyncAlways) } func writeFileWithSync(name string, data []byte, perm fs.FileMode) error { + return writeAtomically(name, data, perm, true) +} + +func writeAtomically(name string, data []byte, perm fs.FileMode, sync bool) error { + tmp := name + ".tmp" + flags := os.O_CREATE | os.O_WRONLY | os.O_TRUNC + if sync { + flags = flags | os.O_SYNC + } <-dios defer func() { dios <- struct{}{} }() - flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC | os.O_SYNC - f, err := os.OpenFile(name, flags, perm) + f, err := os.OpenFile(tmp, flags, perm) if err != nil { return err } - if _, err = f.Write(data); err != nil { + if _, err := f.Write(data); err != nil { _ = f.Close() + _ = os.Remove(tmp) return err } - return f.Close() + if err := f.Close(); err != nil { + _ = os.Remove(tmp) + return err + } + if err := os.Rename(tmp, name); err != nil { + _ = os.Remove(tmp) + return err + } + if sync { + // To ensure that the file rename was persisted on all filesystems, + // also try to flush the directory metadata. + if d, err := os.Open(filepath.Dir(name)); err == nil { + _ = d.Sync() + _ = d.Close() + } + } + return nil } diff --git a/server/filestore_test.go b/server/filestore_test.go index a903f99d21a..42b3f059ff9 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -4182,6 +4182,7 @@ func TestFileStoreEncrypted(t *testing.T) { err = o.Update(state) require_NoError(t, err) + o.Stop() fs.Stop() fs, err = newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, created, prf(&fcfg), nil) require_NoError(t, err) @@ -4982,36 +4983,41 @@ func TestFileStoreSubjectsTotals(t *testing.T) { func TestFileStoreConsumerStoreEncodeAfterRestart(t *testing.T) { testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { - fs, err := newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), prf(&fcfg), nil) - require_NoError(t, err) - defer fs.Stop() + state := &ConsumerState{} - o, err := fs.ConsumerStore("o22", &ConsumerConfig{AckPolicy: AckExplicit}) - require_NoError(t, err) + func() { // for defers + fs, err := newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() - state := &ConsumerState{} - state.Delivered.Consumer = 22 - state.Delivered.Stream = 22 - state.AckFloor.Consumer = 11 - state.AckFloor.Stream = 11 - err = o.Update(state) - require_NoError(t, err) + o, err := fs.ConsumerStore("o22", &ConsumerConfig{AckPolicy: AckExplicit}) + require_NoError(t, err) + defer o.Stop() - fs.Stop() + state.Delivered.Consumer = 22 + state.Delivered.Stream = 22 + state.AckFloor.Consumer = 11 + state.AckFloor.Stream = 11 + err = o.Update(state) + require_NoError(t, err) + }() - fs, err = newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), prf(&fcfg), nil) - require_NoError(t, err) - defer fs.Stop() + func() { // for defers + fs, err := newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() - o, err = fs.ConsumerStore("o22", &ConsumerConfig{AckPolicy: AckExplicit}) - require_NoError(t, err) + o, err := fs.ConsumerStore("o22", &ConsumerConfig{AckPolicy: AckExplicit}) + require_NoError(t, err) + defer o.Stop() - if o.(*consumerFileStore).state.Delivered != state.Delivered { - t.Fatalf("Consumer state is wrong %+v vs %+v", o.(*consumerFileStore).state, state) - } - if o.(*consumerFileStore).state.AckFloor != state.AckFloor { - t.Fatalf("Consumer state is wrong %+v vs %+v", o.(*consumerFileStore).state, state) - } + if o.(*consumerFileStore).state.Delivered != state.Delivered { + t.Fatalf("Consumer state is wrong %+v vs %+v", o.(*consumerFileStore).state, state) + } + if o.(*consumerFileStore).state.AckFloor != state.AckFloor { + t.Fatalf("Consumer state is wrong %+v vs %+v", o.(*consumerFileStore).state, state) + } + }() }) }