Skip to content

Commit

Permalink
Support bulk loader use-case to import unencrypted export and encrypt…
Browse files Browse the repository at this point in the history
… the result. (hypermodeinc#5209)

Fixes DGRAPH-1254
  • Loading branch information
parasssh authored and dna2github committed Jul 18, 2020
1 parent c4611af commit f8b3d8d
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 11 deletions.
7 changes: 4 additions & 3 deletions chunker/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,9 +348,10 @@ func slurpQuoted(r *bufio.Reader, out *bytes.Buffer) error {
}
}

// FileReader returns an open reader and file on the given file. Gzip-compressed input is detected
// and decompressed automatically even without the gz extension. The caller is responsible for
// calling the returned cleanup function when done with the reader.
// FileReader returns an open reader on the given file. Gzip-compressed input is detected
// and decompressed automatically even without the gz extension. The keyfile, if non-nil,
// is used to decrypt the file. The caller is responsible for calling the returned cleanup
// function when done with the reader.
func FileReader(file string, keyfile string) (rd *bufio.Reader, cleanup func()) {
var f *os.File
var err error
Expand Down
20 changes: 15 additions & 5 deletions dgraph/cmd/bulk/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type options struct {
CustomTokenizers string
NewUids bool
ClientDir string
Encrypted bool

MapShards int
ReduceShards int
Expand Down Expand Up @@ -116,7 +117,7 @@ func newLoader(opt *options) *loader {
readerChunkCh: make(chan *bytes.Buffer, opt.NumGoroutines),
writeTs: getWriteTimestamp(zero),
}
st.schema = newSchemaStore(readSchema(opt.SchemaFile, opt.BadgerKeyFile), opt, st)
st.schema = newSchemaStore(readSchema(opt), opt, st)
ld := &loader{
state: st,
mappers: make([]*mapper, opt.NumGoroutines),
Expand All @@ -143,13 +144,18 @@ func getWriteTimestamp(zero *grpc.ClientConn) uint64 {
}
}

func readSchema(filename string, keyfile string) *schema.ParsedSchema {
f, err := os.Open(filename)
func readSchema(opt *options) *schema.ParsedSchema {
f, err := os.Open(opt.SchemaFile)
x.Check(err)
defer f.Close()

keyfile := opt.BadgerKeyFile
if !opt.Encrypted {
keyfile = ""
}
r, err := enc.GetReader(keyfile, f)
x.Check(err)
if filepath.Ext(filename) == ".gz" {
if filepath.Ext(opt.SchemaFile) == ".gz" {
r, err = gzip.NewReader(r)
x.Check(err)
}
Expand Down Expand Up @@ -208,7 +214,11 @@ func (ld *loader) mapStage() {
go func(file string) {
defer thr.Done(nil)

r, cleanup := chunker.FileReader(file, ld.opt.BadgerKeyFile)
keyfile := ld.opt.BadgerKeyFile
if !ld.opt.Encrypted {
keyfile = ""
}
r, cleanup := chunker.FileReader(file, keyfile)
defer cleanup()

chunk := chunker.NewChunker(loadType, 1000)
Expand Down
14 changes: 11 additions & 3 deletions dgraph/cmd/bulk/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func init() {
"Location of schema file.")
flag.String("format", "",
"Specify file format (rdf or json) instead of getting it from filename.")
flag.Bool("encrypted", false,
"Flag to indicate whether schema and data files are encrypted.")
flag.String("out", defaultOutDir,
"Location to write the final dgraph data directories.")
flag.Bool("replace_out", false,
Expand Down Expand Up @@ -101,9 +103,10 @@ func init() {

// Options around how to set up Badger.
flag.String("encryption_key_file", "",
"The file that stores the encryption key. The key size must be 16, 24, or 32 bytes long. "+
"The key size determines the corresponding block size for AES encryption "+
"(AES-128, AES-192, and AES-256 respectively). Enterprise feature.")
"The file that stores the encryption key. The key size must be 16/24/32 bytes long."+
" The key size indicates the chosen AES encryption (AES-128/192/256 respectively). "+
" This key is used to encrypt the output data directories and to decrypt the input "+
" schema and data files (if encrytped). Enterprise feature.")
flag.Int("badger.compression_level", 1,
"The compression level for Badger. A higher value uses more resources.")
}
Expand All @@ -113,6 +116,7 @@ func run() {
DataFiles: Bulk.Conf.GetString("files"),
DataFormat: Bulk.Conf.GetString("format"),
SchemaFile: Bulk.Conf.GetString("schema"),
Encrypted: Bulk.Conf.GetBool("encrypted"),
OutDir: Bulk.Conf.GetString("out"),
ReplaceOutDir: Bulk.Conf.GetBool("replace_out"),
TmpDir: Bulk.Conf.GetString("tmp"),
Expand Down Expand Up @@ -145,6 +149,10 @@ func run() {
fmt.Printf("Cannot enable encryption: %s", x.ErrNotSupported)
os.Exit(1)
}
if opt.Encrypted && opt.BadgerKeyFile == "" {
fmt.Printf("Must use --encryption_key_file option with --encrypted option.\n")
os.Exit(1)
}
if opt.SchemaFile == "" {
fmt.Fprint(os.Stderr, "Schema file must be specified.\n")
os.Exit(1)
Expand Down

0 comments on commit f8b3d8d

Please sign in to comment.