Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose Badger Compression Level option in Bulk Loader #4669

Merged
merged 7 commits into from
Feb 5, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion dgraph/cmd/bulk/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ type options struct {
OutDir string
ReplaceOutDir bool
TmpDir string
BadgerKeyFile string // used only in enterprise build. nil otherwise.
NumGoroutines int
MapBufSize uint64
SkipMapPhase bool
Expand All @@ -67,6 +66,16 @@ type options struct {
ReduceShards int

shardOutputDirs []string

// ........... Badger options ..........
// BadgerTables is the name of the mode used to load the badger tables.
BadgerTables string
// BadgerVlog is the name of the mode used to load the badger value log.
BadgerVlog string
// BadgerKeyFile is the file containing the key used for encryption. Enterprise only feature.
BadgerKeyFile string
// Badger is the compression level to use while writing to badger.
BadgerCompressionLevel int
}

type state struct {
Expand Down
36 changes: 34 additions & 2 deletions dgraph/cmd/bulk/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/dgraph-io/dgraph/worker"
"github.com/dgraph-io/dgraph/x"
"github.com/gogo/protobuf/proto"
"github.com/golang/glog"
)

type reducer struct {
Expand Down Expand Up @@ -108,8 +109,8 @@ func (r *reducer) createBadger(i int) *badger.DB {
WithLogger(nil).WithMaxCacheSize(1 << 20).
WithEncryptionKey(enc.ReadEncryptionKeyFile(r.opt.BadgerKeyFile))

// TOOD(Ibrahim): Remove this once badger is updated.
opt.ZSTDCompressionLevel = 1
// Over-write badger options based on the options provided by the user.
r.setBadgerOptions(&opt)

db, err := badger.OpenManaged(opt)
x.Check(err)
Expand All @@ -121,6 +122,37 @@ func (r *reducer) createBadger(i int) *badger.DB {
return db
}

func (r *reducer) setBadgerOptions(opt *badger.Options) {
glog.Infof("Setting Badger table load option: %s", r.state.opt.BadgerTables)
switch r.state.opt.BadgerTables {
case "mmap":
opt.TableLoadingMode = bo.MemoryMap
case "ram":
opt.TableLoadingMode = bo.LoadToRAM
case "disk":
opt.TableLoadingMode = bo.FileIO
default:
x.Fatalf("Invalid Badger Table Loading mode: %s", r.state.opt.BadgerTables)
}

glog.Infof("Setting Badger value log load option: %s", r.state.opt.BadgerVlog)
switch r.state.opt.BadgerVlog {
case "mmap":
opt.ValueLogLoadingMode = bo.MemoryMap
case "disk":
opt.ValueLogLoadingMode = bo.FileIO
default:
x.Fatalf("Invalid Badger ValueLog Loading mode: %s", r.state.opt.BadgerVlog)
}

// Set the compression level.
opt.ZSTDCompressionLevel = r.state.opt.BadgerCompressionLevel
if r.state.opt.BadgerCompressionLevel < 1 {
x.Fatalf("Invalid compression level: %d. It should be greater than zero",
r.state.opt.BadgerCompressionLevel)
}
}

type mapIterator struct {
fd *os.File
reader *bufio.Reader
Expand Down
25 changes: 20 additions & 5 deletions dgraph/cmd/bulk/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,6 @@ func init() {
flag.String("tmp", "tmp",
"Temp directory used to use for on-disk scratch space. Requires free space proportional"+
" to the size of the RDF file and the amount of indexing used.")
flag.String("encryption_key_file", "",
"The file that stores the encryption key. The key size must be 16, 24, or 32 bytes long. "+
"The key size determines the corresponding block size for AES encryption "+
"(AES-128, AES-192, and AES-256 respectively). Enterprise feature.")

flag.IntP("num_go_routines", "j", int(math.Ceil(float64(runtime.NumCPU())/4.0)),
"Number of worker threads to use. MORE THREADS LEAD TO HIGHER RAM USAGE.")
Expand Down Expand Up @@ -101,6 +97,21 @@ func init() {
"Comma separated list of tokenizer plugins")
flag.Bool("new_uids", false,
"Ignore UIDs in load files and assign new ones.")

// Options around how to set up Badger.
flag.String("badger.tables", "mmap",
"[ram, mmap, disk] Specifies how Badger LSM tree is stored. "+
"Option sequence consume most to least RAM while providing best to worst read "+
"performance respectively.")
flag.String("badger.vlog", "mmap",
"[mmap, disk] Specifies how Badger Value log is stored."+
" mmap consumes more RAM, but provides better performance.")
flag.String("encryption_key_file", "",
"The file that stores the encryption key. The key size must be 16, 24, or 32 bytes long. "+
"The key size determines the corresponding block size for AES encryption "+
"(AES-128, AES-192, and AES-256 respectively). Enterprise feature.")
flag.Int("badger.compression_level", 1,
"The compression level for Badger. A higher value uses more resources.")
}

func run() {
Expand All @@ -111,7 +122,6 @@ func run() {
OutDir: Bulk.Conf.GetString("out"),
ReplaceOutDir: Bulk.Conf.GetBool("replace_out"),
TmpDir: Bulk.Conf.GetString("tmp"),
BadgerKeyFile: Bulk.Conf.GetString("encryption_key_file"),
NumGoroutines: Bulk.Conf.GetInt("num_go_routines"),
MapBufSize: uint64(Bulk.Conf.GetInt("mapoutput_mb")),
SkipMapPhase: Bulk.Conf.GetBool("skip_map_phase"),
Expand All @@ -126,6 +136,11 @@ func run() {
ReduceShards: Bulk.Conf.GetInt("reduce_shards"),
CustomTokenizers: Bulk.Conf.GetString("custom_tokenizers"),
NewUids: Bulk.Conf.GetBool("new_uids"),

BadgerTables: Bulk.Conf.GetString("badger.tables"),
BadgerVlog: Bulk.Conf.GetString("badger.vlog"),
BadgerKeyFile: Bulk.Conf.GetString("encryption_key_file"),
BadgerCompressionLevel: Bulk.Conf.GetInt("badger.compression_level"),
}

x.PrintVersion()
Expand Down