Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(multi-tenancy): fix live loader for case when namespace does not exist for data #7505

Merged
merged 8 commits into from
Mar 4, 2021
Merged
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions dgraph/cmd/live/batch.go
Original file line number Diff line number Diff line change
@@ -84,10 +84,11 @@ type loader struct {
conflicts map[uint64]struct{}
uidsLock sync.RWMutex

reqNum uint64
reqs chan *request
zeroconn *grpc.ClientConn
schema *schema
reqNum uint64
reqs chan *request
zeroconn *grpc.ClientConn
schema *schema
namespaces map[uint64]struct{}

upsertLock sync.RWMutex
}
126 changes: 102 additions & 24 deletions dgraph/cmd/live/run.go
Original file line number Diff line number Diff line change
@@ -33,7 +33,6 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"google.golang.org/grpc"
@@ -49,6 +48,7 @@ import (
"github.com/dgraph-io/dgraph/chunker"
"github.com/dgraph-io/dgraph/ee/enc"
"github.com/dgraph-io/dgraph/filestore"
schemapkg "github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/types"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/dgraph/xidmap"
@@ -209,8 +209,29 @@ func getSchema(ctx context.Context, dgraphClient *dgo.Dgraph, ns uint64) (*schem
return &sch, nil
}

// validate that the schema contains the predicates whose namespace exist.
func validateSchema(sch string, namespaces map[uint64]struct{}) error {
result, err := schemapkg.Parse(sch)
if err != nil {
return err
}
for _, pred := range result.Preds {
ns := x.ParseNamespace(pred.Predicate)
if _, ok := namespaces[ns]; !ok {
return errors.Errorf("Namespace %#x doesn't exist for pred %s.", ns, pred.Predicate)
}
}
for _, typ := range result.Types {
ns := x.ParseNamespace(typ.TypeName)
if _, ok := namespaces[ns]; !ok {
return errors.Errorf("Namespace %#x doesn't exist for type %s.", ns, typ.TypeName)
}
}
return nil
}

// processSchemaFile process schema for a given gz file.
func processSchemaFile(ctx context.Context, file string, key x.SensitiveByteSlice,
func (l *loader) processSchemaFile(ctx context.Context, file string, key x.SensitiveByteSlice,
dgraphClient *dgo.Dgraph) error {
fmt.Printf("\nProcessing schema file %q\n", file)
if len(opt.authToken) > 0 {
@@ -237,6 +258,12 @@ func processSchemaFile(ctx context.Context, file string, key x.SensitiveByteSlic

op := &api.Operation{}
op.Schema = string(b)
if opt.namespaceToLoad == math.MaxUint64 {
// Verify schema if we are loding into multiple namespaces.
if err := validateSchema(op.Schema, l.namespaces); err != nil {
return err
}
}
return dgraphClient.Alter(ctx, op)
}

@@ -453,12 +480,14 @@ func (l *loader) processFile(ctx context.Context, fs filestore.FileStore, filena
}

func (l *loader) processLoadFile(ctx context.Context, rd *bufio.Reader, ck chunker.Chunker) error {
var wg sync.WaitGroup
wg.Add(1)
nqbuf := ck.NQuads()
errCh := make(chan error, 1)
// Spin a goroutine to push NQuads to mutation channel.
go func() {
defer wg.Done()
var err error
defer func() {
errCh <- err
}()
buffer := make([]*api.NQuad, 0, opt.bufferSize*opt.batchSize)

drain := func() {
@@ -499,12 +528,16 @@ func (l *loader) processLoadFile(ctx context.Context, rd *bufio.Reader, ck chunk
continue
}

if opt.namespaceToLoad != math.MaxUint64 {
// If do not preserve namespace, use the namespace passed through
// `--force-namespace` flag.
for _, nq := range nqs {
for _, nq := range nqs {
if opt.namespaceToLoad != math.MaxUint64 {
// If do not preserve namespace, use the namespace passed through
// `--force-namespace` flag.
nq.Namespace = opt.namespaceToLoad
}
if _, ok := l.namespaces[nq.Namespace]; !ok {
err = errors.Errorf("Cannot load nquad:%+v as its namespace doesn't exist.", nq)
return
}
}

if opt.upsertPredicate == "" {
@@ -539,6 +572,8 @@ func (l *loader) processLoadFile(ctx context.Context, rd *bufio.Reader, ck chunk
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errCh:
return err
default:
}

@@ -556,9 +591,7 @@ func (l *loader) processLoadFile(ctx context.Context, rd *bufio.Reader, ck chunk
}
}
nqbuf.Flush()
wg.Wait()

return nil
return <-errCh
}

func setup(opts batchMutationOptions, dc *dgo.Dgraph, conf *viper.Viper) *loader {
@@ -599,14 +632,15 @@ func setup(opts batchMutationOptions, dc *dgo.Dgraph, conf *viper.Viper) *loader

alloc := xidmap.New(connzero, db, "")
l := &loader{
opts: opts,
dc: dc,
start: time.Now(),
reqs: make(chan *request, opts.Pending*2),
conflicts: make(map[uint64]struct{}),
alloc: alloc,
db: db,
zeroconn: connzero,
opts: opts,
dc: dc,
start: time.Now(),
reqs: make(chan *request, opts.Pending*2),
conflicts: make(map[uint64]struct{}),
alloc: alloc,
db: db,
zeroconn: connzero,
namespaces: make(map[uint64]struct{}),
}

l.requestsWg.Add(opts.Pending)
@@ -618,6 +652,36 @@ func setup(opts batchMutationOptions, dc *dgo.Dgraph, conf *viper.Viper) *loader
return l
}

// populateNamespace fetches the schema and extracts the information about the existing namespaces.
func (l *loader) populateNamespaces(ctx context.Context, dc *dgo.Dgraph, isGalaxyOp bool) error {
if !isGalaxyOp {
// The below schema query returns the predicates without the namespace if context does not
// have the galaxy operation set. As we are not loading data across namespaces, so existence
// of namespace is verified when the user logs in.
l.namespaces[opt.namespaceToLoad] = struct{}{}
return nil
}

txn := dc.NewTxn()
defer txn.Discard(ctx)
res, err := txn.Query(ctx, "schema {}")
if err != nil {
return err
}

var sch schema
err = json.Unmarshal(res.GetJson(), &sch)
if err != nil {
return err
}

for _, pred := range sch.Predicates {
ns := x.ParseNamespace(pred.Predicate)
l.namespaces[ns] = struct{}{}
}
return nil
}

func run() error {
var zero string
if Live.Conf.GetString("slash_grpc_endpoint") != "" {
@@ -673,8 +737,12 @@ func run() error {
}
}()
ctx := context.Background()
var galaxyOp bool
if len(creds.GetString("user")) > 0 && creds.GetUint64("namespace") == x.GalaxyNamespace &&
opt.namespaceToLoad != x.GalaxyNamespace {
galaxyOp = true
}
if galaxyOp {
// Attach the galaxy to the context to specify that the query/mutations with this context
// will be galaxy-wide.
ctx = x.AttachGalaxyOperation(ctx, opt.namespaceToLoad)
@@ -703,8 +771,20 @@ func run() error {
l := setup(bmOpts, dg, Live.Conf)
defer l.zeroconn.Close()

if err := l.populateNamespaces(ctx, dg, galaxyOp); err != nil {
fmt.Printf("Error while populating namespaces %s\n", err)
return err
}

if opt.namespaceToLoad != math.MaxUint64 {
if _, ok := l.namespaces[opt.namespaceToLoad]; !ok {
return errors.Errorf("Cannot load into namespace %#x. It does not exist.",
opt.namespaceToLoad)
}
}

if len(opt.schemaFile) > 0 {
err := processSchemaFile(ctx, opt.schemaFile, opt.key, dg)
err := l.processSchemaFile(ctx, opt.schemaFile, opt.key, dg)
if err != nil {
if err == context.Canceled {
fmt.Printf("Interrupted while processing schema file %q\n", opt.schemaFile)
@@ -716,8 +796,7 @@ func run() error {
fmt.Printf("Processed schema file %q\n\n", opt.schemaFile)
}

l.schema, err = getSchema(ctx, dg, opt.namespaceToLoad)
if err != nil {
if l.schema, err = getSchema(ctx, dg, opt.namespaceToLoad); err != nil {
fmt.Printf("Error while loading schema from alpha %s\n", err)
return err
}
@@ -735,7 +814,6 @@ func run() error {
}
fmt.Printf("Found %d data file(s) to process\n", totalFiles)

// x.Check(dgraphClient.NewSyncMarks(filesList))
errCh := make(chan error, totalFiles)
for _, file := range filesList {
file = strings.Trim(file, " \t")
32 changes: 32 additions & 0 deletions systest/multi-tenancy/basic_test.go
Original file line number Diff line number Diff line change
@@ -288,6 +288,38 @@ func TestLiveLoadMulti(t *testing.T) {
`{"me": [{"name":"ns alice"}, {"name": "ns bob"},{"name":"ns chew"},
{"name": "ns dan"},{"name":"ns eon"}]}`, string(resp))

// Try loading data into a namespace that does not exist. Expect a failure.
err = liveLoadData(t, &liveOpts{
rdfs: fmt.Sprintf(`_:c <name> "ns eon" <%#x> .`, ns),
schema: `name: string @index(term) .`,
creds: &testutil.LoginParams{UserID: "groot", Passwd: "password",
Namespace: x.GalaxyNamespace},
forceNs: int64(0x123456), // Assuming this namespace does not exist.
})
require.Error(t, err)
require.Contains(t, err.Error(), "Cannot load into namespace 0x123456")

// Try loading into a multiple namespaces.
err = liveLoadData(t, &liveOpts{
rdfs: fmt.Sprintf(`_:c <name> "ns eon" <%#x> .`, ns),
schema: `[0x123456] name: string @index(term) .`,
creds: &testutil.LoginParams{UserID: "groot", Passwd: "password",
Namespace: x.GalaxyNamespace},
forceNs: -1,
})
require.Error(t, err)
require.Contains(t, err.Error(), "Namespace 0x123456 doesn't exist for pred")

err = liveLoadData(t, &liveOpts{
rdfs: fmt.Sprintf(`_:c <name> "ns eon" <0x123456> .`),
schema: `name: string @index(term) .`,
creds: &testutil.LoginParams{UserID: "groot", Passwd: "password",
Namespace: x.GalaxyNamespace},
forceNs: -1,
})
require.Error(t, err)
require.Contains(t, err.Error(), "Cannot load nquad")

// Load data by non-galaxy user.
require.NoError(t, liveLoadData(t, &liveOpts{
rdfs: fmt.Sprintf(`
7 changes: 4 additions & 3 deletions testutil/bulk.go
Original file line number Diff line number Diff line change
@@ -18,13 +18,14 @@ package testutil

import (
"context"
"errors"
"fmt"
"math/rand"
"net"
"os"
"os/exec"
"strconv"

"github.com/pkg/errors"
)

type LiveOpts struct {
@@ -65,11 +66,11 @@ func LiveLoad(opts LiveOpts) error {
liveCmd.Env = append(os.Environ(), opts.Env...)
}

out, err := liveCmd.Output()
out, err := liveCmd.CombinedOutput()
if err != nil {
fmt.Printf("Error %v\n", err)
fmt.Printf("Output %v\n", string(out))
return err
return errors.Wrapf(err, string(out))
}
if CheckIfRace(out) {
return errors.New("race condition detected. check logs for more details")