[Breaking flag] fix(Backups): new badger SuperFlag, NumGoroutines option solves OOM crashes #7387

Merged 21 commits on Feb 22, 2021. (Diff below shows changes from 12 commits.)
18 changes: 12 additions & 6 deletions dgraph/cmd/alpha/run.go
@@ -108,11 +108,15 @@ they form a Raft group and provide synchronous replication.
flag.StringP("postings", "p", "p", "Directory to store posting lists.")
flag.String("tmp", "t", "Directory to store temporary buffers.")

// Options around how to set up Badger.
flag.String("badger.compression", "snappy",
"[none, zstd:level, snappy] Specifies the compression algorithm and the compression"+
"level (if applicable) for the postings directory. none would disable compression,"+
" while zstd:1 would set zstd compression at level 1.")
flag.String("badger", worker.BadgerDefaults,
`Various badger options.
goroutines=N provides the number of goroutines to use in badger.Stream.
[Review — karlmcguire (Contributor Author), resolved: We need this goroutines flag in Alpha for creating local threads during backups, and Badger uses the goroutines value as the default Stream.numGo.]

compression=[none, zstd:level, snappy] specifies the compression algorithm and the compression
[Review — Contributor Author: We need this compression flag in Alpha for setting the compression type of Badger.]

level (if applicable) for the postings directory. "none" would disable compression, while
"zstd:1" would set zstd compression at level 1.
cache_mb=N total size of cache (in MB) per shard in the reducer.
[Review — Contributor: There's one more cache_mb flag in the x/flags.go file.]

[Review — Contributor: We also have the posting list cache. We can move both the Badger caches inside the --badger flag and keep the posting list size flag outside. Something like dgraph --badger="cache_mb=100; cache_percentage=50,50" --posting-cache-mb=50]

cache_percentage=N cache percentages summing up to 100 for various caches.
(FORMAT: BlockCacheSize, IndexCacheSize)`)
enc.RegisterFlags(flag)

// Snapshot and Transactions.
@@ -605,7 +609,8 @@ func run() {
pstoreIndexCacheSize := (cachePercent[2] * (totalCache << 20)) / 100
walCache := (cachePercent[3] * (totalCache << 20)) / 100

ctype, clevel := x.ParseCompression(Alpha.Conf.GetString("badger.compression"))
badger := x.NewSuperFlag(Alpha.Conf.GetString("badger")).MergeAndCheckDefault(worker.BadgerDefaults)
ctype, clevel := x.ParseCompression(badger.GetString("compression"))

conf := audit.GetAuditConf(Alpha.Conf.GetString("audit"))
opts := worker.Options{
@@ -672,6 +677,7 @@ func run() {
NumPendingProposals: Alpha.Conf.GetInt("pending_proposals"),
ZeroAddr: strings.Split(Alpha.Conf.GetString("zero"), ","),
Raft: raft,
Badger: badger,
WhiteListedIPRanges: ips,
MaxRetries: Alpha.Conf.GetInt("max_retries"),
StrictMutations: opts.MutationsMode == worker.StrictMutations,
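The flag help above describes compression values like none, snappy, or zstd:1. As a rough illustration of how such a value splits into an algorithm name and an optional level — a minimal sketch, not Dgraph's actual x.ParseCompression, whose signature and error handling may differ:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseCompression is a hypothetical sketch: it splits a value such as
// "zstd:1" into an algorithm name and a numeric level. "none" and
// "snappy" carry no level, so the level defaults to 0.
func parseCompression(s string) (ctype string, clevel int) {
	parts := strings.SplitN(s, ":", 2)
	ctype = parts[0]
	if len(parts) == 2 {
		// For brevity, a malformed level is ignored rather than reported.
		clevel, _ = strconv.Atoi(parts[1])
	}
	return ctype, clevel
}

func main() {
	ctype, clevel := parseCompression("zstd:1")
	fmt.Println(ctype, clevel) // zstd 1
}
```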
24 changes: 13 additions & 11 deletions dgraph/cmd/bulk/run.go
@@ -115,21 +115,23 @@ func init() {
"Ignore UIDs in load files and assign new ones.")

// Options around how to set up Badger.
flag.String("badger.compression", "snappy",
"[none, zstd:level, snappy] Specifies the compression algorithm and the compression"+
"level (if applicable) for the postings directory. none would disable compression,"+
" while zstd:1 would set zstd compression at level 1.")
flag.Int64("badger.cache_mb", 64, "Total size of cache (in MB) per shard in reducer.")
flag.String("badger.cache_percentage", "70,30",
"Cache percentages summing up to 100 for various caches"+
" (FORMAT: BlockCacheSize, IndexCacheSize).")
flag.String("badger", worker.BadgerDefaults,
`Various badger options.
goroutines=N provides the number of goroutines to use in badger.Stream.
compression=[none, zstd:level, snappy] specifies the compression algorithm and the compression
[Review — Contributor Author: We need this compression flag in Bulk for setting the compression type of Badger (similar to how Alpha uses it).]

level (if applicable) for the postings directory. "none" would disable compression, while
"zstd:1" would set zstd compression at level 1.
cache_mb=N total size of cache (in MB) per shard in the reducer.
[Review — Contributor Author: We need this cache_mb flag in Bulk for setting the Badger cache size. The default value is set in worker/server_state.go.]

cache_percentage=N cache percentages summing up to 100 for various caches.
[Review — Contributor Author: We need this cache_percentage flag in Bulk for setting the BlockCacheSize and IndexCacheSize.]

(FORMAT: BlockCacheSize, IndexCacheSize)`)
x.RegisterClientTLSFlags(flag)
// Encryption and Vault options
enc.RegisterFlags(flag)
}

func run() {
ctype, clevel := x.ParseCompression(Bulk.Conf.GetString("badger.compression"))
badger := x.NewSuperFlag(Bulk.Conf.GetString("badger")).MergeAndCheckDefault(worker.BadgerDefaults)
ctype, clevel := x.ParseCompression(badger.GetString("compression"))
opt := options{
DataFiles: Bulk.Conf.GetString("files"),
DataFormat: Bulk.Conf.GetString("format"),
@@ -166,9 +168,9 @@ func run() {
os.Exit(0)
}

totalCache := int64(Bulk.Conf.GetInt("badger.cache_mb"))
totalCache := int64(badger.GetUint64("cache_mb"))
x.AssertTruef(totalCache >= 0, "ERROR: Cache size must be non-negative")
cachePercent, err := x.GetCachePercentages(Bulk.Conf.GetString("badger.cache_percentage"), 2)
cachePercent, err := x.GetCachePercentages(badger.GetString("cache_percentage"), 2)
x.Check(err)
totalCache <<= 20 // Convert to MB.
opt.BlockCacheSize = (cachePercent[0] * totalCache) / 100
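The bulk loader now reads everything from the single --badger SuperFlag: parse the "key=value; …" string, merge in defaults, then split cache_mb bytes by cache_percentage. A self-contained sketch of that pipeline, using a deliberately simplified parser rather than Dgraph's real x.NewSuperFlag / MergeAndCheckDefault:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseSuperFlag is a simplified stand-in for x.NewSuperFlag plus
// MergeAndCheckDefault: defaults are filled in first, then any
// user-supplied values override them.
func parseSuperFlag(user, defaults string) map[string]string {
	out := map[string]string{}
	fill := func(src string) {
		for _, kv := range strings.Split(src, ";") {
			kv = strings.TrimSpace(kv)
			if kv == "" {
				continue
			}
			if k, v, ok := strings.Cut(kv, "="); ok {
				out[strings.TrimSpace(k)] = strings.TrimSpace(v)
			}
		}
	}
	fill(defaults)
	fill(user)
	return out
}

func main() {
	f := parseSuperFlag("cache_mb=100",
		"goroutines=8; compression=snappy; cache_mb=64; cache_percentage=70,30;")

	totalCache, _ := strconv.ParseInt(f["cache_mb"], 10, 64)
	totalCache <<= 20 // MB -> bytes

	// Split the total between the block cache and the index cache,
	// mirroring the percentage split in bulk/run.go above.
	var percent []int64
	for _, p := range strings.Split(f["cache_percentage"], ",") {
		n, _ := strconv.ParseInt(p, 10, 64)
		percent = append(percent, n)
	}
	blockCacheSize := (percent[0] * totalCache) / 100
	indexCacheSize := (percent[1] * totalCache) / 100
	fmt.Println(blockCacheSize, indexCacheSize) // 73400320 31457280
}
```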
2 changes: 2 additions & 0 deletions go.mod
@@ -5,6 +5,8 @@ go 1.12
// replace github.com/dgraph-io/badger/v2 => /home/mrjn/go/src/github.com/dgraph-io/badger
// replace github.com/dgraph-io/ristretto => /home/mrjn/go/src/github.com/dgraph-io/ristretto

replace github.com/dgraph-io/badger/v3 => /home/karl/go/src/github.com/dgraph-io/badger
[Review thread marked resolved by karlmcguire.]

require (
contrib.go.opencensus.io/exporter/jaeger v0.1.0
contrib.go.opencensus.io/exporter/prometheus v0.1.0
6 changes: 2 additions & 4 deletions worker/backup_processor.go
@@ -38,8 +38,7 @@ import (
)

const (
// backupNumGo is the number of go routines used by the backup stream writer.
backupNumGo = 16
BadgerDefaults = "goroutines=8; compression=snappy; cache_mb=64; cache_percentage=70,30;"
[Review thread marked resolved by karlmcguire.]
)

// BackupProcessor handles the different stages of the backup process.
@@ -65,7 +64,7 @@ func NewBackupProcessor(db *badger.DB, req *pb.BackupRequest) *BackupProcessor {
bp := &BackupProcessor{
DB: db,
Request: req,
threads: make([]*threadLocal, backupNumGo),
threads: make([]*threadLocal, x.WorkerConfig.Badger.GetUint64("goroutines")),
[Review thread marked resolved by karlmcguire.]
}
for i := range bp.threads {
bp.threads[i] = &threadLocal{
@@ -128,7 +127,6 @@ func (pr *BackupProcessor) WriteBackup(ctx context.Context) (*pb.BackupResponse,

stream := pr.DB.NewStreamAt(pr.Request.ReadTs)
stream.LogPrefix = "Dgraph.Backup"
stream.NumGo = backupNumGo

stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
tl := pr.threads[itr.ThreadId]
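The change above replaces the hard-coded backupNumGo = 16 with a goroutines count read from configuration. A generic sketch of that fan-out pattern — the channel-fed worker pool and the doubling step are placeholders, not Dgraph's backup logic:

```go
package main

import (
	"fmt"
	"sync"
)

// process fans work out to numGoroutines workers, the way the backup
// stream sizes its thread-local workers from the "goroutines" option
// instead of a compile-time constant. Doubling stands in for the real
// per-key backup work.
func process(numGoroutines int, work []int) []int {
	results := make([]int, len(work))
	jobs := make(chan int)
	var wg sync.WaitGroup
	for i := 0; i < numGoroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for idx := range jobs {
				results[idx] = work[idx] * 2 // placeholder task
			}
		}()
	}
	for i := range work {
		jobs <- i
	}
	close(jobs)
	wg.Wait()
	return results
}

func main() {
	fmt.Println(process(8, []int{1, 2, 3})) // [2 4 6]
}
```

Making the pool size configurable is what lets operators shrink it on memory-constrained Alphas, which is how this PR addresses the OOM crashes mentioned in the title.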
1 change: 1 addition & 0 deletions worker/server_state.go
@@ -109,6 +109,7 @@ func (s *ServerState) initStorage() {
opt := badger.DefaultOptions(Config.PostingDir).
WithValueThreshold(1 << 10 /* 1KB */).
WithNumVersionsToKeep(math.MaxInt32).
WithNumGoroutines(int(x.WorkerConfig.Badger.GetUint64("goroutines"))).
WithBlockCacheSize(Config.PBlockCacheSize).
WithIndexCacheSize(Config.PIndexCacheSize)
opt = setBadgerOptions(opt)
2 changes: 2 additions & 0 deletions x/config.go
@@ -79,6 +79,8 @@ type WorkerOptions struct {
TLSServerConfig *tls.Config
// Raft stores options related to Raft.
Raft *SuperFlag
// Badger stores options related to Badger.
Badger *SuperFlag
// WhiteListedIPRanges is a list of IP ranges from which requests will be allowed.
WhiteListedIPRanges []IPRange
// MaxRetries is the maximum number of times to retry a commit before giving up.