Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Debug tool to modify Raft snapshot and hardstate #3364

Merged
merged 5 commits into from
May 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 6 additions & 171 deletions dgraph/cmd/debug/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package debug

import (
"bytes"
"encoding/binary"
"encoding/hex"
"fmt"
"io"
Expand All @@ -33,12 +32,9 @@ import (
"github.com/dgraph-io/dgraph/codec"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/raftwal"
"github.com/dgraph-io/dgraph/types"
"github.com/dgraph-io/dgraph/x"
humanize "github.com/dustin/go-humanize"
"github.com/spf13/cobra"
"go.etcd.io/etcd/raft/raftpb"
)

var (
Expand All @@ -62,6 +58,7 @@ type flagOptions struct {
// Options related to the WAL.
wdir string
wtruncateUntil uint64
wsetSnapshot string
}

func init() {
Expand Down Expand Up @@ -91,6 +88,9 @@ func init() {
flag.StringVarP(&opt.wdir, "wal", "w", "", "Directory where Raft write-ahead logs are stored.")
flag.Uint64VarP(&opt.wtruncateUntil, "truncate", "t", 0,
"Remove data from Raft entries until but not including this index.")
flag.StringVarP(&opt.wsetSnapshot, "snap", "s", "",
"Set snapshot term,index,readts to this. Value must be comma-separated list containing"+
" the value for these vars in that order.")
}

func toInt(o *pb.Posting) int {
Expand Down Expand Up @@ -725,171 +725,6 @@ func printZeroProposal(buf *bytes.Buffer, zpr pb.ZeroProposal) {
}
}

func parseWal(db *badger.DB) error {
rids := make(map[uint64]bool)
gids := make(map[uint32]bool)

parseIds := func(item *badger.Item) {
key := item.Key()
switch {
case len(key) == 14:
// hard state and snapshot key.
rid := binary.BigEndian.Uint64(key[0:8])
rids[rid] = true

gid := binary.BigEndian.Uint32(key[10:14])
gids[gid] = true
case len(key) == 20:
// entry key.
rid := binary.BigEndian.Uint64(key[0:8])
rids[rid] = true

gid := binary.BigEndian.Uint32(key[8:12])
gids[gid] = true
default:
// Ignore other keys.
}
}

err := db.View(func(txn *badger.Txn) error {
opt := badger.DefaultIteratorOptions
opt.PrefetchValues = false
itr := txn.NewIterator(opt)
defer itr.Close()

for itr.Rewind(); itr.Valid(); itr.Next() {
parseIds(itr.Item())
}
return nil
})
if err != nil {
return err
}
fmt.Printf("rids: %v\n", rids)
fmt.Printf("gids: %v\n", gids)

pending := make(map[uint64]bool)
printEntry := func(es raftpb.Entry) {
var buf bytes.Buffer
fmt.Fprintf(&buf, "%d . %d . %v . %-6s .", es.Term, es.Index, es.Type,
humanize.Bytes(uint64(es.Size())))
if es.Type == raftpb.EntryConfChange {
fmt.Printf("%s\n", buf.Bytes())
return
}
var pr pb.Proposal
var zpr pb.ZeroProposal
if err := pr.Unmarshal(es.Data); err == nil {
printAlphaProposal(&buf, pr, pending)
} else if err := zpr.Unmarshal(es.Data); err == nil {
printZeroProposal(&buf, zpr)
} else {
fmt.Printf("%s Unable to parse Proposal: %v\n", buf.Bytes(), err)
return
}
fmt.Printf("%s\n", buf.Bytes())
}

printRaft := func(store *raftwal.DiskStorage) {
fmt.Println()
snap, err := store.Snapshot()
if err != nil {
fmt.Printf("Got error while retrieving snapshot: %v\n", err)
} else {
fmt.Printf("Snapshot Metadata: %+v\n", snap.Metadata)
var ds pb.Snapshot
var ms pb.MembershipState
if err := ds.Unmarshal(snap.Data); err == nil {
fmt.Printf("Snapshot Alpha: %+v\n", ds)
} else if err := ms.Unmarshal(snap.Data); err == nil {
for gid, group := range ms.GetGroups() {
fmt.Printf("\nGROUP: %d\n", gid)
for _, member := range group.GetMembers() {
fmt.Printf("Member: %+v .\n", member)
}
for _, tablet := range group.GetTablets() {
fmt.Printf("Tablet: %+v .\n", tablet)
}
group.Members = nil
group.Tablets = nil
fmt.Printf("Group: %d %+v .\n", gid, group)
}
ms.Groups = nil
fmt.Printf("\nSnapshot Zero: %+v\n", ms)
} else {
fmt.Printf("Unable to unmarshal Dgraph snapshot: %v", err)
}
}
fmt.Println()

if hs, err := store.HardState(); err != nil {
fmt.Printf("Got error while retrieving hardstate: %v\n", err)
} else {
fmt.Printf("Hardstate: %+v\n", hs)
}

lastIdx, err := store.LastIndex()
if err != nil {
fmt.Printf("Got error while retrieving last index: %v\n", err)
return
}
startIdx := snap.Metadata.Index + 1
fmt.Printf("Last Index: %d . Num Entries: %d .\n\n", lastIdx, lastIdx-startIdx)

// In case we need to truncate raft entries.
batch := db.NewWriteBatch()
defer batch.Cancel()
var numTruncates int

pending = make(map[uint64]bool)
for startIdx < lastIdx-1 {
entries, err := store.Entries(startIdx, lastIdx, 64<<20 /* 64 MB Max Size */)
if err != nil {
fmt.Printf("Got error while retrieving entries: %v\n", err)
return
}
for _, ent := range entries {
switch {
case ent.Type == raftpb.EntryNormal && ent.Index < opt.wtruncateUntil:
if len(ent.Data) == 0 {
continue
}
ent.Data = nil
numTruncates++
k := store.EntryKey(ent.Index)
data, err := ent.Marshal()
if err != nil {
log.Fatalf("Unable to marshal entry: %+v. Error: %v", ent, err)
}
if err := batch.Set(k, data, 0); err != nil {
log.Fatalf("Unable to set data: %+v", err)
}
default:
printEntry(ent)
}
startIdx = x.Max(startIdx, ent.Index)
}
}
if err := batch.Flush(); err != nil {
fmt.Printf("Got error while flushing batch: %v\n", err)
}
if numTruncates > 0 {
fmt.Printf("==> Log entries truncated: %d\n\n", numTruncates)
err := db.Flatten(1)
fmt.Printf("Flatten done with error: %v\n", err)
}
}

for rid := range rids {
for gid := range gids {
fmt.Printf("Iterating with Raft Id = %d Groupd Id = %d\n", rid, gid)
store := raftwal.Init(db, rid, gid)
printRaft(store)
}
}
return nil
}

func run() {
dir := opt.pdir
isWal := false
Expand Down Expand Up @@ -917,8 +752,8 @@ func run() {
defer db.Close()

if isWal {
if err := parseWal(db); err != nil {
fmt.Printf("\nGot error while parsing WAL: %v\n", err)
if err := handleWal(db); err != nil {
fmt.Printf("\nGot error while handling WAL: %v\n", err)
}
fmt.Println("Done")
// WAL can't execute the getMinMax function, so we need to deal with it
Expand Down
Loading