From cac67ddc8e8f929ab68ca9f5897d50b14b7d1b5f Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Mon, 18 May 2020 01:54:15 -0700 Subject: [PATCH] etcdserver,wal: fix inconsistencies in WAL and snapshot ref. https://github.com/etcd-io/etcd/issues/10219 Signed-off-by: Gyuho Lee --- etcdserver/raft.go | 26 +++++-- etcdserver/server.go | 14 +++- etcdserver/server_test.go | 22 ++++-- etcdserver/storage.go | 26 +++++-- pkg/mock/mockstorage/storage_recorder.go | 12 ++++ snap/snapshotter.go | 71 +++++++++++++++---- snap/snapshotter_test.go | 87 ++++++++++++++++++++++-- wal/wal.go | 61 +++++++++++++++++ wal/wal_test.go | 53 +++++++++++++++ 9 files changed, 338 insertions(+), 34 deletions(-) diff --git a/etcdserver/raft.go b/etcdserver/raft.go index f73df6c7dc0d..212dccbdcc8a 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -228,9 +228,19 @@ func (r *raftNode) start(rh *raftReadyHandler) { r.transport.Send(r.processMessages(rd.Messages)) } + // Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to + // ensure that recovery after a snapshot restore is possible. + if !raft.IsEmptySnap(rd.Snapshot) { + // gofail: var raftBeforeSaveSnap struct{} + if err := r.storage.SaveSnap(rd.Snapshot); err != nil { + plog.Fatalf("failed to save Raft snapshot %v", err) + } + // gofail: var raftAfterSaveSnap struct{} + } + // gofail: var raftBeforeSave struct{} if err := r.storage.Save(rd.HardState, rd.Entries); err != nil { - plog.Fatalf("raft save state and entries error: %v", err) + plog.Fatalf("failed to raft save state and entries %v", err) } if !raft.IsEmptyHardState(rd.HardState) { proposalsCommitted.Set(float64(rd.HardState.Commit)) @@ -238,10 +248,14 @@ func (r *raftNode) start(rh *raftReadyHandler) { // gofail: var raftAfterSave struct{} if !raft.IsEmptySnap(rd.Snapshot) { - // gofail: var raftBeforeSaveSnap struct{} - if err := r.storage.SaveSnap(rd.Snapshot); err != nil { - plog.Fatalf("raft save snapshot error: %v", err) + // Force WAL to fsync its hard state before Release() releases + // old data from the WAL. Otherwise could get an error like: + // panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost? + // See https://github.com/etcd-io/etcd/issues/10219 for more details. + if err := r.storage.Sync(); err != nil { + plog.Fatalf("failed to sync Raft snapshot %v", err) } + // etcdserver now claim the snapshot has been persisted onto the disk notifyc <- struct{}{} @@ -249,6 +263,10 @@ func (r *raftNode) start(rh *raftReadyHandler) { r.raftStorage.ApplySnapshot(rd.Snapshot) plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index) // gofail: var raftAfterApplySnap struct{} + + if err := r.storage.Release(rd.Snapshot); err != nil { + plog.Fatalf("failed to release Raft wal %v", err) + } } r.raftStorage.Append(rd.Entries) diff --git a/etcdserver/server.go b/etcdserver/server.go index 7c1948c5aa4a..4fedb200734d 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -375,7 +375,15 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { if cfg.ShouldDiscover() { plog.Warningf("discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir()) } - snapshot, err = ss.Load() + + // Find a snapshot to start/restart a raft node + walSnaps, serr := wal.ValidSnapshotEntries(cfg.WALDir()) + if serr != nil { + return nil, serr + } + // snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding + // wal log entries + snapshot, err = ss.LoadNewestAvailable(walSnaps) if err != nil && err != snap.ErrNoSnapshot { return nil, err } @@ -1556,6 +1564,10 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { } plog.Infof("saved snapshot at index %d", snap.Metadata.Index) + if err = s.r.storage.Release(snap); err != nil { + plog.Panicf("failed to release wal %v", err) + } + // When sending a snapshot, etcd will pause compaction. // After receives a snapshot, the slow follower needs to get all the entries right after // the snapshot sent to catch up. If we do not pause compaction, the log entries right after diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 7ab52b5367e9..9713f426439d 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -922,11 +922,11 @@ func TestSnapshot(t *testing.T) { ch := make(chan struct{}, 2) go func() { - gaction, _ := p.Wait(1) + gaction, _ := p.Wait(2) defer func() { ch <- struct{}{} }() - if len(gaction) != 1 { - t.Fatalf("len(action) = %d, want 1", len(gaction)) + if len(gaction) != 2 { + t.Fatalf("len(action) = %d, want 2", len(gaction)) } if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "SaveSnap"}) { t.Errorf("action = %s, want SaveSnap", gaction[0]) @@ -1013,6 +1013,9 @@ func TestSnapshotOrdering(t *testing.T) { if ac := <-p.Chan(); ac.Name != "Save" { t.Fatalf("expected Save, got %+v", ac) } + if ac := <-p.Chan(); ac.Name != "SaveSnap" { + t.Fatalf("expected Save, got %+v", ac) + } if ac := <-p.Chan(); ac.Name != "Save" { t.Fatalf("expected Save, got %+v", ac) } @@ -1022,7 +1025,10 @@ func TestSnapshotOrdering(t *testing.T) { t.Fatalf("expected file %q, got missing", snapPath) } // unblock SaveSnapshot, etcdserver now permitted to move snapshot file - if ac := <-p.Chan(); ac.Name != "SaveSnap" { + if ac := <-p.Chan(); ac.Name != "Sync" { + t.Fatalf("expected SaveSnap, got %+v", ac) + } + if ac := <-p.Chan(); ac.Name != "Release" { t.Fatalf("expected SaveSnap, got %+v", ac) } } @@ -1059,17 +1065,21 @@ func TestTriggerSnap(t *testing.T) { donec := make(chan struct{}) go func() { - wcnt := 2 + snapc + wcnt := 3 + snapc gaction, _ := p.Wait(wcnt) // each operation is recorded as a Save - // (SnapCount+1) * Puts + SaveSnap = (SnapCount+1) * Save + SaveSnap + // (SnapCount+1) * Puts + SaveSnap = (SnapCount+1) * Save + SaveSnap + Release if len(gaction) != wcnt { + t.Logf("gaction: %v", gaction) t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt) } if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "SaveSnap"}) { t.Errorf("action = %s, want SaveSnap", gaction[wcnt-1]) } + if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "Release"}) { + t.Errorf("action = %s, want Release", gaction[wcnt-1]) + } close(donec) }() diff --git a/etcdserver/storage.go b/etcdserver/storage.go index 55c2dd4b6a41..2cddb5cb218d 100644 --- a/etcdserver/storage.go +++ b/etcdserver/storage.go @@ -34,6 +34,10 @@ type Storage interface { SaveSnap(snap raftpb.Snapshot) error // Close closes the Storage and performs finalization. Close() error + // Release releases the locked wal files older than the provided snapshot. + Release(snap raftpb.Snapshot) error + // Sync WAL + Sync() error } type storage struct { @@ -45,22 +49,32 @@ func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage { return &storage{w, s} } -// SaveSnap saves the snapshot to disk and release the locked -// wal files since they will not be used. +// SaveSnap saves the snapshot file to disk and writes the WAL snapshot entry. func (st *storage) SaveSnap(snap raftpb.Snapshot) error { walsnap := walpb.Snapshot{ Index: snap.Metadata.Index, Term: snap.Metadata.Term, } - err := st.WAL.SaveSnapshot(walsnap) + + // save the snapshot file before writing the snapshot to the wal. + // This makes it possible for the snapshot file to become orphaned, but prevents + // a WAL snapshot entry from having no corresponding snapshot file. + err := st.Snapshotter.SaveSnap(snap) if err != nil { return err } - err = st.Snapshotter.SaveSnap(snap) - if err != nil { + + return st.WAL.SaveSnapshot(walsnap) +} + +// Release releases resources older than the given snap and are no longer needed: +// - releases the locks to the wal files that are older than the provided wal for the given snap. +// - deletes any .snap.db files that are older than the given snap. +func (st *storage) Release(snap raftpb.Snapshot) error { + if err := st.WAL.ReleaseLockTo(snap.Metadata.Index); err != nil { return err } - return st.WAL.ReleaseLockTo(snap.Metadata.Index) + return st.Snapshotter.ReleaseSnapDBs(snap) } func readWAL(waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) { diff --git a/pkg/mock/mockstorage/storage_recorder.go b/pkg/mock/mockstorage/storage_recorder.go index 4ecab9831b3d..362912f6dcc5 100644 --- a/pkg/mock/mockstorage/storage_recorder.go +++ b/pkg/mock/mockstorage/storage_recorder.go @@ -45,4 +45,16 @@ func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) error { return nil } +func (p *storageRecorder) Release(st raftpb.Snapshot) error { + if !raft.IsEmptySnap(st) { + p.Record(testutil.Action{Name: "Release"}) + } + return nil +} + +func (p *storageRecorder) Sync() error { + p.Record(testutil.Action{Name: "Sync"}) + return nil +} + func (p *storageRecorder) Close() error { return nil } diff --git a/snap/snapshotter.go b/snap/snapshotter.go index 5d79b175915e..1d73a1c2a271 100644 --- a/snap/snapshotter.go +++ b/snap/snapshotter.go @@ -23,6 +23,7 @@ import ( "os" "path/filepath" "sort" + "strconv" "strings" "time" @@ -31,7 +32,7 @@ import ( "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/snap/snappb" - + "github.com/coreos/etcd/wal/walpb" "github.com/coreos/pkg/capnslog" ) @@ -80,9 +81,8 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error { d, err := snap.Marshal() if err != nil { return err - } else { - marshallingDurations.Observe(float64(time.Since(start)) / float64(time.Second)) } + marshallingDurations.Observe(float64(time.Since(start)) / float64(time.Second)) err = pioutil.WriteAndSyncFile(filepath.Join(s.dir, fname), d, 0666) if err == nil { @@ -97,20 +97,35 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error { } func (s *Snapshotter) Load() (*raftpb.Snapshot, error) { + return s.loadMatching(func(*raftpb.Snapshot) bool { return true }) +} + +// LoadNewestAvailable loads the newest snapshot available that is in walSnaps. +func (s *Snapshotter) LoadNewestAvailable(walSnaps []walpb.Snapshot) (*raftpb.Snapshot, error) { + return s.loadMatching(func(snapshot *raftpb.Snapshot) bool { + m := snapshot.Metadata + for i := len(walSnaps) - 1; i >= 0; i-- { + if m.Term == walSnaps[i].Term && m.Index == walSnaps[i].Index { + return true + } + } + return false + }) +} + +// loadMatching returns the newest snapshot where matchFn returns true. +func (s *Snapshotter) loadMatching(matchFn func(*raftpb.Snapshot) bool) (*raftpb.Snapshot, error) { names, err := s.snapNames() if err != nil { return nil, err } var snap *raftpb.Snapshot for _, name := range names { - if snap, err = loadSnap(s.dir, name); err == nil { - break + if snap, err = loadSnap(s.dir, name); err == nil && matchFn(snap) { + return snap, nil } } - if err != nil { - return nil, ErrNoSnapshot - } - return snap, nil + return nil, ErrNoSnapshot } func loadSnap(dir, name string) (*raftpb.Snapshot, error) { @@ -172,7 +187,8 @@ func (s *Snapshotter) snapNames() ([]string, error) { if err != nil { return nil, err } - if err = s.cleanupSnapdir(names); err != nil { + names, err = s.cleanupSnapdir(names) + if err != nil { return nil, err } snaps := checkSuffix(names) @@ -208,12 +224,43 @@ func renameBroken(path string) { // cleanupSnapdir removes any files that should not be in the snapshot directory: // - db.tmp prefixed files that can be orphaned by defragmentation -func (s *Snapshotter) cleanupSnapdir(filenames []string) error { +func (s *Snapshotter) cleanupSnapdir(filenames []string) (names []string, err error) { for _, filename := range filenames { if strings.HasPrefix(filename, "db.tmp") { plog.Infof("found orphaned defragmentation file; deleting: %s", filename) if rmErr := os.Remove(filepath.Join(s.dir, filename)); rmErr != nil && !os.IsNotExist(rmErr) { - return fmt.Errorf("failed to remove orphaned defragmentation file %s: %v", filename, rmErr) + return nil, fmt.Errorf("failed to remove orphaned defragmentation file %s: %v", filename, rmErr) + } + continue + } + names = append(names, filename) + } + return names, nil +} + +func (s *Snapshotter) ReleaseSnapDBs(snap raftpb.Snapshot) error { + dir, err := os.Open(s.dir) + if err != nil { + return err + } + defer dir.Close() + filenames, err := dir.Readdirnames(-1) + if err != nil { + return err + } + for _, filename := range filenames { + if strings.HasSuffix(filename, ".snap.db") { + hexIndex := strings.TrimSuffix(filepath.Base(filename), ".snap.db") + index, err := strconv.ParseUint(hexIndex, 16, 64) + if err != nil { + plog.Warningf("failed to parse index from filename: %s (%v)", filename, err) + continue + } + if index < snap.Metadata.Index { + plog.Infof("found orphaned .snap.db file; deleting %q", filename) + if rmErr := os.Remove(filepath.Join(s.dir, filename)); rmErr != nil && !os.IsNotExist(rmErr) { + plog.Warningf("failed to remove orphaned .snap.db file: %s (%v)", filename, rmErr) + } } } } diff --git a/snap/snapshotter_test.go b/snap/snapshotter_test.go index 6af823f03bfa..926b75a807fb 100644 --- a/snap/snapshotter_test.go +++ b/snap/snapshotter_test.go @@ -23,7 +23,9 @@ import ( "reflect" "testing" + "github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/wal/walpb" ) var testSnap = &raftpb.Snapshot{ @@ -165,12 +167,48 @@ func TestLoadNewestSnap(t *testing.T) { t.Fatal(err) } - g, err := ss.Load() - if err != nil { - t.Errorf("err = %v, want nil", err) + cases := []struct { + name string + availableWalSnaps []walpb.Snapshot + expected *raftpb.Snapshot + }{ + { + name: "load-newest", + expected: &newSnap, + }, + { + name: "loadnewestavailable-newest", + availableWalSnaps: []walpb.Snapshot{{Index: 0, Term: 0}, {Index: 1, Term: 1}, {Index: 5, Term: 1}}, + expected: &newSnap, + }, + { + name: "loadnewestavailable-newest-unsorted", + availableWalSnaps: []walpb.Snapshot{{Index: 5, Term: 1}, {Index: 1, Term: 1}, {Index: 0, Term: 0}}, + expected: &newSnap, + }, + { + name: "loadnewestavailable-previous", + availableWalSnaps: []walpb.Snapshot{{Index: 0, Term: 0}, {Index: 1, Term: 1}}, + expected: testSnap, + }, } - if !reflect.DeepEqual(g, &newSnap) { - t.Errorf("snap = %#v, want %#v", g, &newSnap) + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + var err error + var g *raftpb.Snapshot + if tc.availableWalSnaps != nil { + g, err = ss.LoadNewestAvailable(tc.availableWalSnaps) + } else { + g, err = ss.Load() + } + if err != nil { + t.Errorf("err = %v, want nil", err) + } + if !reflect.DeepEqual(g, tc.expected) { + t.Errorf("snap = %#v, want %#v", g, tc.expected) + } + }) } } @@ -228,3 +266,42 @@ func TestAllSnapshotBroken(t *testing.T) { t.Errorf("err = %v, want %v", err, ErrNoSnapshot) } } + +func TestReleaseSnapDBs(t *testing.T) { + dir := filepath.Join(os.TempDir(), "snapshot") + err := os.Mkdir(dir, 0700) + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + + snapIndices := []uint64{100, 200, 300, 400} + for _, index := range snapIndices { + filename := filepath.Join(dir, fmt.Sprintf("%016x.snap.db", index)) + if err := ioutil.WriteFile(filename, []byte("snap file\n"), 0644); err != nil { + t.Fatal(err) + } + } + + ss := New(dir) + + if err := ss.ReleaseSnapDBs(raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 300}}); err != nil { + t.Fatal(err) + } + + deleted := []uint64{100, 200} + for _, index := range deleted { + filename := filepath.Join(dir, fmt.Sprintf("%016x.snap.db", index)) + if fileutil.Exist(filename) { + t.Errorf("expected %s (index: %d) to be deleted, but it still exists", filename, index) + } + } + + retained := []uint64{300, 400} + for _, index := range retained { + filename := filepath.Join(dir, fmt.Sprintf("%016x.snap.db", index)) + if !fileutil.Exist(filename) { + t.Errorf("expected %s (index: %d) to be retained, but it no longer exists", filename, index) + } + } +} diff --git a/wal/wal.go b/wal/wal.go index 6909e3ac7afd..13f7ca5d3357 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -420,6 +420,63 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. return metadata, state, ents, err } +// ValidSnapshotEntries returns all the valid snapshot entries in the wal logs in the given directory. +// Snapshot entries are valid if their index is less than or equal to the most recent committed hardstate. +func ValidSnapshotEntries(walDir string) ([]walpb.Snapshot, error) { + var snaps []walpb.Snapshot + var state raftpb.HardState + var err error + + rec := &walpb.Record{} + names, err := readWalNames(walDir) + if err != nil { + return nil, err + } + + // open wal files in read mode, so that there is no conflict + // when the same WAL is opened elsewhere in write mode + rs, _, closer, err := openWALFiles(walDir, names, 0, false) + if err != nil { + return nil, err + } + defer func() { + if closer != nil { + closer() + } + }() + + // create a new decoder from the readers on the WAL files + decoder := newDecoder(rs...) + + for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) { + switch rec.Type { + case snapshotType: + var loadedSnap walpb.Snapshot + pbutil.MustUnmarshal(&loadedSnap, rec.Data) + snaps = append(snaps, loadedSnap) + case stateType: + state = mustUnmarshalState(rec.Data) + } + } + // We do not have to read out all the WAL entries + // as the decoder is opened in read mode. + if err != io.EOF && err != io.ErrUnexpectedEOF { + return nil, err + } + + // filter out any snaps that are newer than the committed hardstate + n := 0 + for _, s := range snaps { + if s.Index <= state.Commit { + snaps[n] = s + n++ + } + } + snaps = snaps[:n:n] + + return snaps, nil +} + // Verify reads through the given WAL and verifies that it is not corrupted. // It creates a new decoder to read through the records of the given WAL. // It does not conflict with any open WAL, but it is recommended not to @@ -599,6 +656,10 @@ func (w *WAL) sync() error { return err } +func (w *WAL) Sync() error { + return w.sync() +} + // ReleaseLockTo releases the locks, which has smaller index than the given index // except the largest one among them. // For example, if WAL is holding lock 1,2,3,4,5,6, ReleaseLockTo(4) will release diff --git a/wal/wal_test.go b/wal/wal_test.go index b060da191284..4c4b667f0cf9 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -852,3 +852,56 @@ func TestOpenOnTornWrite(t *testing.T) { t.Fatalf("expected len(ents) = %d, got %d", wEntries, len(ents)) } } + +// TestValidSnapshotEntries ensures ValidSnapshotEntries returns all valid wal snapshot entries, accounting +// for hardstate +func TestValidSnapshotEntries(t *testing.T) { + p, err := ioutil.TempDir(os.TempDir(), "waltest") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(p) + snap0 := walpb.Snapshot{Index: 0, Term: 0} + snap1 := walpb.Snapshot{Index: 1, Term: 1} + state1 := raftpb.HardState{Commit: 1, Term: 1} + snap2 := walpb.Snapshot{Index: 2, Term: 1} + snap3 := walpb.Snapshot{Index: 3, Term: 2} + state2 := raftpb.HardState{Commit: 3, Term: 2} + snap4 := walpb.Snapshot{Index: 4, Term: 2} // will be orphaned since the last committed entry will be snap3 + func() { + var w *WAL + w, err = Create(p, nil) + if err != nil { + t.Fatal(err) + } + defer w.Close() + + // snap0 is implicitly created at index 0, term 0 + if err = w.SaveSnapshot(snap1); err != nil { + t.Fatal(err) + } + if err = w.Save(state1, nil); err != nil { + t.Fatal(err) + } + if err = w.SaveSnapshot(snap2); err != nil { + t.Fatal(err) + } + if err = w.SaveSnapshot(snap3); err != nil { + t.Fatal(err) + } + if err = w.Save(state2, nil); err != nil { + t.Fatal(err) + } + if err = w.SaveSnapshot(snap4); err != nil { + t.Fatal(err) + } + }() + walSnaps, serr := ValidSnapshotEntries(p) + if serr != nil { + t.Fatal(serr) + } + expected := []walpb.Snapshot{snap0, snap1, snap2, snap3} + if !reflect.DeepEqual(walSnaps, expected) { + t.Errorf("expected walSnaps %+v, got %+v", expected, walSnaps) + } +}