Skip to content

Debug tool to modify Raft snapshot and hardstate #3364

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 3, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 72 additions & 1 deletion dgraph/cmd/debug/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type flagOptions struct {
// Options related to the WAL.
wdir string
wtruncateUntil uint64
wsetSnapshot string
}

func init() {
Expand Down Expand Up @@ -91,6 +92,8 @@ func init() {
flag.StringVarP(&opt.wdir, "wal", "w", "", "Directory where Raft write-ahead logs are stored.")
flag.Uint64VarP(&opt.wtruncateUntil, "truncate", "t", 0,
"Remove data from Raft entries until but not including this index.")
flag.StringVarP(&opt.wsetSnapshot, "snap", "s", "",
"Set snapshot term,index,readts to this.")
}

func toInt(o *pb.Posting) int {
Expand Down Expand Up @@ -884,7 +887,75 @@ func parseWal(db *badger.DB) error {
for gid := range gids {
fmt.Printf("Iterating with Raft Id = %d Groupd Id = %d\n", rid, gid)
store := raftwal.Init(db, rid, gid)
printRaft(store)
switch {
case len(opt.wsetSnapshot) > 0:
snap, err := store.Snapshot()
x.Checkf(err, "Unable to get snapshot")
cs := snap.Metadata.ConfState
fmt.Printf("Confstate: %+v\n", cs)

var dsnap pb.Snapshot
if len(snap.Data) > 0 {
x.Check(dsnap.Unmarshal(snap.Data))
}
fmt.Printf("Previous snapshot: %+v\n", dsnap)

splits := strings.Split(opt.wsetSnapshot, ",")
x.AssertTruef(len(splits) == 3,
"Expected term,index,readts in string. Got: %s", splits)
term, err := strconv.Atoi(splits[0])
x.Check(err)
index, err := strconv.Atoi(splits[1])
x.Check(err)
readTs, err := strconv.Atoi(splits[2])

ent := raftpb.Entry{
Term: uint64(term),
Index: uint64(index),
Type: raftpb.EntryNormal,
}
fmt.Printf("Using term: %d , index: %d , readTs : %d\n", term, index, readTs)
if dsnap.Index >= ent.Index {
fmt.Printf("Older snapshot is >= index %d", ent.Index)
return nil
}

// We need to write the Raft entry first.
fmt.Printf("Setting entry: %+v\n", ent)
hs := raftpb.HardState{
Term: ent.Term,
Commit: ent.Index,
}
fmt.Printf("Setting hard state: %+v\n", hs)
err = db.Update(func(txn *badger.Txn) error {
data, err := ent.Marshal()
if err != nil {
return err
}
if err = txn.Set(store.EntryKey(ent.Index), data); err != nil {
return err
}

data, err = hs.Marshal()
if err != nil {
return err
}
return txn.Set(store.HardStateKey(), data)
})
x.Check(err)

dsnap.Index = ent.Index
dsnap.ReadTs = uint64(readTs)

fmt.Printf("Setting snapshot to: %+v\n", dsnap)
data, err := dsnap.Marshal()
x.Check(err)
err = store.CreateSnapshot(dsnap.Index, &cs, data)
fmt.Printf("Created snapshot with error: %v\n", err)

default:
printRaft(store)
}
}
}
return nil
Expand Down
6 changes: 3 additions & 3 deletions raftwal/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (w *DiskStorage) snapshotKey() []byte {
return b
}

func (w *DiskStorage) hardStateKey() []byte {
func (w *DiskStorage) HardStateKey() []byte {
b := make([]byte, 14)
binary.BigEndian.PutUint64(b[0:8], w.id)
copy(b[8:10], []byte("hs"))
Expand Down Expand Up @@ -338,7 +338,7 @@ func (w *DiskStorage) setHardState(batch *badger.WriteBatch, st pb.HardState) er
if err != nil {
return x.Wrapf(err, "wal.Store: While marshal hardstate")
}
return batch.Set(w.hardStateKey(), data, 0)
return batch.Set(w.HardStateKey(), data, 0)
}

// reset resets the entries. Used for testing.
Expand Down Expand Up @@ -406,7 +406,7 @@ func (w *DiskStorage) HardState() (hd pb.HardState, rerr error) {
w.elog.Printf("HardState")
defer w.elog.Printf("Done")
err := w.db.View(func(txn *badger.Txn) error {
item, err := txn.Get(w.hardStateKey())
item, err := txn.Get(w.HardStateKey())
if err != nil {
return err
}
Expand Down