From c72cdf1fc8fc9b347884a409a27d24e2c1614408 Mon Sep 17 00:00:00 2001
From: Aaron Lehmann
Date: Fri, 5 Aug 2016 13:32:23 -0700
Subject: [PATCH] raft: Garbage collect WAL files

We currently garbage collect snapshot files (keeping only
KeepOldSnapshots outdated snapshots, which defaults to 0). However, we
don't garbage collect the WAL files that the snapshots replace.

Delete any WALs which are so old that they only contain information
that predates the oldest of the snapshots we have retained. This means
that by default, we will remove old WALs once they are supplanted by a
snapshot. However, if KeepOldSnapshots is set above 0, we will keep
whichever WALs are necessary to catch up from the oldest of the
retained snapshots. This makes sure that the old snapshots we retain
are actually useful, and avoids adding an independent knob for WAL
retention that might end up with an inconsistent setting.

Also, fix serious brokenness in the deletion of old snapshots (it was
deleting the most recent outdated snapshots, instead of the oldest).

Signed-off-by: Aaron Lehmann
---
 manager/state/raft/storage.go      | 109 ++++++++++++++---
 manager/state/raft/storage_test.go | 180 +++++++++++++++++++++++++++++
 2 files changed, 274 insertions(+), 15 deletions(-)

diff --git a/manager/state/raft/storage.go b/manager/state/raft/storage.go
index 95074ffc19..e47fe01b2f 100644
--- a/manager/state/raft/storage.go
+++ b/manager/state/raft/storage.go
@@ -10,6 +10,7 @@ import (
 	"sort"
 	"strings"
 
+	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/snap"
@@ -80,7 +81,7 @@ func (n *Node) createWAL(nodeID string) (raft.Peer, error) {
 	}
 	n.wal, err = wal.Create(n.walDir(), metadata)
 	if err != nil {
-		return raft.Peer{}, fmt.Errorf("create wal error: %v", err)
+		return raft.Peer{}, fmt.Errorf("create WAL error: %v", err)
 	}
 
 	n.cluster.AddMember(&membership.Member{RaftMember: raftNode})
@@ -127,7 +128,7 @@ func (n *Node) readWAL(ctx context.Context, snapshot *raftpb.Snapshot, forceNewC
 	repaired := false
 	for {
 		if n.wal, err = wal.Open(n.walDir(), walsnap); err != nil {
-			return fmt.Errorf("open wal error: %v", err)
+			return fmt.Errorf("open WAL error: %v", err)
 		}
 		if metadata, st, ents, err = n.wal.ReadAll(); err != nil {
 			if err := n.wal.Close(); err != nil {
@@ -135,7 +136,7 @@ func (n *Node) readWAL(ctx context.Context, snapshot *raftpb.Snapshot, forceNewC
 			}
 			// we can only repair ErrUnexpectedEOF and we never repair twice.
 			if repaired || err != io.ErrUnexpectedEOF {
-				return fmt.Errorf("read wal error (%v) and cannot be repaired", err)
+				return fmt.Errorf("read WAL error (%v) and cannot be repaired", err)
 			}
 			if !wal.Repair(n.walDir()) {
 				return fmt.Errorf("WAL error (%v) cannot be repaired", err)
@@ -157,7 +158,7 @@ func (n *Node) readWAL(ctx context.Context, snapshot *raftpb.Snapshot, forceNewC
 
 	var raftNode api.RaftMember
 	if err := raftNode.Unmarshal(metadata); err != nil {
-		return fmt.Errorf("error unmarshalling wal metadata: %v", err)
+		return fmt.Errorf("error unmarshalling WAL metadata: %v", err)
 	}
 	n.Config.ID = raftNode.RaftID
 
@@ -274,25 +275,103 @@ func (n *Node) saveSnapshot(snapshot raftpb.Snapshot, keepOldSnapshots uint64) e
 	// This means that if the current snapshot doesn't appear in the
 	// directory for some strange reason, we won't delete anything, which
 	// is the safe behavior.
+	curSnapshotIdx := -1
 	var (
-		afterCurSnapshot bool
-		removeErr        error
+		removeErr      error
+		oldestSnapshot string
 	)
+
 	for i, snapFile := range snapshots {
-		if afterCurSnapshot {
-			if uint64(len(snapshots)-i) <= keepOldSnapshots {
-				return removeErr
-			}
-			err := os.Remove(filepath.Join(n.snapDir(), snapFile))
-			if err != nil && removeErr == nil {
-				removeErr = err
+		if curSnapshotIdx >= 0 && i > curSnapshotIdx {
+			if uint64(i-curSnapshotIdx) > keepOldSnapshots {
+				err := os.Remove(filepath.Join(n.snapDir(), snapFile))
+				if err != nil && removeErr == nil {
+					removeErr = err
+				}
+				continue
 			}
 		} else if snapFile == curSnapshot {
-			afterCurSnapshot = true
+			curSnapshotIdx = i
+		}
+		oldestSnapshot = snapFile
+	}
+
+	if removeErr != nil {
+		return removeErr
+	}
+
+	// Remove any WAL files that only contain data from before the oldest
+	// remaining snapshot.
+
+	if oldestSnapshot == "" {
+		return nil
+	}
+
+	// Parse index out of oldest snapshot's filename
+	var snapTerm, snapIndex uint64
+	_, err = fmt.Sscanf(oldestSnapshot, "%016x-%016x.snap", &snapTerm, &snapIndex)
+	if err != nil {
+		return fmt.Errorf("malformed snapshot filename %s: %v", oldestSnapshot, err)
+	}
+
+	// List the WALs
+	dirents, err = ioutil.ReadDir(n.walDir())
+	if err != nil {
+		return err
+	}
+
+	var wals []string
+	for _, dirent := range dirents {
+		if strings.HasSuffix(dirent.Name(), ".wal") {
+			wals = append(wals, dirent.Name())
 		}
 	}
 
-	return removeErr
+	// Sort WAL filenames in lexical order
+	sort.Sort(sort.StringSlice(wals))
+
+	found := false
+	deleteUntil := -1
+
+	for i, walName := range wals {
+		var walSeq, walIndex uint64
+		_, err = fmt.Sscanf(walName, "%016x-%016x.wal", &walSeq, &walIndex)
+		if err != nil {
+			return fmt.Errorf("could not parse WAL name %s: %v", walName, err)
+		}
+
+		if walIndex >= snapIndex {
+			deleteUntil = i - 1
+			found = true
+			break
+		}
+	}
+
+	// If all WAL files started with indices below the oldest snapshot's
+	// index, we can delete all but the newest WAL file.
+	if !found && len(wals) != 0 {
+		deleteUntil = len(wals) - 1
+	}
+
+	for i := 0; i < deleteUntil; i++ {
+		walPath := filepath.Join(n.walDir(), wals[i])
+		l, err := fileutil.NewLock(walPath)
+		if err != nil {
+			continue
+		}
+		err = l.TryLock()
+		if err != nil {
+			return fmt.Errorf("could not lock old WAL file %s for removal: %v", wals[i], err)
+		}
+		err = os.Remove(walPath)
+		l.Unlock()
+		l.Destroy()
+		if err != nil {
+			return fmt.Errorf("error removing old WAL file %s: %v", wals[i], err)
+		}
+	}
+
+	return nil
 }
 
 func (n *Node) doSnapshot(raftConfig *api.RaftConfig) {
diff --git a/manager/state/raft/storage_test.go b/manager/state/raft/storage_test.go
index b24c18123d..977036f97a 100644
--- a/manager/state/raft/storage_test.go
+++ b/manager/state/raft/storage_test.go
@@ -5,11 +5,14 @@ import (
 	"io/ioutil"
 	"path/filepath"
 	"testing"
+	"time"
 
 	"github.com/docker/swarmkit/api"
 	raftutils "github.com/docker/swarmkit/manager/state/raft/testutils"
+	"github.com/docker/swarmkit/manager/state/store"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"golang.org/x/net/context"
 )
 
 func TestRaftSnapshot(t *testing.T) {
@@ -243,3 +246,180 @@ func TestRaftSnapshotRestart(t *testing.T) {
 	require.NoError(t, err)
 	raftutils.CheckValuesOnNodes(t, clockSource, nodes, nodeIDs, values)
 }
+
+func TestGCWAL(t *testing.T) {
+	if testing.Short() {
+		t.Skip("TestGCWAL skipped with -short because it's resource-intensive")
+	}
+	t.Parallel()
+
+	// Additional log entries from cluster setup, leader election
+	extraLogEntries := 5
+	// Number of large entries to propose
+	proposals := 47
+
+	// Bring up a 3 node cluster
+	nodes, clockSource := raftutils.NewRaftCluster(t, tc, &api.RaftConfig{SnapshotInterval: uint64(proposals + extraLogEntries), LogEntriesForSlowFollowers: 0})
+
+	for i := 0; i != proposals; i++ {
+		_, err := proposeHugeValue(t, nodes[1], DefaultProposalTime, fmt.Sprintf("id%d", i))
+		assert.NoError(t, err, "failed to propose value")
+	}
+
+	time.Sleep(250 * time.Millisecond)
+
+	// Snapshot should have been triggered just before the WAL rotated, so
+	// both WAL files should be preserved
+	assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
+		dirents, err := ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "snap"))
+		if err != nil {
+			return err
+		}
+		if len(dirents) != 1 {
+			return fmt.Errorf("expected 1 snapshot, found %d", len(dirents))
+		}
+
+		dirents, err = ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "wal"))
+		if err != nil {
+			return err
+		}
+		if len(dirents) != 2 {
+			return fmt.Errorf("expected 2 WAL files, found %d", len(dirents))
+		}
+		return nil
+	}))
+
+	raftutils.TeardownCluster(t, nodes)
+
+	// Repeat this test, but trigger the snapshot after the WAL has rotated
+	proposals++
+	nodes, clockSource = raftutils.NewRaftCluster(t, tc, &api.RaftConfig{SnapshotInterval: uint64(proposals + extraLogEntries), LogEntriesForSlowFollowers: 0})
+	defer raftutils.TeardownCluster(t, nodes)
+
+	for i := 0; i != proposals; i++ {
+		_, err := proposeHugeValue(t, nodes[1], DefaultProposalTime, fmt.Sprintf("id%d", i))
+		assert.NoError(t, err, "failed to propose value")
+	}
+
+	time.Sleep(250 * time.Millisecond)
+
+	// This time only one WAL file should be saved.
+	assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
+		dirents, err := ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "snap"))
+		if err != nil {
+			return err
+		}
+		if len(dirents) != 1 {
+			return fmt.Errorf("expected 1 snapshot, found %d", len(dirents))
+		}
+
+		dirents, err = ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "wal"))
+		if err != nil {
+			return err
+		}
+		if len(dirents) != 1 {
+			return fmt.Errorf("expected 1 WAL file, found %d", len(dirents))
+		}
+		return nil
+	}))
+
+	// Restart the whole cluster
+	for _, node := range nodes {
+		node.Server.Stop()
+		node.Shutdown()
+	}
+
+	raftutils.AdvanceTicks(clockSource, 5)
+
+	i := 0
+	for k, node := range nodes {
+		nodes[k] = raftutils.RestartNode(t, clockSource, node, false)
+		i++
+	}
+	raftutils.WaitForCluster(t, clockSource, nodes)
+
+	// Is the data intact after restart?
+	for _, node := range nodes {
+		assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
+			var err error
+			node.MemoryStore().View(func(tx store.ReadTx) {
+				var allNodes []*api.Node
+				allNodes, err = store.FindNodes(tx, store.All)
+				if err != nil {
+					return
+				}
+				if len(allNodes) != proposals {
+					err = fmt.Errorf("expected %d nodes, got %d", proposals, len(allNodes))
+					return
+				}
+			})
+			return err
+		}))
+	}
+
+	// It should still be possible to propose values
+	_, err := raftutils.ProposeValue(t, raftutils.Leader(nodes), DefaultProposalTime, "newnode")
+	assert.NoError(t, err, "failed to propose value")
+
+	for _, node := range nodes {
+		assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
+			var err error
+			node.MemoryStore().View(func(tx store.ReadTx) {
+				var allNodes []*api.Node
+				allNodes, err = store.FindNodes(tx, store.All)
+				if err != nil {
+					return
+				}
+				if len(allNodes) != proposals+1 {
+					err = fmt.Errorf("expected %d nodes, got %d", proposals+1, len(allNodes))
+					return
+				}
+			})
+			return err
+		}))
+	}
+}
+
+// proposeHugeValue proposes a 1.4MB value to a raft test cluster
+func proposeHugeValue(t *testing.T, raftNode *raftutils.TestNode, time time.Duration, nodeID ...string) (*api.Node, error) {
+	nodeIDStr := "id1"
+	if len(nodeID) != 0 {
+		nodeIDStr = nodeID[0]
+	}
+	a := make([]byte, 1400000)
+	for i := 0; i != len(a); i++ {
+		a[i] = 'a'
+	}
+	node := &api.Node{
+		ID: nodeIDStr,
+		Spec: api.NodeSpec{
+			Annotations: api.Annotations{
+				Name: nodeIDStr,
+				Labels: map[string]string{
+					"largestring": string(a),
+				},
+			},
+		},
+	}
+
+	storeActions := []*api.StoreAction{
+		{
+			Action: api.StoreActionKindCreate,
+			Target: &api.StoreAction_Node{
+				Node: node,
+			},
+		},
+	}
+
+	ctx, _ := context.WithTimeout(context.Background(), time)
+
+	err := raftNode.ProposeValue(ctx, storeActions, func() {
+		err := raftNode.MemoryStore().ApplyStoreActions(storeActions)
+		assert.NoError(t, err, "error applying actions")
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	return node, nil
+}
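
The heart of the change above is the selection of removable WAL files inside
saveSnapshot. Below is a minimal standalone Go sketch that restates that
selection as a pure function so it can be read and run in isolation; the names
obsoleteWALs and snapIndexFromName are illustrative only and do not exist in
swarmkit or etcd.

package main

import (
	"fmt"
	"sort"
)

// snapIndexFromName extracts the raft index encoded in a snapshot filename
// of the form <term>-<index>.snap (both fields fixed-width hex).
func snapIndexFromName(name string) (uint64, error) {
	var term, index uint64
	if _, err := fmt.Sscanf(name, "%016x-%016x.snap", &term, &index); err != nil {
		return 0, fmt.Errorf("malformed snapshot filename %s: %v", name, err)
	}
	return index, nil
}

// obsoleteWALs returns the WAL filenames that only cover entries from before
// the oldest retained snapshot. It mirrors the patch's logic: keep the last
// WAL that starts below the snapshot index (it may still be needed to catch
// up from that snapshot), and keep everything from the first WAL that starts
// at or above it.
func obsoleteWALs(wals []string, oldestSnapshot string) ([]string, error) {
	snapIndex, err := snapIndexFromName(oldestSnapshot)
	if err != nil {
		return nil, err
	}

	// Fixed-width hex filenames sort lexically in creation order.
	sort.Strings(wals)

	found := false
	deleteUntil := -1
	for i, walName := range wals {
		var walSeq, walIndex uint64
		if _, err := fmt.Sscanf(walName, "%016x-%016x.wal", &walSeq, &walIndex); err != nil {
			return nil, fmt.Errorf("could not parse WAL name %s: %v", walName, err)
		}
		if walIndex >= snapIndex {
			deleteUntil = i - 1
			found = true
			break
		}
	}
	// If every WAL starts below the snapshot index, all but the newest one
	// are obsolete.
	if !found && len(wals) != 0 {
		deleteUntil = len(wals) - 1
	}
	if deleteUntil <= 0 {
		return nil, nil
	}
	return wals[:deleteUntil], nil
}

func main() {
	wals := []string{
		"0000000000000000-0000000000000000.wal",
		"0000000000000001-0000000000000020.wal",
		"0000000000000002-0000000000000040.wal",
	}
	// The oldest retained snapshot covers entries up to index 0x30, so only
	// the first WAL is obsolete; the second may still be needed to replay
	// from index 0x30 forward.
	obsolete, err := obsoleteWALs(wals, "0000000000000001-0000000000000030.snap")
	if err != nil {
		panic(err)
	}
	fmt.Println(obsolete)
}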