Skip to content

Commit

Permalink
wal: fsync directory after wal file rename
Browse files Browse the repository at this point in the history
  • Loading branch information
Anthony Romano committed Sep 8, 2016
1 parent 0b63502 commit d2e62f9
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 4 deletions.
53 changes: 50 additions & 3 deletions wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,11 @@ var (
// A just opened WAL is in read mode, and ready for reading records.
// The WAL will be ready for appending after reading out all the previous records.
type WAL struct {
dir string // the living directory of the underlay files
dir string // the living directory of the underlay files

// dirFile is a fd for the wal directory for syncing on Rename
dirFile *os.File

metadata []byte // metadata recorded at the head of each WAL
state raftpb.HardState // hardstate recorded at the head of WAL

Expand Down Expand Up @@ -131,7 +135,24 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
return nil, err
}

return w.renameWal(tmpdirpath)
w, err = w.renameWal(tmpdirpath)
if err != nil {
return nil, err
}

// directory was renamed; sync parent dir to persist rename
pdir, perr := openSyncDir(path.Dir(w.dir))
if perr != nil {
return nil, perr
}
if perr = fileutil.Fsync(pdir); perr != nil {
return nil, perr
}
if perr = pdir.Close(); err != nil {
return nil, perr
}

return w, nil
}

// Open opens the WAL at the given snap.
Expand All @@ -141,7 +162,14 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
// the given snap. The WAL cannot be appended to before reading out all of its
// previous records.
func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) {
return openAtIndex(dirpath, snap, true)
w, err := openAtIndex(dirpath, snap, true)
if err != nil {
return nil, err
}
if err = w.openDir(); err != nil {
return nil, err
}
return w, nil
}

// OpenForRead only opens the wal files for read.
Expand Down Expand Up @@ -373,6 +401,10 @@ func (w *WAL) cut() error {
if err = os.Rename(newTail.Name(), fpath); err != nil {
return err
}
if err = fileutil.Fsync(w.dirFile); err != nil {
return err
}

newTail.Close()

if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
Expand Down Expand Up @@ -467,6 +499,7 @@ func (w *WAL) Close() error {
return err
}
}

for _, l := range w.locks {
if l == nil {
continue
Expand All @@ -475,6 +508,11 @@ func (w *WAL) Close() error {
plog.Errorf("failed to unlock during closing wal: %s", err)
}
}

if err := w.dirFile.Close(); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -574,6 +612,15 @@ func (w *WAL) seq() uint64 {
return seq
}

func (w *WAL) openDir() error {
df, derr := openSyncDir(w.dir)
if derr != nil {
return derr
}
w.dirFile = df
return nil
}

func mustSync(st, prevst raftpb.HardState, entsnum int) bool {
// Persistent state on all servers:
// (Updated on stable storage before responding to RPCs)
Expand Down
4 changes: 3 additions & 1 deletion wal/wal_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,7 @@ func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) {
}

w.fp = newFilePipeline(w.dir, SegmentSizeBytes)
return w, nil
return w, w.openDir()
}

func openSyncDir(dir string) (*os.File, error) { return os.Open(dir) }
6 changes: 6 additions & 0 deletions wal/wal_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package wal
import (
"os"

"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/wal/walpb"
)

Expand All @@ -37,5 +38,10 @@ func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) {
newWAL.Close()
return nil, err
}

return newWAL, nil
}

func openSyncDir(dir string) (*os.File, error) {
return os.OpenFile(dir, os.O_WRONLY, fileutil.PrivateFileMode)
}

0 comments on commit d2e62f9

Please sign in to comment.