Skip to content

Commit

Permalink
etcdserver,wal: fix inconsistencies in WAL and snapshot
Browse files Browse the repository at this point in the history
ref. #10219

Signed-off-by: Gyuho Lee <[email protected]>
  • Loading branch information
gyuho committed May 18, 2020
1 parent 2333747 commit cac67dd
Show file tree
Hide file tree
Showing 9 changed files with 338 additions and 34 deletions.
26 changes: 22 additions & 4 deletions etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,27 +228,45 @@ func (r *raftNode) start(rh *raftReadyHandler) {
r.transport.Send(r.processMessages(rd.Messages))
}

// Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
// ensure that recovery after a snapshot restore is possible.
if !raft.IsEmptySnap(rd.Snapshot) {
// gofail: var raftBeforeSaveSnap struct{}
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
plog.Fatalf("failed to save Raft snapshot %v", err)
}
// gofail: var raftAfterSaveSnap struct{}
}

// gofail: var raftBeforeSave struct{}
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
plog.Fatalf("raft save state and entries error: %v", err)
plog.Fatalf("failed to raft save state and entries %v", err)
}
if !raft.IsEmptyHardState(rd.HardState) {
proposalsCommitted.Set(float64(rd.HardState.Commit))
}
// gofail: var raftAfterSave struct{}

if !raft.IsEmptySnap(rd.Snapshot) {
// gofail: var raftBeforeSaveSnap struct{}
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
plog.Fatalf("raft save snapshot error: %v", err)
// Force WAL to fsync its hard state before Release() releases
// old data from the WAL. Otherwise we could get an error like:
// panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost?
// See https://github.com/etcd-io/etcd/issues/10219 for more details.
if err := r.storage.Sync(); err != nil {
plog.Fatalf("failed to sync Raft snapshot %v", err)
}

// etcdserver now claims the snapshot has been persisted onto the disk
notifyc <- struct{}{}

// gofail: var raftAfterSaveSnap struct{}
r.raftStorage.ApplySnapshot(rd.Snapshot)
plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
// gofail: var raftAfterApplySnap struct{}

if err := r.storage.Release(rd.Snapshot); err != nil {
plog.Fatalf("failed to release Raft wal %v", err)
}
}

r.raftStorage.Append(rd.Entries)
Expand Down
14 changes: 13 additions & 1 deletion etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,15 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
if cfg.ShouldDiscover() {
plog.Warningf("discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir())
}
snapshot, err = ss.Load()

// Find a snapshot to start/restart a raft node
walSnaps, serr := wal.ValidSnapshotEntries(cfg.WALDir())
if serr != nil {
return nil, serr
}
// snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding
// wal log entries
snapshot, err = ss.LoadNewestAvailable(walSnaps)
if err != nil && err != snap.ErrNoSnapshot {
return nil, err
}
Expand Down Expand Up @@ -1556,6 +1564,10 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
}
plog.Infof("saved snapshot at index %d", snap.Metadata.Index)

if err = s.r.storage.Release(snap); err != nil {
plog.Panicf("failed to release wal %v", err)
}

// When sending a snapshot, etcd will pause compaction.
// After receiving a snapshot, the slow follower needs to get all the entries right after
// the snapshot sent to catch up. If we do not pause compaction, the log entries right after
Expand Down
22 changes: 16 additions & 6 deletions etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -922,11 +922,11 @@ func TestSnapshot(t *testing.T) {
ch := make(chan struct{}, 2)

go func() {
gaction, _ := p.Wait(1)
gaction, _ := p.Wait(2)
defer func() { ch <- struct{}{} }()

if len(gaction) != 1 {
t.Fatalf("len(action) = %d, want 1", len(gaction))
if len(gaction) != 2 {
t.Fatalf("len(action) = %d, want 2", len(gaction))
}
if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "SaveSnap"}) {
t.Errorf("action = %s, want SaveSnap", gaction[0])
Expand Down Expand Up @@ -1013,6 +1013,9 @@ func TestSnapshotOrdering(t *testing.T) {
if ac := <-p.Chan(); ac.Name != "Save" {
t.Fatalf("expected Save, got %+v", ac)
}
if ac := <-p.Chan(); ac.Name != "SaveSnap" {
t.Fatalf("expected Save, got %+v", ac)
}
if ac := <-p.Chan(); ac.Name != "Save" {
t.Fatalf("expected Save, got %+v", ac)
}
Expand All @@ -1022,7 +1025,10 @@ func TestSnapshotOrdering(t *testing.T) {
t.Fatalf("expected file %q, got missing", snapPath)
}
// unblock SaveSnapshot, etcdserver now permitted to move snapshot file
if ac := <-p.Chan(); ac.Name != "SaveSnap" {
if ac := <-p.Chan(); ac.Name != "Sync" {
t.Fatalf("expected SaveSnap, got %+v", ac)
}
if ac := <-p.Chan(); ac.Name != "Release" {
t.Fatalf("expected SaveSnap, got %+v", ac)
}
}
Expand Down Expand Up @@ -1059,17 +1065,21 @@ func TestTriggerSnap(t *testing.T) {

donec := make(chan struct{})
go func() {
wcnt := 2 + snapc
wcnt := 3 + snapc
gaction, _ := p.Wait(wcnt)

// each operation is recorded as a Save
// (SnapCount+1) * Puts + SaveSnap = (SnapCount+1) * Save + SaveSnap
// (SnapCount+1) * Puts + SaveSnap = (SnapCount+1) * Save + SaveSnap + Release
if len(gaction) != wcnt {
t.Logf("gaction: %v", gaction)
t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt)
}
if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "SaveSnap"}) {
t.Errorf("action = %s, want SaveSnap", gaction[wcnt-1])
}
if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "Release"}) {
t.Errorf("action = %s, want Release", gaction[wcnt-1])
}
close(donec)
}()

Expand Down
26 changes: 20 additions & 6 deletions etcdserver/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ type Storage interface {
SaveSnap(snap raftpb.Snapshot) error
// Close closes the Storage and performs finalization.
Close() error
// Release releases the locked wal files older than the provided snapshot.
Release(snap raftpb.Snapshot) error
// Sync WAL
Sync() error
}

type storage struct {
Expand All @@ -45,22 +49,32 @@ func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage {
return &storage{w, s}
}

// SaveSnap saves the snapshot to disk and release the locked
// wal files since they will not be used.
// SaveSnap saves the snapshot file to disk and writes the WAL snapshot entry.
func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
walsnap := walpb.Snapshot{
Index: snap.Metadata.Index,
Term: snap.Metadata.Term,
}
err := st.WAL.SaveSnapshot(walsnap)

// save the snapshot file before writing the snapshot to the wal.
// This makes it possible for the snapshot file to become orphaned, but prevents
// a WAL snapshot entry from having no corresponding snapshot file.
err := st.Snapshotter.SaveSnap(snap)
if err != nil {
return err
}
err = st.Snapshotter.SaveSnap(snap)
if err != nil {

return st.WAL.SaveSnapshot(walsnap)
}

// Release releases resources that are older than the given snap and are no longer needed:
// - releases the locks to the wal files that are older than the provided wal for the given snap.
// - deletes any .snap.db files that are older than the given snap.
func (st *storage) Release(snap raftpb.Snapshot) error {
if err := st.WAL.ReleaseLockTo(snap.Metadata.Index); err != nil {
return err
}
return st.WAL.ReleaseLockTo(snap.Metadata.Index)
return st.Snapshotter.ReleaseSnapDBs(snap)
}

func readWAL(waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
Expand Down
12 changes: 12 additions & 0 deletions pkg/mock/mockstorage/storage_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,16 @@ func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) error {
return nil
}

// Release records a "Release" action unless st is an empty snapshot
// (as reported by raft.IsEmptySnap). It always returns nil.
func (p *storageRecorder) Release(st raftpb.Snapshot) error {
	// Nothing to record for an empty snapshot.
	if raft.IsEmptySnap(st) {
		return nil
	}
	p.Record(testutil.Action{Name: "Release"})
	return nil
}

// Sync records a "Sync" action on the recorder and always returns nil.
func (p *storageRecorder) Sync() error {
	action := testutil.Action{Name: "Sync"}
	p.Record(action)
	return nil
}

// Close does nothing and always returns nil.
func (p *storageRecorder) Close() error {
	return nil
}
71 changes: 59 additions & 12 deletions snap/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"time"

Expand All @@ -31,7 +32,7 @@ import (
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap/snappb"

"github.com/coreos/etcd/wal/walpb"
"github.com/coreos/pkg/capnslog"
)

Expand Down Expand Up @@ -80,9 +81,8 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
d, err := snap.Marshal()
if err != nil {
return err
} else {
marshallingDurations.Observe(float64(time.Since(start)) / float64(time.Second))
}
marshallingDurations.Observe(float64(time.Since(start)) / float64(time.Second))

err = pioutil.WriteAndSyncFile(filepath.Join(s.dir, fname), d, 0666)
if err == nil {
Expand All @@ -97,20 +97,35 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
}

// Load returns whatever loadMatching selects when given a predicate that
// accepts every snapshot.
func (s *Snapshotter) Load() (*raftpb.Snapshot, error) {
	acceptAll := func(*raftpb.Snapshot) bool { return true }
	return s.loadMatching(acceptAll)
}

// LoadNewestAvailable loads the newest snapshot available that is in walSnaps.
// A snapshot counts as "in walSnaps" when both its Term and Index equal those
// of some entry in the slice.
func (s *Snapshotter) LoadNewestAvailable(walSnaps []walpb.Snapshot) (*raftpb.Snapshot, error) {
	// inWAL reports whether the candidate's metadata matches any WAL snapshot entry.
	inWAL := func(snapshot *raftpb.Snapshot) bool {
		meta := snapshot.Metadata
		// Walk the WAL entries from last to first.
		for i := len(walSnaps); i > 0; i-- {
			entry := &walSnaps[i-1]
			if entry.Term != meta.Term || entry.Index != meta.Index {
				continue
			}
			return true
		}
		return false
	}
	return s.loadMatching(inWAL)
}

// loadMatching returns the newest snapshot where matchFn returns true.
func (s *Snapshotter) loadMatching(matchFn func(*raftpb.Snapshot) bool) (*raftpb.Snapshot, error) {
names, err := s.snapNames()
if err != nil {
return nil, err
}
var snap *raftpb.Snapshot
for _, name := range names {
if snap, err = loadSnap(s.dir, name); err == nil {
break
if snap, err = loadSnap(s.dir, name); err == nil && matchFn(snap) {
return snap, nil
}
}
if err != nil {
return nil, ErrNoSnapshot
}
return snap, nil
return nil, ErrNoSnapshot
}

func loadSnap(dir, name string) (*raftpb.Snapshot, error) {
Expand Down Expand Up @@ -172,7 +187,8 @@ func (s *Snapshotter) snapNames() ([]string, error) {
if err != nil {
return nil, err
}
if err = s.cleanupSnapdir(names); err != nil {
names, err = s.cleanupSnapdir(names)
if err != nil {
return nil, err
}
snaps := checkSuffix(names)
Expand Down Expand Up @@ -208,12 +224,43 @@ func renameBroken(path string) {

// cleanupSnapdir removes any files that should not be in the snapshot directory:
// - db.tmp prefixed files that can be orphaned by defragmentation
func (s *Snapshotter) cleanupSnapdir(filenames []string) error {
func (s *Snapshotter) cleanupSnapdir(filenames []string) (names []string, err error) {
for _, filename := range filenames {
if strings.HasPrefix(filename, "db.tmp") {
plog.Infof("found orphaned defragmentation file; deleting: %s", filename)
if rmErr := os.Remove(filepath.Join(s.dir, filename)); rmErr != nil && !os.IsNotExist(rmErr) {
return fmt.Errorf("failed to remove orphaned defragmentation file %s: %v", filename, rmErr)
return nil, fmt.Errorf("failed to remove orphaned defragmentation file %s: %v", filename, rmErr)
}
continue
}
names = append(names, filename)
}
return names, nil
}

func (s *Snapshotter) ReleaseSnapDBs(snap raftpb.Snapshot) error {
dir, err := os.Open(s.dir)
if err != nil {
return err
}
defer dir.Close()
filenames, err := dir.Readdirnames(-1)
if err != nil {
return err
}
for _, filename := range filenames {
if strings.HasSuffix(filename, ".snap.db") {
hexIndex := strings.TrimSuffix(filepath.Base(filename), ".snap.db")
index, err := strconv.ParseUint(hexIndex, 16, 64)
if err != nil {
plog.Warningf("failed to parse index from filename: %s (%v)", filename, err)
continue
}
if index < snap.Metadata.Index {
plog.Infof("found orphaned .snap.db file; deleting %q", filename)
if rmErr := os.Remove(filepath.Join(s.dir, filename)); rmErr != nil && !os.IsNotExist(rmErr) {
plog.Warningf("failed to remove orphaned .snap.db file: %s (%v)", filename, rmErr)
}
}
}
}
Expand Down
Loading

0 comments on commit cac67dd

Please sign in to comment.