69
69
// A just opened WAL is in read mode, and ready for reading records.
70
70
// The WAL will be ready for appending after reading out all the previous records.
71
71
type WAL struct {
72
- dir string // the living directory of the underlay files
72
+ dir string // the living directory of the underlay files
73
+
74
+ // dirFile is a fd for the wal directory for syncing on Rename
75
+ dirFile * os.File
76
+
73
77
metadata []byte // metadata recorded at the head of each WAL
74
78
state raftpb.HardState // hardstate recorded at the head of WAL
75
79
@@ -131,7 +135,24 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
131
135
return nil , err
132
136
}
133
137
134
- return w .renameWal (tmpdirpath )
138
+ w , err = w .renameWal (tmpdirpath )
139
+ if err != nil {
140
+ return nil , err
141
+ }
142
+
143
+ // directory was renamed; sync parent dir to persist rename
144
+ pdir , perr := openSyncDir (path .Dir (w .dir ))
145
+ if perr != nil {
146
+ return nil , perr
147
+ }
148
+ if perr = fileutil .Fsync (pdir ); perr != nil {
149
+ return nil , perr
150
+ }
151
+ if perr = pdir .Close (); err != nil {
152
+ return nil , perr
153
+ }
154
+
155
+ return w , nil
135
156
}
136
157
137
158
// Open opens the WAL at the given snap.
@@ -141,7 +162,14 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
141
162
// the given snap. The WAL cannot be appended to before reading out all of its
142
163
// previous records.
143
164
func Open (dirpath string , snap walpb.Snapshot ) (* WAL , error ) {
144
- return openAtIndex (dirpath , snap , true )
165
+ w , err := openAtIndex (dirpath , snap , true )
166
+ if err != nil {
167
+ return nil , err
168
+ }
169
+ if err = w .openDir (); err != nil {
170
+ return nil , err
171
+ }
172
+ return w , nil
145
173
}
146
174
147
175
// OpenForRead only opens the wal files for read.
@@ -373,6 +401,10 @@ func (w *WAL) cut() error {
373
401
if err = os .Rename (newTail .Name (), fpath ); err != nil {
374
402
return err
375
403
}
404
+ if err = fileutil .Fsync (w .dirFile ); err != nil {
405
+ return err
406
+ }
407
+
376
408
newTail .Close ()
377
409
378
410
if newTail , err = fileutil .LockFile (fpath , os .O_WRONLY , fileutil .PrivateFileMode ); err != nil {
@@ -467,6 +499,7 @@ func (w *WAL) Close() error {
467
499
return err
468
500
}
469
501
}
502
+
470
503
for _ , l := range w .locks {
471
504
if l == nil {
472
505
continue
@@ -475,6 +508,11 @@ func (w *WAL) Close() error {
475
508
plog .Errorf ("failed to unlock during closing wal: %s" , err )
476
509
}
477
510
}
511
+
512
+ if err := w .dirFile .Close (); err != nil {
513
+ return err
514
+ }
515
+
478
516
return nil
479
517
}
480
518
@@ -574,6 +612,15 @@ func (w *WAL) seq() uint64 {
574
612
return seq
575
613
}
576
614
615
+ func (w * WAL ) openDir () error {
616
+ df , derr := openSyncDir (w .dir )
617
+ if derr != nil {
618
+ return derr
619
+ }
620
+ w .dirFile = df
621
+ return nil
622
+ }
623
+
577
624
func mustSync (st , prevst raftpb.HardState , entsnum int ) bool {
578
625
// Persistent state on all servers:
579
626
// (Updated on stable storage before responding to RPCs)
0 commit comments