Support passing GraphQL schema to bulk loader. #5521

Merged: 1 commit, May 27, 2020

50 changes: 50 additions & 0 deletions dgraph/cmd/bulk/loader.go
@@ -27,6 +27,7 @@ import (
"log"
"os"
"path/filepath"
"strconv"
"sync"
"time"

@@ -47,6 +48,7 @@ type options struct {
	DataFiles     string
	DataFormat    string
	SchemaFile    string
	GqlSchemaFile string
	OutDir        string
	ReplaceOutDir bool
	TmpDir        string
@@ -228,6 +230,9 @@ func (ld *loader) mapStage() {
	}
	x.Check(thr.Finish())

	// Send the graphql triples
	ld.processGqlSchema(loadType)

	close(ld.readerChunkCh)
	mapperWg.Wait()

@@ -239,6 +244,51 @@ func (ld *loader) mapStage() {
	ld.xids = nil
}

func (ld *loader) processGqlSchema(loadType chunker.InputFormat) {
	if ld.opt.GqlSchemaFile == "" {
		return
	}

	f, err := os.Open(ld.opt.GqlSchemaFile)
	x.Check(err)
	defer f.Close()

	key := ld.opt.BadgerKeyFile
	if !ld.opt.Encrypted {
		key = ""
	}
	r, err := enc.GetReader(key, f)
	x.Check(err)
	if filepath.Ext(ld.opt.GqlSchemaFile) == ".gz" {
		r, err = gzip.NewReader(r)
		x.Check(err)
	}

	buf, err := ioutil.ReadAll(r)
	x.Check(err)

	rdfSchema := `_:gqlschema <dgraph.type> "dgraph.graphql" .
	_:gqlschema <dgraph.graphql.xid> "dgraph.graphql.schema" .
	_:gqlschema <dgraph.graphql.schema> %s .
	`

	jsonSchema := `{
		"dgraph.type": "dgraph.graphql",
		"dgraph.graphql.xid": "dgraph.graphql.schema",
		"dgraph.graphql.schema": %s
	}`

	gqlBuf := &bytes.Buffer{}
	schema := strconv.Quote(string(buf))
	switch loadType {
	case chunker.RdfFormat:
		x.Check2(gqlBuf.Write([]byte(fmt.Sprintf(rdfSchema, schema))))
	case chunker.JsonFormat:
		x.Check2(gqlBuf.Write([]byte(fmt.Sprintf(jsonSchema, schema))))
	}
	ld.readerChunkCh <- gqlBuf
}

func (ld *loader) reduceStage() {
	ld.prog.setPhase(reducePhase)

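The new processGqlSchema step above quotes the entire GraphQL schema into a single string literal and pushes it onto readerChunkCh, so the mappers treat it like any other chunk of input data. As a rough standalone illustration (not part of the PR), the sketch below prints the RDF chunk an unencrypted, uncompressed schema file would turn into; the file name is a placeholder.

```go
package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"strconv"
)

func main() {
	// Read the user-supplied GraphQL schema, e.g. `type Todo { id: ID! text: String }`.
	// "schema.graphql" is a placeholder path for this sketch.
	buf, err := ioutil.ReadFile("schema.graphql")
	if err != nil {
		log.Fatal(err)
	}

	// strconv.Quote escapes quotes and newlines so the whole schema fits into
	// one RDF string literal.
	schema := strconv.Quote(string(buf))

	// The same triples the loader injects: a dgraph.graphql node carrying the schema.
	fmt.Printf(`_:gqlschema <dgraph.type> "dgraph.graphql" .
_:gqlschema <dgraph.graphql.xid> "dgraph.graphql.schema" .
_:gqlschema <dgraph.graphql.schema> %s .
`, schema)
}
```

For a JSON-format load the jsonSchema object is emitted instead, but the quoting step is the same.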
2 changes: 2 additions & 0 deletions dgraph/cmd/bulk/run.go
@@ -56,6 +56,7 @@ func init() {
"Location of *.rdf(.gz) or *.json(.gz) file(s) to load.")
flag.StringP("schema", "s", "",
"Location of schema file.")
flag.StringP("graphql_schema", "g", "", "Location of the GraphQL schema file.")
flag.String("format", "",
"Specify file format (rdf or json) instead of getting it from filename.")
flag.Bool("encrypted", false,
@@ -115,6 +116,7 @@ func run() {
		DataFiles:     Bulk.Conf.GetString("files"),
		DataFormat:    Bulk.Conf.GetString("format"),
		SchemaFile:    Bulk.Conf.GetString("schema"),
		GqlSchemaFile: Bulk.Conf.GetString("graphql_schema"),
		Encrypted:     Bulk.Conf.GetBool("encrypted"),
		OutDir:        Bulk.Conf.GetString("out"),
		ReplaceOutDir: Bulk.Conf.GetBool("replace_out"),
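With the flag registered above and threaded into options in run(), a bulk load that also seeds the GraphQL schema would look roughly like `dgraph bulk -f data.rdf.gz -s schema.txt -g schema.graphql` (the file names here are placeholders; the remaining bulk flags are untouched by this change).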
27 changes: 25 additions & 2 deletions systest/bulk_live_cases_test.go
@@ -345,7 +345,7 @@ func TestBulkSingleUid(t *testing.T) {
_:erin <name> "Erin" .
_:frank <name> "Frank" .
_:grace <name> "Grace" .
`)
`, "")
	defer s.cleanup()

	// Ensures that the index keys are written to disk after commit.
@@ -476,7 +476,7 @@ func TestDeleteEdgeWithStar(t *testing.T) {

<0x2> <name> "Alice" .
<0x3> <name> "Bob" .
`)
`, "")
	defer s.cleanup()

	_, err := s.bulkCluster.client.NewTxn().Mutate(context.Background(), &api.Mutation{
@@ -499,6 +499,28 @@ func TestDeleteEdgeWithStar(t *testing.T) {

}

func TestGqlSchema(t *testing.T) {
	s := newBulkOnlySuite(t, "", "", "abc")
	defer s.cleanup()

	t.Run("Get GraphQL schema", s.testCase(`
	{
		schema(func: has(dgraph.graphql.schema)) {
			dgraph.graphql.schema
			dgraph.graphql.xid
			dgraph.type
		}
	}`, `
	{
		"schema": [{
			"dgraph.graphql.schema": "abc",
			"dgraph.graphql.xid": "dgraph.graphql.schema",
			"dgraph.type": ["dgraph.graphql"]
		}]
	}`))

}
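As the diff shows, the loader stores the file contents verbatim in dgraph.graphql.schema without validating them, which is why the placeholder string "abc" round-trips through the bulk load in this test. To exercise just this path locally, something like `go test -run TestGqlSchema ./systest/` should work, assuming a dgraph binary is available at $GOPATH/bin/dgraph for the suite's setup (below) to shell out to.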

// TODO: Fix this later.
func DONOTRUNTestGoldenData(t *testing.T) {
	if testing.Short() {
@@ -508,6 +530,7 @@ func DONOTRUNTestGoldenData(t *testing.T) {
	s := newSuiteFromFile(t,
		os.ExpandEnv("$GOPATH/src/github.com/dgraph-io/dgraph/systest/data/goldendata.schema"),
		os.ExpandEnv("$GOPATH/src/github.com/dgraph-io/dgraph/systest/data/goldendata.rdf.gz"),
		"",
	)
	defer s.cleanup()

15 changes: 10 additions & 5 deletions systest/bulk_live_fixture_test.go
@@ -53,6 +53,7 @@ type suite struct {

type suiteOpts struct {
	schema         string
	gqlSchema      string
	rdfs           string
	skipBulkLoader bool
	skipLiveLoader bool
@@ -85,7 +86,9 @@ func newSuiteInternal(t *testing.T, opts suiteOpts) *suite {
	s.checkFatal(ioutil.WriteFile(rdfFile, []byte(opts.rdfs), 0644))
	schemaFile := filepath.Join(rootDir, "schema.txt")
	s.checkFatal(ioutil.WriteFile(schemaFile, []byte(opts.schema), 0644))
	s.setup(schemaFile, rdfFile)
	gqlSchemaFile := filepath.Join(rootDir, "gql_schema.txt")
	s.checkFatal(ioutil.WriteFile(gqlSchemaFile, []byte(opts.gqlSchema), 0644))
	s.setup(schemaFile, rdfFile, gqlSchemaFile)
	return s
}

@@ -97,26 +100,27 @@ func newSuite(t *testing.T, schema, rdfs string) *suite {
	return newSuiteInternal(t, opts)
}

func newBulkOnlySuite(t *testing.T, schema, rdfs string) *suite {
func newBulkOnlySuite(t *testing.T, schema, rdfs, gqlSchema string) *suite {
	opts := suiteOpts{
		schema:         schema,
		gqlSchema:      gqlSchema,
		rdfs:           rdfs,
		skipLiveLoader: true,
	}
	return newSuiteInternal(t, opts)
}

func newSuiteFromFile(t *testing.T, schemaFile, rdfFile string) *suite {
func newSuiteFromFile(t *testing.T, schemaFile, rdfFile, gqlSchemaFile string) *suite {
	if testing.Short() {
		t.Skip("Skipping system test with long runtime.")
	}
	s := &suite{t: t}

	s.setup(schemaFile, rdfFile)
	s.setup(schemaFile, rdfFile, gqlSchemaFile)
	return s
}

func (s *suite) setup(schemaFile, rdfFile string) {
func (s *suite) setup(schemaFile, rdfFile, gqlSchemaFile string) {
	var (
		bulkDir = filepath.Join(rootDir, "bulk")
		liveDir = filepath.Join(rootDir, "live")
@@ -137,6 +141,7 @@ func (s *suite) setup(schemaFile, rdfFile string) {
	bulkCmd := exec.Command(os.ExpandEnv("$GOPATH/bin/dgraph"), "bulk",
		"-f", rdfFile,
		"-s", schemaFile,
		"-g", gqlSchemaFile,
		"--http", "localhost:"+strconv.Itoa(freePort(0)),
		"-j=1",
		"-x=true",