Skip to content

Commit

Permalink
Support custom tokenizers during bulk load.
Browse files Browse the repository at this point in the history
  • Loading branch information
Javier Alvarado committed Dec 1, 2018
1 parent 6fdc4d4 commit 55c72db
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 32 deletions.
31 changes: 16 additions & 15 deletions dgraph/cmd/bulk/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,22 @@ import (
)

type options struct {
RDFDir string
SchemaFile string
DgraphsDir string
TmpDir string
NumGoroutines int
MapBufSize int64
ExpandEdges bool
SkipMapPhase bool
CleanupTmp bool
NumShufflers int
Version bool
StoreXids bool
ZeroAddr string
HttpAddr string
IgnoreErrors bool
RDFDir string
SchemaFile string
DgraphsDir string
TmpDir string
NumGoroutines int
MapBufSize int64
ExpandEdges bool
SkipMapPhase bool
CleanupTmp bool
NumShufflers int
Version bool
StoreXids bool
ZeroAddr string
HttpAddr string
IgnoreErrors bool
CustomTokenizers string

MapShards int
ReduceShards int
Expand Down
44 changes: 27 additions & 17 deletions dgraph/cmd/bulk/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ package bulk
import (
"encoding/json"
"fmt"
"log"
"net/http"
_ "net/http/pprof"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"

"github.com/dgraph-io/dgraph/tok"
"github.com/dgraph-io/dgraph/x"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -84,27 +86,30 @@ func init() {
"Number of reduce shards. This determines the number of dgraph instances in the final "+
"cluster. Increasing this potentially decreases the reduce stage runtime by using "+
"more parallelism, but increases memory usage.")
flag.String("custom_tokenizers", "",
"Comma separated list of tokenizer plugins")
}

func run() {
opt := options{
RDFDir: Bulk.Conf.GetString("rdfs"),
SchemaFile: Bulk.Conf.GetString("schema_file"),
DgraphsDir: Bulk.Conf.GetString("out"),
TmpDir: Bulk.Conf.GetString("tmp"),
NumGoroutines: Bulk.Conf.GetInt("num_go_routines"),
MapBufSize: int64(Bulk.Conf.GetInt("mapoutput_mb")),
ExpandEdges: Bulk.Conf.GetBool("expand_edges"),
SkipMapPhase: Bulk.Conf.GetBool("skip_map_phase"),
CleanupTmp: Bulk.Conf.GetBool("cleanup_tmp"),
NumShufflers: Bulk.Conf.GetInt("shufflers"),
Version: Bulk.Conf.GetBool("version"),
StoreXids: Bulk.Conf.GetBool("store_xids"),
ZeroAddr: Bulk.Conf.GetString("zero"),
HttpAddr: Bulk.Conf.GetString("http"),
IgnoreErrors: Bulk.Conf.GetBool("ignore_errors"),
MapShards: Bulk.Conf.GetInt("map_shards"),
ReduceShards: Bulk.Conf.GetInt("reduce_shards"),
RDFDir: Bulk.Conf.GetString("rdfs"),
SchemaFile: Bulk.Conf.GetString("schema_file"),
DgraphsDir: Bulk.Conf.GetString("out"),
TmpDir: Bulk.Conf.GetString("tmp"),
NumGoroutines: Bulk.Conf.GetInt("num_go_routines"),
MapBufSize: int64(Bulk.Conf.GetInt("mapoutput_mb")),
ExpandEdges: Bulk.Conf.GetBool("expand_edges"),
SkipMapPhase: Bulk.Conf.GetBool("skip_map_phase"),
CleanupTmp: Bulk.Conf.GetBool("cleanup_tmp"),
NumShufflers: Bulk.Conf.GetInt("shufflers"),
Version: Bulk.Conf.GetBool("version"),
StoreXids: Bulk.Conf.GetBool("store_xids"),
ZeroAddr: Bulk.Conf.GetString("zero"),
HttpAddr: Bulk.Conf.GetString("http"),
IgnoreErrors: Bulk.Conf.GetBool("ignore_errors"),
MapShards: Bulk.Conf.GetInt("map_shards"),
ReduceShards: Bulk.Conf.GetInt("reduce_shards"),
CustomTokenizers: Bulk.Conf.GetString("custom_tokenizers"),
}

x.PrintVersion()
Expand All @@ -125,6 +130,11 @@ func run() {
opt.NumShufflers, opt.ReduceShards)
os.Exit(1)
}
if opt.CustomTokenizers != "" {
for _, soFile := range strings.Split(opt.CustomTokenizers, ",") {
tok.LoadCustomTokenizer(soFile)
}
}

opt.MapBufSize <<= 20 // Convert from MB to B.

Expand Down

0 comments on commit 55c72db

Please sign in to comment.