diff --git a/wal/wal.go b/wal/wal.go index 13193c064a40..7457c1ae85b1 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -69,7 +69,11 @@ var ( // A just opened WAL is in read mode, and ready for reading records. // The WAL will be ready for appending after reading out all the previous records. type WAL struct { - dir string // the living directory of the underlay files + dir string // the living directory of the underlay files + + // dirFile is a fd for the wal directory for syncing on Rename + dirFile *os.File + metadata []byte // metadata recorded at the head of each WAL state raftpb.HardState // hardstate recorded at the head of WAL @@ -108,10 +112,10 @@ func Create(dirpath string, metadata []byte) (*WAL, error) { if err != nil { return nil, err } - if _, err := f.Seek(0, os.SEEK_END); err != nil { + if _, err = f.Seek(0, os.SEEK_END); err != nil { return nil, err } - if err := fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil { + if err = fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil { return nil, err } @@ -121,17 +125,33 @@ func Create(dirpath string, metadata []byte) (*WAL, error) { encoder: newEncoder(f, 0), } w.locks = append(w.locks, f) - if err := w.saveCrc(0); err != nil { + if err = w.saveCrc(0); err != nil { return nil, err } - if err := w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil { + if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil { return nil, err } - if err := w.SaveSnapshot(walpb.Snapshot{}); err != nil { + if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil { return nil, err } - return w.renameWal(tmpdirpath) + if w, err = w.renameWal(tmpdirpath); err != nil { + return nil, err + } + + // directory was renamed; sync parent dir to persist rename + pdir, perr := openSyncDir(path.Dir(w.dir)) + if perr != nil { + return nil, perr + } + if perr = fileutil.Fsync(pdir); perr != nil { + return nil, perr + } + if perr = pdir.Close(); err != nil { + return nil, perr + } + + return w, nil } // Open opens the WAL at the given snap. @@ -141,7 +161,14 @@ func Create(dirpath string, metadata []byte) (*WAL, error) { // the given snap. The WAL cannot be appended to before reading out all of its // previous records. func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) { - return openAtIndex(dirpath, snap, true) + w, err := openAtIndex(dirpath, snap, true) + if err != nil { + return nil, err + } + if err = w.openDir(); err != nil { + return nil, err + } + return w, nil } // OpenForRead only opens the wal files for read. @@ -373,6 +400,10 @@ func (w *WAL) cut() error { if err = os.Rename(newTail.Name(), fpath); err != nil { return err } + if err = fileutil.Fsync(w.dirFile); err != nil { + return err + } + newTail.Close() if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY, fileutil.PrivateFileMode); err != nil { @@ -467,6 +498,7 @@ func (w *WAL) Close() error { return err } } + for _, l := range w.locks { if l == nil { continue @@ -475,6 +507,11 @@ func (w *WAL) Close() error { plog.Errorf("failed to unlock during closing wal: %s", err) } } + + if err := w.dirFile.Close(); err != nil { + return err + } + return nil } @@ -574,6 +611,15 @@ func (w *WAL) seq() uint64 { return seq } +func (w *WAL) openDir() error { + df, derr := openSyncDir(w.dir) + if derr != nil { + return derr + } + w.dirFile = df + return nil +} + func mustSync(st, prevst raftpb.HardState, entsnum int) bool { // Persistent state on all servers: // (Updated on stable storage before responding to RPCs) diff --git a/wal/wal_unix.go b/wal/wal_unix.go index 101ea6acc3c8..9c826b65d8a6 100644 --- a/wal/wal_unix.go +++ b/wal/wal_unix.go @@ -34,5 +34,7 @@ func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) { } w.fp = newFilePipeline(w.dir, SegmentSizeBytes) - return w, nil + return w, w.openDir() } + +func openSyncDir(dir string) (*os.File, error) { return os.Open(dir) } diff --git a/wal/wal_windows.go b/wal/wal_windows.go index 0b9e434cf546..c120f2c049e0 100644 --- a/wal/wal_windows.go +++ b/wal/wal_windows.go @@ -17,6 +17,7 @@ package wal import ( "os" + "github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/wal/walpb" ) @@ -37,5 +38,10 @@ func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) { newWAL.Close() return nil, err } + return newWAL, nil } + +func openSyncDir(dir string) (*os.File, error) { + return fileutil.OpenDir(dir) +}