diff --git a/dgraph/cmd/bulk/loader.go b/dgraph/cmd/bulk/loader.go
index 43d73ce8607..9a103acdec2 100644
--- a/dgraph/cmd/bulk/loader.go
+++ b/dgraph/cmd/bulk/loader.go
@@ -49,7 +49,6 @@ type options struct {
 	OutDir           string
 	ReplaceOutDir    bool
 	TmpDir           string
-	BadgerKeyFile    string // used only in enterprise build. nil otherwise.
 	NumGoroutines    int
 	MapBufSize       uint64
 	SkipMapPhase     bool
@@ -67,6 +66,12 @@ type options struct {
 	ReduceShards int
 
 	shardOutputDirs []string
+
+	// ........... Badger options ..........
+	// BadgerKeyFile is the file containing the key used for encryption. Enterprise only feature.
+	BadgerKeyFile string
+	// BadgerCompressionLevel is the compression level to use while writing to badger.
+	BadgerCompressionLevel int
 }
 
 type state struct {
diff --git a/dgraph/cmd/bulk/reduce.go b/dgraph/cmd/bulk/reduce.go
index 20f9b783928..6bb45e04b0f 100644
--- a/dgraph/cmd/bulk/reduce.go
+++ b/dgraph/cmd/bulk/reduce.go
@@ -108,8 +108,8 @@ func (r *reducer) createBadger(i int) *badger.DB {
 		WithLogger(nil).WithMaxCacheSize(1 << 20).
 		WithEncryptionKey(enc.ReadEncryptionKeyFile(r.opt.BadgerKeyFile))
 
-	// TOOD(Ibrahim): Remove this once badger is updated.
-	opt.ZSTDCompressionLevel = 1
+	// Over-write badger options based on the options provided by the user.
+	r.setBadgerOptions(&opt)
 
 	db, err := badger.OpenManaged(opt)
 	x.Check(err)
@@ -121,6 +121,15 @@ func (r *reducer) createBadger(i int) *badger.DB {
 	return db
 }
 
+func (r *reducer) setBadgerOptions(opt *badger.Options) {
+	// Set the compression level.
+	opt.ZSTDCompressionLevel = r.state.opt.BadgerCompressionLevel
+	if r.state.opt.BadgerCompressionLevel < 1 {
+		x.Fatalf("Invalid compression level: %d. It should be greater than zero",
+			r.state.opt.BadgerCompressionLevel)
+	}
+}
+
 type mapIterator struct {
 	fd     *os.File
 	reader *bufio.Reader
diff --git a/dgraph/cmd/bulk/run.go b/dgraph/cmd/bulk/run.go
index 23c6a23fb43..56027c31bd1 100644
--- a/dgraph/cmd/bulk/run.go
+++ b/dgraph/cmd/bulk/run.go
@@ -65,10 +65,6 @@ func init() {
 	flag.String("tmp", "tmp",
 		"Temp directory used to use for on-disk scratch space. Requires free space proportional"+
 			" to the size of the RDF file and the amount of indexing used.")
-	flag.String("encryption_key_file", "",
-		"The file that stores the encryption key. The key size must be 16, 24, or 32 bytes long. "+
-			"The key size determines the corresponding block size for AES encryption "+
-			"(AES-128, AES-192, and AES-256 respectively). Enterprise feature.")
 
 	flag.IntP("num_go_routines", "j", int(math.Ceil(float64(runtime.NumCPU())/4.0)),
 		"Number of worker threads to use. MORE THREADS LEAD TO HIGHER RAM USAGE.")
@@ -101,6 +97,14 @@ func init() {
 		"Comma separated list of tokenizer plugins")
 	flag.Bool("new_uids", false,
 		"Ignore UIDs in load files and assign new ones.")
+
+	// Options around how to set up Badger.
+	flag.String("encryption_key_file", "",
+		"The file that stores the encryption key. The key size must be 16, 24, or 32 bytes long. "+
+			"The key size determines the corresponding block size for AES encryption "+
+			"(AES-128, AES-192, and AES-256 respectively). Enterprise feature.")
+	flag.Int("badger.compression_level", 1,
+		"The compression level for Badger. A higher value uses more resources.")
 }
 
 func run() {
@@ -111,7 +115,6 @@ func run() {
 		OutDir:           Bulk.Conf.GetString("out"),
 		ReplaceOutDir:    Bulk.Conf.GetBool("replace_out"),
 		TmpDir:           Bulk.Conf.GetString("tmp"),
-		BadgerKeyFile:    Bulk.Conf.GetString("encryption_key_file"),
 		NumGoroutines:    Bulk.Conf.GetInt("num_go_routines"),
 		MapBufSize:       uint64(Bulk.Conf.GetInt("mapoutput_mb")),
 		SkipMapPhase:     Bulk.Conf.GetBool("skip_map_phase"),
@@ -126,6 +129,9 @@ func run() {
 		ReduceShards:     Bulk.Conf.GetInt("reduce_shards"),
 		CustomTokenizers: Bulk.Conf.GetString("custom_tokenizers"),
 		NewUids:          Bulk.Conf.GetBool("new_uids"),
+
+		BadgerKeyFile:          Bulk.Conf.GetString("encryption_key_file"),
+		BadgerCompressionLevel: Bulk.Conf.GetInt("badger.compression_level"),
 	}
 
 	x.PrintVersion()