[Breaking flag] fix(Backups): new badger SuperFlag, NumGoroutines option solves OOM crashes #7387

Merged 21 commits on Feb 22, 2021
Changes from 14 commits
15 changes: 9 additions & 6 deletions dgraph/cmd/alpha/run.go
@@ -108,11 +108,12 @@ they form a Raft group and provide synchronous replication.
flag.StringP("postings", "p", "p", "Directory to store posting lists.")
flag.String("tmp", "t", "Directory to store temporary buffers.")

// Options around how to set up Badger.
flag.String("badger.compression", "snappy",
"[none, zstd:level, snappy] Specifies the compression algorithm and the compression"+
"level (if applicable) for the postings directory. none would disable compression,"+
" while zstd:1 would set zstd compression at level 1.")
flag.String("badger", worker.BadgerDefaults,
`Various badger options.
goroutines=N provides the number of goroutines to use in badger.Stream.
karlmcguire (Contributor Author) commented:
We need this goroutines flag in Alpha for creating local threads during backups, and Badger uses the goroutines value as the default Stream.numGo.

compression=[none, zstd:level, snappy] specifies the compression algorithm and the compression
karlmcguire (Contributor Author) commented:
We need this compression flag in Alpha for setting the compression type of Badger.

level (if applicable) for the postings directory. "none" would disable compression, while
"zstd:1" would set zstd compression at level 1.`)
enc.RegisterFlags(flag)

// Snapshot and Transactions.
@@ -605,7 +606,8 @@ func run() {
pstoreIndexCacheSize := (cachePercent[2] * (totalCache << 20)) / 100
walCache := (cachePercent[3] * (totalCache << 20)) / 100

ctype, clevel := x.ParseCompression(Alpha.Conf.GetString("badger.compression"))
badger := x.NewSuperFlag(Alpha.Conf.GetString("badger")).MergeAndCheckDefault(worker.BadgerDefaults)
ctype, clevel := x.ParseCompression(badger.GetString("compression"))

conf := audit.GetAuditConf(Alpha.Conf.GetString("audit"))
opts := worker.Options{
@@ -672,6 +674,7 @@ func run() {
NumPendingProposals: Alpha.Conf.GetInt("pending_proposals"),
ZeroAddr: strings.Split(Alpha.Conf.GetString("zero"), ","),
Raft: raft,
Badger: badger,
WhiteListedIPRanges: ips,
MaxRetries: Alpha.Conf.GetInt("max_retries"),
StrictMutations: opts.MutationsMode == worker.StrictMutations,
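The old per-option `badger.*` flags are collapsed here into a single `--badger` SuperFlag. A minimal sketch of how such a semicolon-separated `key=value` string can be parsed and merged with a defaults string — the `parseSuperFlag`/`mergeDefaults` helpers below are illustrative stand-ins, not Dgraph's actual `x.SuperFlag` implementation:

```go
package main

import (
	"fmt"
	"strings"
)

// parseSuperFlag splits "k1=v1; k2=v2" into a map. Illustrative only.
func parseSuperFlag(s string) map[string]string {
	out := map[string]string{}
	for _, kv := range strings.Split(s, ";") {
		kv = strings.TrimSpace(kv)
		if kv == "" {
			continue
		}
		parts := strings.SplitN(kv, "=", 2)
		if len(parts) == 2 {
			out[strings.TrimSpace(parts[0])] = strings.TrimSpace(parts[1])
		}
	}
	return out
}

// mergeDefaults fills in any key the user did not set, mirroring the
// MergeAndCheckDefault call in the diff above.
func mergeDefaults(user, defaults map[string]string) map[string]string {
	for k, v := range defaults {
		if _, ok := user[k]; !ok {
			user[k] = v
		}
	}
	return user
}

func main() {
	defaults := parseSuperFlag("goroutines=8; compression=snappy; cache_mb=64; cache_percentage=70,30")
	user := parseSuperFlag("compression=zstd:1; goroutines=16")
	merged := mergeDefaults(user, defaults)
	fmt.Println(merged["compression"], merged["goroutines"], merged["cache_mb"])
}
```

With the defaults string above, a user passing `--badger "compression=zstd:1; goroutines=16"` overrides those two keys while keeping the default cache settings.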
23 changes: 12 additions & 11 deletions dgraph/cmd/bulk/run.go
@@ -115,21 +115,22 @@ func init() {
"Ignore UIDs in load files and assign new ones.")

// Options around how to set up Badger.
flag.String("badger.compression", "snappy",
"[none, zstd:level, snappy] Specifies the compression algorithm and the compression"+
"level (if applicable) for the postings directory. none would disable compression,"+
" while zstd:1 would set zstd compression at level 1.")
flag.Int64("badger.cache_mb", 64, "Total size of cache (in MB) per shard in reducer.")
flag.String("badger.cache_percentage", "70,30",
"Cache percentages summing up to 100 for various caches"+
" (FORMAT: BlockCacheSize, IndexCacheSize).")
flag.String("badger", worker.BadgerDefaults,
`Various badger options.
compression=[none, zstd:level, snappy] specifies the compression algorithm and the compression
karlmcguire (Contributor Author) commented:
We need this compression flag in Bulk for setting the compression type of Badger (similar to how Alpha uses it).

level (if applicable) for the postings directory. "none" would disable compression, while
"zstd:1" would set zstd compression at level 1.
cache_mb=N total size of cache (in MB) per shard in the reducer.
karlmcguire (Contributor Author) commented:
We need this cache_mb flag in Bulk for setting the Badger cache size. The default value is set in worker/server_state.go.

cache_percentage=N cache percentages summing up to 100 for various caches.
karlmcguire (Contributor Author) commented:
We need this cache_percentage flag in Bulk for setting the BlockCacheSize and IndexCacheSize.

(FORMAT: BlockCacheSize, IndexCacheSize)`)
x.RegisterClientTLSFlags(flag)
// Encryption and Vault options
enc.RegisterFlags(flag)
}

func run() {
ctype, clevel := x.ParseCompression(Bulk.Conf.GetString("badger.compression"))
badger := x.NewSuperFlag(Bulk.Conf.GetString("badger")).MergeAndCheckDefault(worker.BadgerDefaults)
ctype, clevel := x.ParseCompression(badger.GetString("compression"))
opt := options{
DataFiles: Bulk.Conf.GetString("files"),
DataFormat: Bulk.Conf.GetString("format"),
@@ -166,9 +167,9 @@ func run() {
os.Exit(0)
}

totalCache := int64(Bulk.Conf.GetInt("badger.cache_mb"))
totalCache := int64(badger.GetUint64("cache_mb"))
x.AssertTruef(totalCache >= 0, "ERROR: Cache size must be non-negative")
cachePercent, err := x.GetCachePercentages(Bulk.Conf.GetString("badger.cache_percentage"), 2)
cachePercent, err := x.GetCachePercentages(badger.GetString("cache_percentage"), 2)
x.Check(err)
totalCache <<= 20 // Convert to MB.
opt.BlockCacheSize = (cachePercent[0] * totalCache) / 100
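The `cache_mb` and `cache_percentage` options above share one cache budget between Badger's block and index caches, using the same `(percent * totalCache) / 100` split shown in the diff. A small sketch of that arithmetic (the `splitCache` helper name is illustrative):

```go
package main

import "fmt"

// splitCache divides a total cache budget (in MB) between the block and
// index caches according to two percentages that should sum to 100.
func splitCache(totalMB, blockPct, indexPct int64) (blockBytes, indexBytes int64) {
	total := totalMB << 20 // convert MB to bytes
	blockBytes = (blockPct * total) / 100
	indexBytes = (indexPct * total) / 100
	return
}

func main() {
	// The defaults: cache_mb=64, cache_percentage=70,30.
	block, index := splitCache(64, 70, 30)
	fmt.Println(block, index) // 70% and 30% of 64 MB, in bytes
}
```

Because the percentages are applied after the `<< 20` shift, rounding error is at most a few bytes rather than a fraction of a megabyte.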
8 changes: 1 addition & 7 deletions worker/backup_processor.go
Expand Up @@ -37,11 +37,6 @@ import (
"github.com/dgraph-io/dgraph/x"
)

const (
// backupNumGo is the number of go routines used by the backup stream writer.
backupNumGo = 16
)

// BackupProcessor handles the different stages of the backup process.
type BackupProcessor struct {
// DB is the Badger pstore managed by this node.
@@ -65,7 +60,7 @@ func NewBackupProcessor(db *badger.DB, req *pb.BackupRequest) *BackupProcessor {
bp := &BackupProcessor{
DB: db,
Request: req,
threads: make([]*threadLocal, backupNumGo),
threads: make([]*threadLocal, x.WorkerConfig.Badger.GetUint64("goroutines")),
}
for i := range bp.threads {
bp.threads[i] = &threadLocal{
@@ -128,7 +123,6 @@ func (pr *BackupProcessor) WriteBackup(ctx context.Context) (*pb.BackupResponse,

stream := pr.DB.NewStreamAt(pr.Request.ReadTs)
stream.LogPrefix = "Dgraph.Backup"
stream.NumGo = backupNumGo

stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
tl := pr.threads[itr.ThreadId]
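The change above sizes the `threads` slice from the configurable `goroutines` value instead of the hard-coded `backupNumGo = 16`, and each stream iterator writes into its own slot via `itr.ThreadId`, so no locking is needed on the hot path. A hedged, generic sketch of that per-worker accumulation pattern (not the actual `BackupProcessor` code):

```go
package main

import (
	"fmt"
	"sync"
)

// runWorkers processes items with numGo goroutines, each appending results
// to its own worker-local slice (indexed like itr.ThreadId in the diff),
// then merges the slices once all workers have finished.
func runWorkers(numGo int, items []int) []int {
	locals := make([][]int, numGo) // one accumulator per worker
	var wg sync.WaitGroup
	for id := 0; id < numGo; id++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// Strided assignment: worker id handles items id, id+numGo, ...
			for i := id; i < len(items); i += numGo {
				locals[id] = append(locals[id], items[i]*2)
			}
		}(id)
	}
	wg.Wait()
	// Merge after the WaitGroup guarantees no concurrent writes remain.
	var merged []int
	for _, l := range locals {
		merged = append(merged, l...)
	}
	return merged
}

func main() {
	out := runWorkers(4, []int{1, 2, 3, 4, 5, 6, 7, 8})
	fmt.Println(len(out)) // 8
}
```

Making `numGo` configurable matters for the OOM fix in this PR: each worker holds its own buffers, so fewer goroutines means proportionally less peak memory during a backup.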
5 changes: 5 additions & 0 deletions worker/server_state.go
@@ -30,6 +30,10 @@ import (
"github.com/golang/glog"
)

const (
BadgerDefaults = "goroutines=8; compression=snappy; cache_mb=64; cache_percentage=70,30"
karlmcguire (Contributor Author) commented on Feb 3, 2021:
Some of these defaults aren't used by Alpha, which is kind of confusing...

We use goroutines and compression in Alpha.

We use compression, cache_mb, and cache_percentage in Bulk.

It doesn't make sense to split up the BadgerDefaults across packages, but it might be worth adding a comment explaining which packages use which options.

)

// ServerState holds the state of the Dgraph server.
type ServerState struct {
FinishCh chan struct{} // channel to wait for all pending reqs to finish.
@@ -109,6 +113,7 @@ func (s *ServerState) initStorage() {
opt := badger.DefaultOptions(Config.PostingDir).
WithValueThreshold(1 << 10 /* 1KB */).
WithNumVersionsToKeep(math.MaxInt32).
WithNumGoroutines(int(x.WorkerConfig.Badger.GetUint64("goroutines"))).
WithBlockCacheSize(Config.PBlockCacheSize).
WithIndexCacheSize(Config.PIndexCacheSize)
opt = setBadgerOptions(opt)
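Both Alpha and Bulk route the `compression` value through `x.ParseCompression`, which the flag help describes as accepting `none`, `snappy`, or `zstd:level`. A rough sketch of that parsing — this is an illustrative approximation, not the real `x` package function, and the default-level and fallback choices here are assumptions:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseCompression returns the algorithm name and a zstd level (0 when the
// level does not apply). Illustrative approximation of x.ParseCompression.
func parseCompression(s string) (string, int) {
	if strings.HasPrefix(s, "zstd") {
		level := 1 // assumed default zstd level when none is given
		if parts := strings.SplitN(s, ":", 2); len(parts) == 2 {
			if n, err := strconv.Atoi(parts[1]); err == nil {
				level = n
			}
		}
		return "zstd", level
	}
	switch s {
	case "none":
		return "none", 0
	case "snappy":
		return "snappy", 0
	}
	return "snappy", 0 // assumed fallback to the documented default
}

func main() {
	ctype, clevel := parseCompression("zstd:3")
	fmt.Println(ctype, clevel) // zstd 3
}
```

Per the flag help in the diff, `none` disables compression entirely, while `zstd:1` selects zstd at level 1.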
2 changes: 2 additions & 0 deletions x/config.go
Expand Up @@ -79,6 +79,8 @@ type WorkerOptions struct {
TLSServerConfig *tls.Config
// Raft stores options related to Raft.
Raft *SuperFlag
// Badger stores options related to Badger.
Badger *SuperFlag
// WhiteListedIPRanges is a list of IP ranges from which requests will be allowed.
WhiteListedIPRanges []IPRange
// MaxRetries is the maximum number of times to retry a commit before giving up.