[BREAKING] Switch Raft WAL to use simple files #6572

Merged
merged 52 commits into from
Sep 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
99f391b
Add comments to explain the ideas.
manishrjain Sep 22, 2020
18ef430
use file buffer to store wal metadata.
martinmr Sep 22, 2020
fc66d9d
Basic entry file.
martinmr Sep 23, 2020
973cb09
more changes.
martinmr Sep 24, 2020
cf933b7
more work
martinmr Sep 24, 2020
f7c8714
Meta file works. Entries doesn't
Sep 24, 2020
6b972ee
add a bunch of APIs
manishrjain Sep 24, 2020
6c21ede
allEntries function.
martinmr Sep 24, 2020
9b733b9
fix basic tests.
martinmr Sep 24, 2020
c8d7d98
revert to ristretto master.
martinmr Sep 24, 2020
cb0eaee
more changes.
martinmr Sep 24, 2020
4879cc4
Compiles
manishrjain Sep 24, 2020
9283489
test changes.
martinmr Sep 24, 2020
4307fcc
progress so far
manishrjain Sep 24, 2020
01e778d
All tests pass
manishrjain Sep 25, 2020
41cb58f
Make the codebase compile
manishrjain Sep 25, 2020
8d32b98
Merge branch 'master' into mrjn/raftwal
manishrjain Sep 25, 2020
34d4162
Tests pass again
manishrjain Sep 25, 2020
627b34a
Zero runs
manishrjain Sep 25, 2020
210a0a4
debug info
manishrjain Sep 25, 2020
ebf0668
changes
manishrjain Sep 25, 2020
e11cdf7
Fix TestBig test
Sep 25, 2020
533b2a3
remove some logs
manishrjain Sep 25, 2020
081c480
Running ok
manishrjain Sep 25, 2020
66d4dc7
This works
manishrjain Sep 25, 2020
64d9e47
use default raft
manishrjain Sep 25, 2020
db80528
Fix TestProposal
animesh2049 Sep 25, 2020
0b4a98e
feat(raftmigrate): Add tool to migrate from old raft format to new one
Sep 25, 2020
12d0bff
Only complain if we miss 5 ticks.
manishrjain Sep 25, 2020
9bf3d91
Copy over the byte slice to re.Data
manishrjain Sep 25, 2020
cd0faaf
Add some locks
manishrjain Sep 25, 2020
140379c
Store index separately
manishrjain Sep 25, 2020
15037f5
Merge branch 'ibrahim/mrjn/raftwal' into mrjn/raftwal
manishrjain Sep 25, 2020
2338df2
Divide up the code in two files
manishrjain Sep 25, 2020
bd1ada3
Print heartbeats in v2
manishrjain Sep 25, 2020
c676e40
Add raftID in migrate tool
Sep 25, 2020
14905d9
Dump raftID in debug tool
Sep 25, 2020
7055554
Fix glog error
Sep 25, 2020
6005bd2
Revert version upgrades for protobuf and gRPC.
danielmai Sep 25, 2020
0d91b71
Always return a lastindex >= snapshotindex
manishrjain Sep 25, 2020
feb7bef
Ensure that lastIndex is not less than SnapshotIndex
manishrjain Sep 25, 2020
bae8e64
fix(config): Set glog -v flag correctly from config files.
danielmai Sep 25, 2020
b0eaad5
Merge branch 'master' into mrjn/raftwal
manishrjain Sep 29, 2020
dcc7bbd
Fix up flags and such. Remove WAL flags. Create shared flags for Alph…
manishrjain Sep 29, 2020
c2b2321
Merge branch 'master' into mrjn/raftwal
martinmr Sep 29, 2020
0d4fb93
add comments and address review comments.
martinmr Sep 29, 2020
f05dc80
Flag parsing refactoring
manishrjain Sep 30, 2020
5f59055
Refactor Raft WAL code into multiple files.
manishrjain Sep 30, 2020
4c491c5
Make build work on Windows
manishrjain Sep 30, 2020
2f9cb9b
Fix a bug.
manishrjain Sep 30, 2020
ef1ea36
Final review
manishrjain Sep 30, 2020
eb80f35
Merge branch 'master' into mrjn/raftwal
manishrjain Sep 30, 2020
6 changes: 5 additions & 1 deletion compose/compose.go
@@ -221,6 +221,8 @@ func getAlpha(idx int) service {
internalPort := alphaBasePort + opts.PortOffset + getOffset(idx)
grpcPort := internalPort + 1000
svc := initService(basename, idx, grpcPort)
// Don't make Alphas depend on each other.
svc.DependsOn = nil

if opts.TmpFS {
svc.TmpFS = append(svc.TmpFS, fmt.Sprintf("/data/%s/w", svc.name))
@@ -251,7 +253,9 @@ func getAlpha(idx int) service {
svc.Command += fmt.Sprintf(" --lru_mb=%d", opts.LruSizeMB)
svc.Command += fmt.Sprintf(" --zero=%s", zerosOpt)
svc.Command += fmt.Sprintf(" --logtostderr -v=%d", opts.Verbosity)
svc.Command += fmt.Sprintf(" --idx=%d", idx)

// Don't assign idx, let it auto-assign.
// svc.Command += fmt.Sprintf(" --idx=%d", idx)
if opts.WhiteList {
svc.Command += " --whitelist=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16"
}
6 changes: 1 addition & 5 deletions conn/node.go
@@ -319,11 +319,7 @@ func (n *Node) PastLife() (uint64, bool, error) {
restart = true
}

var num int
num, rerr = n.Store.NumEntries()
if rerr != nil {
return 0, false, rerr
}
num := n.Store.NumEntries()
glog.Infof("Group %d found %d entries\n", n.RaftContext.Group, num)
// We'll always have at least one entry.
if num > 1 {
6 changes: 1 addition & 5 deletions conn/node_test.go
@@ -26,7 +26,6 @@ import (
"testing"
"time"

"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/raftwal"
"github.com/stretchr/testify/require"
@@ -65,10 +64,7 @@ func TestProposal(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(dir)

db, err := badger.OpenManaged(badger.DefaultOptions(dir))
require.NoError(t, err)
store := raftwal.Init(db, 0, 0)
defer store.Closer.SignalAndWait()
store := raftwal.Init(dir)

rc := &pb.RaftContext{Id: 1}
n := NewNode(rc, store)
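
The test change above captures the new store API in one line: instead of opening a managed Badger instance and handing it to raftwal.Init together with the Raft and group IDs, callers now pass Init a directory and the store manages its own files and lifecycle. A minimal sketch using only calls that appear in this PR (the "w" directory name and the printed label are illustrative):

package main

import (
	"fmt"

	"github.com/dgraph-io/dgraph/raftwal"
)

func main() {
	// Before this PR the WAL lived inside Badger:
	//   db, _ := badger.OpenManaged(badger.DefaultOptions("w"))
	//   store := raftwal.Init(db, 0, 0)
	//   defer store.Closer.SignalAndWait()
	//
	// After this PR the store is backed by simple files under a directory.
	store := raftwal.Init("w")

	// The Raft ID is persisted alongside the log; the debug tool changes
	// below read it the same way.
	fmt.Println("raft id:", store.Uint(raftwal.RaftId))
}
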
106 changes: 30 additions & 76 deletions dgraph/cmd/alpha/run.go
@@ -104,27 +104,21 @@ they form a Raft group and provide synchronous replication.
// with the flag name so that the values are picked up by Cobra/Viper's various config inputs
// (e.g, config file, env vars, cli flags, etc.)
flag := Alpha.Cmd.Flags()
x.FillCommonFlags(flag)

flag.StringP("postings", "p", "p", "Directory to store posting lists.")

// Options around how to set up Badger.
flag.String("badger.tables", "mmap,mmap",
"[ram, mmap, disk] Specifies how Badger LSM tree is stored for the postings and "+
"write-ahead directory. Option sequence consume most to least RAM while providing "+
"best to worst read performance respectively. If you pass two values separated by a "+
"comma, the first value will be used for the postings directory and the second for "+
"the write-ahead log directory.")
flag.String("badger.vlog", "mmap,mmap",
"[mmap, disk] Specifies how Badger Value log is stored for the postings and write-ahead "+
"log directory. mmap consumes more RAM, but provides better performance. If you pass "+
"two values separated by a comma the first value will be used for the postings "+
"directory and the second for the w directory.")
flag.String("badger.compression_level", "3,0",
"Specifies the compression level for the postings and write-ahead log "+
"directory. A higher value uses more resources. The value of 0 disables "+
"compression. If you pass two values separated by a comma the first "+
"value will be used for the postings directory (p) and the second for "+
"the wal directory (w). If a single value is passed the value is used "+
"as compression level for both directories.")
flag.String("badger.tables", "mmap",
"[ram, mmap, disk] Specifies how Badger LSM tree is stored for the postings."+
"Option sequence consume most to least RAM while providing "+
"best to worst read performance respectively.")
flag.String("badger.vlog", "mmap",
"[mmap, disk] Specifies how Badger Value log is stored for the postings."+
"mmap consumes more RAM, but provides better performance.")
flag.String("badger.compression_level", "3",
"Specifies the compression level for the postings directory. A higher value"+
" uses more resources. The value of 0 disables compression.")
enc.RegisterFlags(flag)

// Snapshot and Transactions.
@@ -136,14 +130,6 @@ they form a Raft group and provide synchronous replication.
"Abort any pending transactions older than this duration. The liveness of a"+
" transaction is determined by its last mutation.")

// OpenCensus flags.
flag.Float64("trace", 0.01, "The ratio of queries to trace.")
flag.String("jaeger.collector", "", "Send opencensus traces to Jaeger.")
// See https://github.com/DataDog/opencensus-go-exporter-datadog/issues/34
// about the status of supporting annotation logs through the datadog exporter
flag.String("datadog.collector", "", "Send opencensus traces to Datadog. As of now, the trace"+
" exporter does not support annotation logs and would discard them.")

flag.StringP("wal", "w", "w", "Directory to store raft write-ahead logs.")
flag.String("whitelist", "",
"A comma separated list of IP addresses, IP ranges, CIDR blocks, or hostnames you "+
@@ -152,8 +138,6 @@ they form a Raft group and provide synchronous replication.
flag.String("export", "export", "Folder in which to store exports.")
flag.Int("pending_proposals", 256,
"Number of pending mutation proposals. Useful for rate limiting.")
flag.String("my", "",
"IP_ADDRESS:PORT of this Dgraph Alpha, so other Dgraph Alphas can talk to this.")
flag.StringP("zero", "z", fmt.Sprintf("localhost:%d", x.PortZeroGrpc),
"Comma separated list of Dgraph zero addresses of the form IP_ADDRESS:PORT.")
flag.Uint64("idx", 0,
@@ -165,7 +149,6 @@ they form a Raft group and provide synchronous replication.
"If set, all Alter requests to Dgraph would need to have this token."+
" The token can be passed as follows: For HTTP requests, in X-Dgraph-AuthToken header."+
" For Grpc, in auth-token key in the context.")
flag.Bool("enable_sentry", true, "Turn on/off sending events to Sentry. (default on)")

flag.String("acl_secret_file", "", "The file that stores the HMAC secret, "+
"which is used for signing the JWT and should have at least 32 ASCII characters. "+
@@ -175,13 +158,13 @@ they form a Raft group and provide synchronous replication.
flag.Duration("acl_refresh_ttl", 30*24*time.Hour, "The TTL for the refresh jwt. "+
"Enterprise feature.")
flag.Duration("acl_cache_ttl", 30*time.Second, "DEPRECATED: The interval to refresh the acl "+
"cache. Enterprise feature.")
flag.Float64P("lru_mb", "l", -1,
"cache. Enterprise feature.") // TODO: Remove this flag.

flag.Float64P("lru_mb", "l", -1, // TODO: Remove this flag.
"Estimated memory the LRU cache can take. "+
"Actual usage by the process would be more than specified here.")
flag.String("mutations", "allow",
"Set mutation mode to allow, disallow, or strict.")
flag.Bool("telemetry", true, "Send anonymous telemetry data to Dgraph devs.")

// Useful for running multiple servers on the same machine.
flag.IntP("port_offset", "o", 0,
@@ -210,16 +193,18 @@ they form a Raft group and provide synchronous replication.

flag.Bool("graphql_introspection", true, "Set to false for no GraphQL schema introspection")
flag.Bool("graphql_debug", false, "Enable debug mode in GraphQL. This returns auth errors to clients. We do not recommend turning it on for production.")
flag.Bool("ludicrous_mode", false, "Run alpha in ludicrous mode")

// Ludicrous mode
flag.Bool("ludicrous_mode", false, "Run Dgraph in ludicrous mode.")
flag.Int("ludicrous_concurrency", 2000, "Number of concurrent threads in ludicrous mode")

flag.Bool("graphql_extensions", true, "Set to false if extensions not required in GraphQL response body")
flag.Duration("graphql_poll_interval", time.Second, "polling interval for graphql subscription.")

// Cache flags
flag.Int64("cache_mb", 0, "Total size of cache (in MB) to be used in alpha.")
flag.String("cache_percentage", "0,65,25,0,10",
flag.String("cache_percentage", "0,65,35,0",
`Cache percentages summing up to 100 for various caches (FORMAT:
PostingListCache,PstoreBlockCache,PstoreIndexCache,WstoreBlockCache,WstoreIndexCache).`)
PostingListCache,PstoreBlockCache,PstoreIndexCache,WAL).`)
}

func setupCustomTokenizers() {
@@ -616,60 +601,29 @@ func run() {
x.AssertTruef(totalCache >= 0, "ERROR: Cache size must be non-negative")

cachePercentage := Alpha.Conf.GetString("cache_percentage")
cachePercent, err := x.GetCachePercentages(cachePercentage, 5)
cachePercent, err := x.GetCachePercentages(cachePercentage, 4)
x.Check(err)
postingListCacheSize := (cachePercent[0] * (totalCache << 20)) / 100
pstoreBlockCacheSize := (cachePercent[1] * (totalCache << 20)) / 100
pstoreIndexCacheSize := (cachePercent[2] * (totalCache << 20)) / 100
wstoreBlockCacheSize := (cachePercent[3] * (totalCache << 20)) / 100
wstoreIndexCacheSize := (cachePercent[4] * (totalCache << 20)) / 100

compressionLevelString := Alpha.Conf.GetString("badger.compression_level")
compressionLevels, err := x.GetCompressionLevels(compressionLevelString)
x.Check(err)
postingDirCompressionLevel := compressionLevels[0]
walDirCompressionLevel := compressionLevels[1]
walCache := (cachePercent[3] * (totalCache << 20)) / 100

level := x.ParseCompressionLevel(Alpha.Conf.GetString("badger.compression_level"))
opts := worker.Options{
PostingDir: Alpha.Conf.GetString("postings"),
WALDir: Alpha.Conf.GetString("wal"),
PostingDirCompressionLevel: postingDirCompressionLevel,
WALDirCompressionLevel: walDirCompressionLevel,
PostingDirCompressionLevel: level,
PBlockCacheSize: pstoreBlockCacheSize,
PIndexCacheSize: pstoreIndexCacheSize,
WBlockCacheSize: wstoreBlockCacheSize,
WIndexCacheSize: wstoreIndexCacheSize,
WalCache: walCache,

MutationsMode: worker.AllowMutations,
AuthToken: Alpha.Conf.GetString("auth_token"),
AllottedMemory: Alpha.Conf.GetFloat64("lru_mb"),
}

badgerTables := strings.Split(Alpha.Conf.GetString("badger.tables"), ",")
if len(badgerTables) != 1 && len(badgerTables) != 2 {
glog.Fatalf("Unable to read badger.tables options. Expected single value or two "+
"comma-separated values. Got %s", Alpha.Conf.GetString("badger.tables"))
}
if len(badgerTables) == 1 {
opts.BadgerTables = badgerTables[0]
opts.BadgerWalTables = badgerTables[0]
} else {
opts.BadgerTables = badgerTables[0]
opts.BadgerWalTables = badgerTables[1]
}

badgerVlog := strings.Split(Alpha.Conf.GetString("badger.vlog"), ",")
if len(badgerVlog) != 1 && len(badgerVlog) != 2 {
glog.Fatalf("Unable to read badger.vlog options. Expected single value or two "+
"comma-separated values. Got %s", Alpha.Conf.GetString("badger.vlog"))
}
if len(badgerVlog) == 1 {
opts.BadgerVlog = badgerVlog[0]
opts.BadgerWalVlog = badgerVlog[0]
} else {
opts.BadgerVlog = badgerVlog[0]
opts.BadgerWalVlog = badgerVlog[1]
}
opts.BadgerTables = Alpha.Conf.GetString("badger.tables")
opts.BadgerVlog = Alpha.Conf.GetString("badger.vlog")

secretFile := Alpha.Conf.GetString("acl_secret_file")
if secretFile != "" {
@@ -711,8 +665,6 @@ func run() {
x.WorkerConfig = x.WorkerOptions{
ExportPath: Alpha.Conf.GetString("export"),
NumPendingProposals: Alpha.Conf.GetInt("pending_proposals"),
Tracing: Alpha.Conf.GetFloat64("trace"),
MyAddr: Alpha.Conf.GetString("my"),
ZeroAddr: strings.Split(Alpha.Conf.GetString("zero"), ","),
RaftId: cast.ToUint64(Alpha.Conf.GetString("idx")),
WhiteListedIPRanges: ips,
@@ -725,6 +677,8 @@ func run() {
LudicrousMode: Alpha.Conf.GetBool("ludicrous_mode"),
LudicrousConcurrency: Alpha.Conf.GetInt("ludicrous_concurrency"),
}
x.WorkerConfig.Parse(Alpha.Conf)

if x.WorkerConfig.EncryptionKey, err = enc.ReadKey(Alpha.Conf); err != nil {
glog.Infof("unable to read key %v", err)
return
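
Since the WAL no longer needs Badger block and index caches of its own, --cache_percentage shrinks from five comma-separated values to four, and run() above splits --cache_mb across them with the same integer arithmetic as before. A small self-contained sketch of that split, using the new default "0,65,35,0" (the helper below stands in for x.GetCachePercentages and is not Dgraph code):

package main

import "fmt"

func main() {
	totalCacheMB := int64(1024)       // --cache_mb
	percents := []int64{0, 65, 35, 0} // --cache_percentage: PostingListCache,PstoreBlockCache,PstoreIndexCache,WAL

	names := []string{"PostingListCache", "PstoreBlockCache", "PstoreIndexCache", "WAL"}
	totalBytes := totalCacheMB << 20 // MB to bytes, as in run()
	for i, p := range percents {
		// Same formula as the diff: (cachePercent[i] * (totalCache << 20)) / 100
		fmt.Printf("%-18s %d bytes\n", names[i], p*totalBytes/100)
	}
}
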
30 changes: 25 additions & 5 deletions dgraph/cmd/debug/run.go
@@ -33,6 +33,7 @@ import (
"github.com/dgraph-io/dgraph/ee/enc"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/raftwal"
"github.com/dgraph-io/dgraph/types"
"github.com/dgraph-io/dgraph/x"
"github.com/spf13/cobra"
@@ -62,6 +63,7 @@ type flagOptions struct {
wdir string
wtruncateUntil uint64
wsetSnapshot string
oldWalFormat bool
}

func init() {
@@ -88,6 +90,8 @@ func init() {
flag.BoolVar(&opt.sizeHistogram, "histogram", false,
"Show a histogram of the key and value sizes.")
flag.StringVarP(&opt.wdir, "wal", "w", "", "Directory where Raft write-ahead logs are stored.")
flag.BoolVar(&opt.oldWalFormat, "old-wal", false,
"Denotes that the directory pointed by --wal is a wal directory in old format.")
flag.Uint64VarP(&opt.wtruncateUntil, "truncate", "t", 0,
"Remove data from Raft entries until but not including this index.")
flag.StringVarP(&opt.wsetSnapshot, "snap", "s", "",
@@ -777,12 +781,28 @@ func run() {
x.AssertTruef(len(bopts.Dir) > 0, "No posting or wal dir specified.")
fmt.Printf("Opening DB: %s\n", bopts.Dir)

var db *badger.DB
if isWal {
db, err = badger.Open(bopts)
} else {
db, err = badger.OpenManaged(bopts)
// If this is a new format WAL, print and return.
if isWal && !opt.oldWalFormat {
store := raftwal.Init(dir)
fmt.Printf("RaftID: %+v\n", store.Uint(raftwal.RaftId))

// TODO: Fix the pending logic.
pending := make(map[uint64]bool)

start, last := printBasic(store)
for start < last-1 {
entries, err := store.Entries(start, last+1, 64<<20)
x.Check(err)
for _, e := range entries {
printEntry(e, pending)
start = x.Max(start, e.Index)
}
}
fmt.Println("Done")
return
}

db, err := badger.OpenManaged(bopts)
x.Check(err)
// Not using posting list cache
posting.Init(db, 0)
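
The new-format branch above does not load the whole log in one call: Entries(lo, hi, maxSize) returns at most maxSize bytes of entries (64 MB here), so the loop keeps advancing start to the highest index it has printed and asks again. A generic sketch of that paging pattern against etcd's raft.Storage interface (dumpEntries and the print callback are illustrative names, not part of this PR):

package main

import (
	"go.etcd.io/etcd/raft"
	"go.etcd.io/etcd/raft/raftpb"
)

// dumpEntries pages through the log in chunks of at most maxSize bytes,
// mirroring the loop in the debug tool above.
func dumpEntries(st raft.Storage, start, last, maxSize uint64, print func(raftpb.Entry)) error {
	for start < last-1 {
		entries, err := st.Entries(start, last+1, maxSize)
		if err != nil {
			return err
		}
		if len(entries) == 0 {
			return nil // nothing more to read; avoid spinning forever
		}
		for _, e := range entries {
			print(e)
			if e.Index > start {
				start = e.Index // resume after the last entry we printed
			}
		}
	}
	return nil
}

func main() {} // dumpEntries is meant to be called from a tool like the one above
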
26 changes: 19 additions & 7 deletions dgraph/cmd/debug/wal.go
@@ -25,10 +25,11 @@ import (
"strings"

"github.com/dgraph-io/badger/v2"
raftmigrate "github.com/dgraph-io/dgraph/dgraph/cmd/raft-migrate"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/raftwal"
"github.com/dgraph-io/dgraph/x"
humanize "github.com/dustin/go-humanize"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
)

@@ -53,7 +54,13 @@ func printEntry(es raftpb.Entry, pending map[uint64]bool) {
fmt.Printf("%s\n", buf.Bytes())
}

func printRaft(db *badger.DB, store *raftwal.DiskStorage) {
type RaftStore interface {
raft.Storage
Checkpoint() (uint64, error)
HardState() (raftpb.HardState, error)
}

func printBasic(store RaftStore) (uint64, uint64) {
fmt.Println()
snap, err := store.Snapshot()
if err != nil {
@@ -100,13 +107,19 @@ func printRaft(db *badger.DB, store *raftwal.DiskStorage) {
lastIdx, err := store.LastIndex()
if err != nil {
fmt.Printf("Got error while retrieving last index: %v\n", err)
return
}
startIdx := snap.Metadata.Index + 1
fmt.Printf("Last Index: %d . Num Entries: %d .\n\n", lastIdx, lastIdx-startIdx)
return startIdx, lastIdx
}

func printRaft(db *badger.DB, store *raftmigrate.OldDiskStorage) {
startIdx, lastIdx := printBasic(store)

commitTs, err := db.MaxVersion()
x.Check(err)
// In case we need to truncate raft entries.
batch := db.NewWriteBatch()
batch := db.NewWriteBatchAt(commitTs)
defer batch.Cancel()
var numTruncates int

@@ -149,7 +162,7 @@ func printRaft(db *badger.DB, store *raftwal.DiskStorage) {
}
}

func overwriteSnapshot(db *badger.DB, store *raftwal.DiskStorage) error {
func overwriteSnapshot(db *badger.DB, store *raftmigrate.OldDiskStorage) error {
snap, err := store.Snapshot()
x.Checkf(err, "Unable to get snapshot")
cs := snap.Metadata.ConfState
@@ -197,7 +210,6 @@ func overwriteSnapshot(db *badger.DB, store *raftwal.DiskStorage) error {
if err = txn.Set(store.EntryKey(ent.Index), data); err != nil {
return err
}

data, err = hs.Marshal()
if err != nil {
return err
@@ -264,7 +276,7 @@ func handleWal(db *badger.DB) error {
for rid := range rids {
for gid := range gids {
fmt.Printf("Iterating with Raft Id = %d Groupd Id = %d\n", rid, gid)
store := raftwal.Init(db, rid, gid)
store := raftmigrate.Init(db, rid, gid)
switch {
case len(opt.wsetSnapshot) > 0:
err := overwriteSnapshot(db, store)
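
printBasic now depends only on the small RaftStore interface declared above (raft.Storage plus Checkpoint and HardState), so the same snapshot, HardState, and index summary works for both WAL formats. A fragment-style sketch of selecting a store behind that interface, reusing only names that appear in this diff and written as if it sat next to printBasic in the debug tool:

// Both stores satisfy RaftStore, so a caller can pick one and reuse
// printBasic unchanged (a sketch; variable names follow the diff above).
var rs RaftStore
if opt.oldWalFormat {
	rs = raftmigrate.Init(db, rid, gid) // pre-PR layout, stored in Badger
} else {
	rs = raftwal.Init(dir) // file-based layout introduced by this PR
}
start, last := printBasic(rs)
fmt.Printf("entries in [%d, %d]\n", start, last)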