Skip to content

Commit

Permalink
Switch Raft WAL to use simple files (#6572)
Browse files Browse the repository at this point in the history
This PR switches the way we store Raft write-ahead log. Before we were using Badger, but it caused a bunch of latency issues due to a high number of key deletions. With this change, we are using mmapped files, which can be deleted directly when not needed. This speeds up the Raft execution quite significantly. We no longer see any quorum issues and so on. The exact description of how these files work is in raftwal/storage.go.

This PR also removes a whole bunch of flags around how WAL living on Badger was initialized.

This PR introduces a concept of fault tolerance level. It adds a new flag --survive, which can choose between "process" or "filesystem". Most users should be OK with just process crashes and don't need to opt for filesystem crashes. In that case, they can gain huge speedups, because mmaped files can easily survive process crashes without calling expensive msync.

This PR also refactors common flags between Alpha and Zero and how they are parsed.

Fixes DGRAPH-2487

Co-authored-by: Martin Martinez Rivera <[email protected]>
Co-authored-by: Ibrahim Jarif <[email protected]>
Co-authored-by: Animesh <[email protected]>
Co-authored-by: Daniel Mai <[email protected]>
  • Loading branch information
5 people authored Sep 30, 2020
1 parent b777913 commit fa62774
Show file tree
Hide file tree
Showing 27 changed files with 2,522 additions and 1,071 deletions.
6 changes: 5 additions & 1 deletion compose/compose.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ func getAlpha(idx int) service {
internalPort := alphaBasePort + opts.PortOffset + getOffset(idx)
grpcPort := internalPort + 1000
svc := initService(basename, idx, grpcPort)
// Don't make Alphas depend on each other.
svc.DependsOn = nil

if opts.TmpFS {
svc.TmpFS = append(svc.TmpFS, fmt.Sprintf("/data/%s/w", svc.name))
Expand Down Expand Up @@ -251,7 +253,9 @@ func getAlpha(idx int) service {
svc.Command += fmt.Sprintf(" --lru_mb=%d", opts.LruSizeMB)
svc.Command += fmt.Sprintf(" --zero=%s", zerosOpt)
svc.Command += fmt.Sprintf(" --logtostderr -v=%d", opts.Verbosity)
svc.Command += fmt.Sprintf(" --idx=%d", idx)

// Don't assign idx, let it auto-assign.
// svc.Command += fmt.Sprintf(" --idx=%d", idx)
if opts.WhiteList {
svc.Command += " --whitelist=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16"
}
Expand Down
6 changes: 1 addition & 5 deletions conn/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,11 +319,7 @@ func (n *Node) PastLife() (uint64, bool, error) {
restart = true
}

var num int
num, rerr = n.Store.NumEntries()
if rerr != nil {
return 0, false, rerr
}
num := n.Store.NumEntries()
glog.Infof("Group %d found %d entries\n", n.RaftContext.Group, num)
// We'll always have at least one entry.
if num > 1 {
Expand Down
6 changes: 1 addition & 5 deletions conn/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"testing"
"time"

"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/raftwal"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -65,10 +64,7 @@ func TestProposal(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(dir)

db, err := badger.OpenManaged(badger.DefaultOptions(dir))
require.NoError(t, err)
store := raftwal.Init(db, 0, 0)
defer store.Closer.SignalAndWait()
store := raftwal.Init(dir)

rc := &pb.RaftContext{Id: 1}
n := NewNode(rc, store)
Expand Down
106 changes: 30 additions & 76 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,27 +104,21 @@ they form a Raft group and provide synchronous replication.
// with the flag name so that the values are picked up by Cobra/Viper's various config inputs
// (e.g, config file, env vars, cli flags, etc.)
flag := Alpha.Cmd.Flags()
x.FillCommonFlags(flag)

flag.StringP("postings", "p", "p", "Directory to store posting lists.")

// Options around how to set up Badger.
flag.String("badger.tables", "mmap,mmap",
"[ram, mmap, disk] Specifies how Badger LSM tree is stored for the postings and "+
"write-ahead directory. Option sequence consume most to least RAM while providing "+
"best to worst read performance respectively. If you pass two values separated by a "+
"comma, the first value will be used for the postings directory and the second for "+
"the write-ahead log directory.")
flag.String("badger.vlog", "mmap,mmap",
"[mmap, disk] Specifies how Badger Value log is stored for the postings and write-ahead "+
"log directory. mmap consumes more RAM, but provides better performance. If you pass "+
"two values separated by a comma the first value will be used for the postings "+
"directory and the second for the w directory.")
flag.String("badger.compression_level", "3,0",
"Specifies the compression level for the postings and write-ahead log "+
"directory. A higher value uses more resources. The value of 0 disables "+
"compression. If you pass two values separated by a comma the first "+
"value will be used for the postings directory (p) and the second for "+
"the wal directory (w). If a single value is passed the value is used "+
"as compression level for both directories.")
flag.String("badger.tables", "mmap",
"[ram, mmap, disk] Specifies how Badger LSM tree is stored for the postings."+
"Option sequence consume most to least RAM while providing "+
"best to worst read performance respectively.")
flag.String("badger.vlog", "mmap",
"[mmap, disk] Specifies how Badger Value log is stored for the postings."+
"mmap consumes more RAM, but provides better performance.")
flag.String("badger.compression_level", "3",
"Specifies the compression level for the postings directory. A higher value"+
" uses more resources. The value of 0 disables compression.")
enc.RegisterFlags(flag)

// Snapshot and Transactions.
Expand All @@ -136,14 +130,6 @@ they form a Raft group and provide synchronous replication.
"Abort any pending transactions older than this duration. The liveness of a"+
" transaction is determined by its last mutation.")

// OpenCensus flags.
flag.Float64("trace", 0.01, "The ratio of queries to trace.")
flag.String("jaeger.collector", "", "Send opencensus traces to Jaeger.")
// See https://github.com/DataDog/opencensus-go-exporter-datadog/issues/34
// about the status of supporting annotation logs through the datadog exporter
flag.String("datadog.collector", "", "Send opencensus traces to Datadog. As of now, the trace"+
" exporter does not support annotation logs and would discard them.")

flag.StringP("wal", "w", "w", "Directory to store raft write-ahead logs.")
flag.String("whitelist", "",
"A comma separated list of IP addresses, IP ranges, CIDR blocks, or hostnames you "+
Expand All @@ -152,8 +138,6 @@ they form a Raft group and provide synchronous replication.
flag.String("export", "export", "Folder in which to store exports.")
flag.Int("pending_proposals", 256,
"Number of pending mutation proposals. Useful for rate limiting.")
flag.String("my", "",
"IP_ADDRESS:PORT of this Dgraph Alpha, so other Dgraph Alphas can talk to this.")
flag.StringP("zero", "z", fmt.Sprintf("localhost:%d", x.PortZeroGrpc),
"Comma separated list of Dgraph zero addresses of the form IP_ADDRESS:PORT.")
flag.Uint64("idx", 0,
Expand All @@ -165,7 +149,6 @@ they form a Raft group and provide synchronous replication.
"If set, all Alter requests to Dgraph would need to have this token."+
" The token can be passed as follows: For HTTP requests, in X-Dgraph-AuthToken header."+
" For Grpc, in auth-token key in the context.")
flag.Bool("enable_sentry", true, "Turn on/off sending events to Sentry. (default on)")

flag.String("acl_secret_file", "", "The file that stores the HMAC secret, "+
"which is used for signing the JWT and should have at least 32 ASCII characters. "+
Expand All @@ -175,13 +158,13 @@ they form a Raft group and provide synchronous replication.
flag.Duration("acl_refresh_ttl", 30*24*time.Hour, "The TTL for the refresh jwt. "+
"Enterprise feature.")
flag.Duration("acl_cache_ttl", 30*time.Second, "DEPRECATED: The interval to refresh the acl "+
"cache. Enterprise feature.")
flag.Float64P("lru_mb", "l", -1,
"cache. Enterprise feature.") // TODO: Remove this flag.

flag.Float64P("lru_mb", "l", -1, // TODO: Remove this flag.
"Estimated memory the LRU cache can take. "+
"Actual usage by the process would be more than specified here.")
flag.String("mutations", "allow",
"Set mutation mode to allow, disallow, or strict.")
flag.Bool("telemetry", true, "Send anonymous telemetry data to Dgraph devs.")

// Useful for running multiple servers on the same machine.
flag.IntP("port_offset", "o", 0,
Expand Down Expand Up @@ -210,16 +193,18 @@ they form a Raft group and provide synchronous replication.

flag.Bool("graphql_introspection", true, "Set to false for no GraphQL schema introspection")
flag.Bool("graphql_debug", false, "Enable debug mode in GraphQL. This returns auth errors to clients. We do not recommend turning it on for production.")
flag.Bool("ludicrous_mode", false, "Run alpha in ludicrous mode")

// Ludicrous mode
flag.Bool("ludicrous_mode", false, "Run Dgraph in ludicrous mode.")
flag.Int("ludicrous_concurrency", 2000, "Number of concurrent threads in ludicrous mode")

flag.Bool("graphql_extensions", true, "Set to false if extensions not required in GraphQL response body")
flag.Duration("graphql_poll_interval", time.Second, "polling interval for graphql subscription.")

// Cache flags
flag.Int64("cache_mb", 0, "Total size of cache (in MB) to be used in alpha.")
flag.String("cache_percentage", "0,65,25,0,10",
flag.String("cache_percentage", "0,65,35,0",
`Cache percentages summing up to 100 for various caches (FORMAT:
PostingListCache,PstoreBlockCache,PstoreIndexCache,WstoreBlockCache,WstoreIndexCache).`)
PostingListCache,PstoreBlockCache,PstoreIndexCache,WAL).`)
}

func setupCustomTokenizers() {
Expand Down Expand Up @@ -616,60 +601,29 @@ func run() {
x.AssertTruef(totalCache >= 0, "ERROR: Cache size must be non-negative")

cachePercentage := Alpha.Conf.GetString("cache_percentage")
cachePercent, err := x.GetCachePercentages(cachePercentage, 5)
cachePercent, err := x.GetCachePercentages(cachePercentage, 4)
x.Check(err)
postingListCacheSize := (cachePercent[0] * (totalCache << 20)) / 100
pstoreBlockCacheSize := (cachePercent[1] * (totalCache << 20)) / 100
pstoreIndexCacheSize := (cachePercent[2] * (totalCache << 20)) / 100
wstoreBlockCacheSize := (cachePercent[3] * (totalCache << 20)) / 100
wstoreIndexCacheSize := (cachePercent[4] * (totalCache << 20)) / 100

compressionLevelString := Alpha.Conf.GetString("badger.compression_level")
compressionLevels, err := x.GetCompressionLevels(compressionLevelString)
x.Check(err)
postingDirCompressionLevel := compressionLevels[0]
walDirCompressionLevel := compressionLevels[1]
walCache := (cachePercent[3] * (totalCache << 20)) / 100

level := x.ParseCompressionLevel(Alpha.Conf.GetString("badger.compression_level"))
opts := worker.Options{
PostingDir: Alpha.Conf.GetString("postings"),
WALDir: Alpha.Conf.GetString("wal"),
PostingDirCompressionLevel: postingDirCompressionLevel,
WALDirCompressionLevel: walDirCompressionLevel,
PostingDirCompressionLevel: level,
PBlockCacheSize: pstoreBlockCacheSize,
PIndexCacheSize: pstoreIndexCacheSize,
WBlockCacheSize: wstoreBlockCacheSize,
WIndexCacheSize: wstoreIndexCacheSize,
WalCache: walCache,

MutationsMode: worker.AllowMutations,
AuthToken: Alpha.Conf.GetString("auth_token"),
AllottedMemory: Alpha.Conf.GetFloat64("lru_mb"),
}

badgerTables := strings.Split(Alpha.Conf.GetString("badger.tables"), ",")
if len(badgerTables) != 1 && len(badgerTables) != 2 {
glog.Fatalf("Unable to read badger.tables options. Expected single value or two "+
"comma-separated values. Got %s", Alpha.Conf.GetString("badger.tables"))
}
if len(badgerTables) == 1 {
opts.BadgerTables = badgerTables[0]
opts.BadgerWalTables = badgerTables[0]
} else {
opts.BadgerTables = badgerTables[0]
opts.BadgerWalTables = badgerTables[1]
}

badgerVlog := strings.Split(Alpha.Conf.GetString("badger.vlog"), ",")
if len(badgerVlog) != 1 && len(badgerVlog) != 2 {
glog.Fatalf("Unable to read badger.vlog options. Expected single value or two "+
"comma-separated values. Got %s", Alpha.Conf.GetString("badger.vlog"))
}
if len(badgerVlog) == 1 {
opts.BadgerVlog = badgerVlog[0]
opts.BadgerWalVlog = badgerVlog[0]
} else {
opts.BadgerVlog = badgerVlog[0]
opts.BadgerWalVlog = badgerVlog[1]
}
opts.BadgerTables = Alpha.Conf.GetString("badger.tables")
opts.BadgerVlog = Alpha.Conf.GetString("badger.vlog")

secretFile := Alpha.Conf.GetString("acl_secret_file")
if secretFile != "" {
Expand Down Expand Up @@ -711,8 +665,6 @@ func run() {
x.WorkerConfig = x.WorkerOptions{
ExportPath: Alpha.Conf.GetString("export"),
NumPendingProposals: Alpha.Conf.GetInt("pending_proposals"),
Tracing: Alpha.Conf.GetFloat64("trace"),
MyAddr: Alpha.Conf.GetString("my"),
ZeroAddr: strings.Split(Alpha.Conf.GetString("zero"), ","),
RaftId: cast.ToUint64(Alpha.Conf.GetString("idx")),
WhiteListedIPRanges: ips,
Expand All @@ -725,6 +677,8 @@ func run() {
LudicrousMode: Alpha.Conf.GetBool("ludicrous_mode"),
LudicrousConcurrency: Alpha.Conf.GetInt("ludicrous_concurrency"),
}
x.WorkerConfig.Parse(Alpha.Conf)

if x.WorkerConfig.EncryptionKey, err = enc.ReadKey(Alpha.Conf); err != nil {
glog.Infof("unable to read key %v", err)
return
Expand Down
30 changes: 25 additions & 5 deletions dgraph/cmd/debug/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/dgraph-io/dgraph/ee/enc"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/raftwal"
"github.com/dgraph-io/dgraph/types"
"github.com/dgraph-io/dgraph/x"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -62,6 +63,7 @@ type flagOptions struct {
wdir string
wtruncateUntil uint64
wsetSnapshot string
oldWalFormat bool
}

func init() {
Expand All @@ -88,6 +90,8 @@ func init() {
flag.BoolVar(&opt.sizeHistogram, "histogram", false,
"Show a histogram of the key and value sizes.")
flag.StringVarP(&opt.wdir, "wal", "w", "", "Directory where Raft write-ahead logs are stored.")
flag.BoolVar(&opt.oldWalFormat, "old-wal", false,
"Denotes that the directory pointed by --wal is a wal directory in old format.")
flag.Uint64VarP(&opt.wtruncateUntil, "truncate", "t", 0,
"Remove data from Raft entries until but not including this index.")
flag.StringVarP(&opt.wsetSnapshot, "snap", "s", "",
Expand Down Expand Up @@ -777,12 +781,28 @@ func run() {
x.AssertTruef(len(bopts.Dir) > 0, "No posting or wal dir specified.")
fmt.Printf("Opening DB: %s\n", bopts.Dir)

var db *badger.DB
if isWal {
db, err = badger.Open(bopts)
} else {
db, err = badger.OpenManaged(bopts)
// If this is a new format WAL, print and return.
if isWal && !opt.oldWalFormat {
store := raftwal.Init(dir)
fmt.Printf("RaftID: %+v\n", store.Uint(raftwal.RaftId))

// TODO: Fix the pending logic.
pending := make(map[uint64]bool)

start, last := printBasic(store)
for start < last-1 {
entries, err := store.Entries(start, last+1, 64<<20)
x.Check(err)
for _, e := range entries {
printEntry(e, pending)
start = x.Max(start, e.Index)
}
}
fmt.Println("Done")
return
}

db, err := badger.OpenManaged(bopts)
x.Check(err)
// Not using posting list cache
posting.Init(db, 0)
Expand Down
26 changes: 19 additions & 7 deletions dgraph/cmd/debug/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ import (
"strings"

"github.com/dgraph-io/badger/v2"
raftmigrate "github.com/dgraph-io/dgraph/dgraph/cmd/raft-migrate"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/raftwal"
"github.com/dgraph-io/dgraph/x"
humanize "github.com/dustin/go-humanize"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
)

Expand All @@ -53,7 +54,13 @@ func printEntry(es raftpb.Entry, pending map[uint64]bool) {
fmt.Printf("%s\n", buf.Bytes())
}

func printRaft(db *badger.DB, store *raftwal.DiskStorage) {
type RaftStore interface {
raft.Storage
Checkpoint() (uint64, error)
HardState() (raftpb.HardState, error)
}

func printBasic(store RaftStore) (uint64, uint64) {
fmt.Println()
snap, err := store.Snapshot()
if err != nil {
Expand Down Expand Up @@ -100,13 +107,19 @@ func printRaft(db *badger.DB, store *raftwal.DiskStorage) {
lastIdx, err := store.LastIndex()
if err != nil {
fmt.Printf("Got error while retrieving last index: %v\n", err)
return
}
startIdx := snap.Metadata.Index + 1
fmt.Printf("Last Index: %d . Num Entries: %d .\n\n", lastIdx, lastIdx-startIdx)
return startIdx, lastIdx
}

func printRaft(db *badger.DB, store *raftmigrate.OldDiskStorage) {
startIdx, lastIdx := printBasic(store)

commitTs, err := db.MaxVersion()
x.Check(err)
// In case we need to truncate raft entries.
batch := db.NewWriteBatch()
batch := db.NewWriteBatchAt(commitTs)
defer batch.Cancel()
var numTruncates int

Expand Down Expand Up @@ -149,7 +162,7 @@ func printRaft(db *badger.DB, store *raftwal.DiskStorage) {
}
}

func overwriteSnapshot(db *badger.DB, store *raftwal.DiskStorage) error {
func overwriteSnapshot(db *badger.DB, store *raftmigrate.OldDiskStorage) error {
snap, err := store.Snapshot()
x.Checkf(err, "Unable to get snapshot")
cs := snap.Metadata.ConfState
Expand Down Expand Up @@ -197,7 +210,6 @@ func overwriteSnapshot(db *badger.DB, store *raftwal.DiskStorage) error {
if err = txn.Set(store.EntryKey(ent.Index), data); err != nil {
return err
}

data, err = hs.Marshal()
if err != nil {
return err
Expand Down Expand Up @@ -264,7 +276,7 @@ func handleWal(db *badger.DB) error {
for rid := range rids {
for gid := range gids {
fmt.Printf("Iterating with Raft Id = %d Groupd Id = %d\n", rid, gid)
store := raftwal.Init(db, rid, gid)
store := raftmigrate.Init(db, rid, gid)
switch {
case len(opt.wsetSnapshot) > 0:
err := overwriteSnapshot(db, store)
Expand Down
Loading

0 comments on commit fa62774

Please sign in to comment.