Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release/v21.03: fix(bulk): upsert guardian/groot for all existing namespaces (#7759) #7769

Merged
merged 1 commit into from
Apr 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dgraph/cmd/bulk/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ func run() {

loader.prog.mapEdgeCount = bulkMeta.EdgeCount
loader.schema.schemaMap = bulkMeta.SchemaMap
loader.schema.types = bulkMeta.Types
} else {
loader.mapStage()
mergeMapShardsIntoReduceShards(&opt)
Expand All @@ -331,6 +332,7 @@ func run() {
bulkMeta := pb.BulkMeta{
EdgeCount: loader.prog.mapEdgeCount,
SchemaMap: loader.schema.schemaMap,
Types: loader.schema.types,
}
bulkMetaData, err := bulkMeta.Marshal()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions dgraph/cmd/bulk/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func newSchemaStore(initial *schema.ParsedSchema, opt *options, state *state) *s
// whenever we see data for a new namespace.
s.checkAndSetInitialSchema(x.GalaxyNamespace)

s.types = initial.Types
// This is from the schema read from the schema file.
for _, sch := range initial.Preds {
p := sch.Predicate
Expand All @@ -63,8 +64,6 @@ func newSchemaStore(initial *schema.ParsedSchema, opt *options, state *state) *s
s.schemaMap[p] = sch
}

s.types = initial.Types

return s
}

Expand Down Expand Up @@ -102,6 +101,7 @@ func (s *schemaStore) checkAndSetInitialSchema(namespace uint64) {
for _, update := range schema.CompleteInitialSchema(namespace) {
s.schemaMap[update.Predicate] = update
}
s.types = append(s.types, schema.CompleteInitialTypes(namespace)...)

if s.opt.StoreXids {
s.schemaMap[x.NamespaceAttr(namespace, "xid")] = &pb.SchemaUpdate{
Expand Down
45 changes: 26 additions & 19 deletions edgraph/access_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,28 +413,35 @@ func ResetAcl(closer *z.Closer) {
// The acl feature is not turned on.
return
}
for closer.Ctx().Err() == nil {
ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute)
defer cancel()
ctx = x.AttachNamespace(ctx, x.GalaxyNamespace)
if err := upsertGuardian(ctx); err != nil {
glog.Infof("Unable to upsert the guardian group. Error: %v", err)
time.Sleep(100 * time.Millisecond)
continue

upsertGuardianAndGroot := func(ns uint64) {
for closer.Ctx().Err() == nil {
ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute)
defer cancel()
ctx = x.AttachNamespace(ctx, ns)
if err := upsertGuardian(ctx); err != nil {
glog.Infof("Unable to upsert the guardian group. Error: %v", err)
time.Sleep(100 * time.Millisecond)
continue
}
break
}
break
}

for closer.Ctx().Err() == nil {
ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute)
defer cancel()
ctx = x.AttachNamespace(ctx, x.GalaxyNamespace)
if err := upsertGroot(ctx, "password"); err != nil {
glog.Infof("Unable to upsert the groot account. Error: %v", err)
time.Sleep(100 * time.Millisecond)
continue
for closer.Ctx().Err() == nil {
ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute)
defer cancel()
ctx = x.AttachNamespace(ctx, ns)
if err := upsertGroot(ctx, "password"); err != nil {
glog.Infof("Unable to upsert the groot account. Error: %v", err)
time.Sleep(100 * time.Millisecond)
continue
}
break
}
break
}

for ns := range schema.State().Namespaces() {
upsertGuardianAndGroot(ns)
}
}

Expand Down
1 change: 1 addition & 0 deletions protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,7 @@ message UpdateGraphQLSchemaResponse {
message BulkMeta {
int64 edge_count = 1;
map<string, SchemaUpdate> schema_map = 2;
repeated TypeUpdate types = 3;
}

message DeleteNsRequest {
Expand Down
715 changes: 389 additions & 326 deletions protos/pb/pb.pb.go

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,22 @@ func (s *state) DeleteType(typeName string) error {
return nil
}

// Namespaces returns the active namespaces based on the current types.
func (s *state) Namespaces() map[uint64]struct{} {
if s == nil {
return nil
}

s.RLock()
defer s.RUnlock()

ns := make(map[uint64]struct{})
for typ := range s.types {
ns[x.ParseNamespace(typ)] = struct{}{}
}
return ns
}

// DeletePredsForNs deletes the predicate information for the namespace from the schema.
func (s *state) DeletePredsForNs(delNs uint64) {
if s == nil {
Expand Down
25 changes: 25 additions & 0 deletions systest/bulk_live/bulk/alpha_acl.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
version: "3.5"
services:
alpha1:
image: dgraph/dgraph:latest
working_dir: /data/alpha1
labels:
cluster: test
ports:
- "8080"
- "9080"
volumes:
- type: bind
source: $GOPATH/bin
target: /gobin
read_only: true
- type: bind
source: ./data/out/0/p
target: /posting
read_only: false
- type: bind
source: ../../../ee/acl/hmac-secret
target: /dgraph-acl/hmac-secret
read_only: true
command: /gobin/dgraph alpha --my=alpha1:7080 --zero=zero1:5080 --logtostderr -v=2 -p=/posting
--security "whitelist=0.0.0.0/0;" --acl "secret-file=/dgraph-acl/hmac-secret; "
4 changes: 4 additions & 0 deletions systest/bulk_live/bulk/bulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,7 @@ import (
func TestBulkCases(t *testing.T) {
t.Run("bulk test cases", common.RunBulkCases)
}

func TestBulkCasesAcl(t *testing.T) {
t.Run("bulk test cases with acl", common.RunBulkCasesAcl)
}
25 changes: 25 additions & 0 deletions systest/bulk_live/common/bulk_live_cases.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"math"
"net/http"
"os"
"os/exec"
Expand Down Expand Up @@ -77,6 +78,29 @@ func RunBulkCases(t *testing.T) {
suite.cleanup(t)
}

func RunBulkCasesAcl(t *testing.T) {
opts := suiteOpts{
schema: helloWorldSchema,
gqlSchema: "",
rdfs: helloWorldData,
bulkSuite: true,
bulkOpts: bulkOpts{alpha: "../bulk/alpha_acl.yml", forceNs: 0x10},
}
suite := newSuiteInternal(t, opts)

t.Run("Pan and Jackson", testCaseWithAcl(`
{q(func: anyofterms(name, "Peter")) {
name
}}
`, `
{"q": [
{ "name": "Peter Pan" },
{ "name": "Peter Jackson" }
]}
`, "groot", "password", 0x10))
suite.cleanup(t)
}

// run this in sequential order. cleanup is necessary for live loader to work
func RunLiveCases(t *testing.T) {
suite := helloWorldSetup(t, false)
Expand Down Expand Up @@ -130,6 +154,7 @@ func remoteHelloWorldSetup(t *testing.T, isBulkLoader bool) *suite {
rdfs: helloWorldData,
bulkSuite: isBulkLoader,
remote: true,
bulkOpts: bulkOpts{alpha: "../bulk/alpha.yml", forceNs: math.MaxUint64},
})
}

Expand Down
30 changes: 27 additions & 3 deletions systest/bulk_live/common/bulk_live_fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package common
import (
"context"
"io/ioutil"
"math"
"os"
"path/filepath"
"testing"
Expand All @@ -42,9 +43,15 @@ type suiteOpts struct {
gqlSchema string
rdfs string
bulkSuite bool
bulkOpts bulkOpts
remote bool
}

type bulkOpts struct {
alpha string
forceNs uint64
}

func newSuiteInternal(t *testing.T, opts suiteOpts) *suite {
if testing.Short() {
t.Skip("Skipping system test with long runtime.")
Expand All @@ -54,7 +61,6 @@ func newSuiteInternal(t *testing.T, opts suiteOpts) *suite {
t: t,
opts: opts,
}

require.NoError(s.t, makeDirEmpty(rootDir))
rdfFile := filepath.Join(rootDir, "rdfs.rdf")
require.NoError(s.t, ioutil.WriteFile(rdfFile, []byte(opts.rdfs), 0644))
Expand Down Expand Up @@ -95,6 +101,7 @@ func newBulkOnlySuite(t *testing.T, schema, rdfs, gqlSchema string) *suite {
gqlSchema: gqlSchema,
rdfs: rdfs,
bulkSuite: true,
bulkOpts: bulkOpts{alpha: "../bulk/alpha.yml", forceNs: math.MaxUint64}, // preserve ns
}
return newSuiteInternal(t, opts)
}
Expand Down Expand Up @@ -125,10 +132,11 @@ func (s *suite) setup(t *testing.T, schemaFile, rdfFile, gqlSchemaFile string) {
GQLSchemaFile: gqlSchemaFile,
Dir: rootDir,
Env: env,
Namespace: s.opts.bulkOpts.forceNs,
})

require.NoError(t, err)
err = testutil.StartAlphas("../bulk/alpha.yml")
err = testutil.StartAlphas(s.opts.bulkOpts.alpha)
require.NoError(t, err)
return
}
Expand Down Expand Up @@ -157,7 +165,7 @@ func (s *suite) cleanup(t *testing.T) {
// NOTE: Shouldn't raise any errors here or fail a test, since this is
// called when we detect an error (don't want to mask the original problem).
if s.opts.bulkSuite {
isRace := testutil.StopAlphasAndDetectRace("../bulk/alpha.yml")
isRace := testutil.StopAlphasAndDetectRace(s.opts.bulkOpts.alpha)
_ = os.RemoveAll(rootDir)
if isRace {
t.Fatalf("Failing because race condition is detected. " +
Expand Down Expand Up @@ -188,3 +196,19 @@ func testCase(query, wantResult string) func(*testing.T) {
testutil.CompareJSON(t, wantResult, string(resp.GetJson()))
}
}

func testCaseWithAcl(query, wantResult, user, password string, ns uint64) func(*testing.T) {
return func(t *testing.T) {
// Check results of the bulk loader.
dg, err := testutil.DgraphClient(testutil.ContainerAddr("alpha1", 9080))
require.NoError(t, err)
ctx2, cancel2 := context.WithTimeout(context.Background(), time.Minute)
defer cancel2()
require.NoError(t, dg.LoginIntoNamespace(ctx2, user, password, ns))

txn := dg.NewTxn()
resp, err := txn.Query(ctx2, query)
require.NoError(t, err)
testutil.CompareJSON(t, wantResult, string(resp.GetJson()))
}
}
9 changes: 6 additions & 3 deletions testutil/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"os/exec"
"strconv"

"github.com/dgraph-io/dgraph/x"
"github.com/pkg/errors"
)

Expand All @@ -48,13 +49,13 @@ func LiveLoad(opts LiveOpts) error {
"--alpha", opts.Alpha,
"--zero", opts.Zero,
}
if opts.ForceNs != 0 {
args = append(args, "--force-namespace", strconv.FormatInt(opts.ForceNs, 10))
}
if opts.Ludicrous {
args = append(args, "--ludicrous")
}
if opts.Creds != nil {
if opts.Creds.Namespace == x.GalaxyNamespace || opts.ForceNs != 0 {
args = append(args, "--force-namespace", strconv.FormatInt(opts.ForceNs, 10))
}
args = append(args, "--creds")
args = append(args, fmt.Sprintf("user=%s;password=%s;namespace=%d",
opts.Creds.UserID, opts.Creds.Passwd, opts.Creds.Namespace))
Expand Down Expand Up @@ -88,6 +89,7 @@ type BulkOpts struct {
GQLSchemaFile string
Dir string
Env []string
Namespace uint64
}

func BulkLoad(opts BulkOpts) error {
Expand All @@ -100,6 +102,7 @@ func BulkLoad(opts BulkOpts) error {
"--map_shards="+strconv.Itoa(opts.Shards),
"--store_xids=true",
"--zero", opts.Zero,
"--force-namespace", strconv.FormatUint(opts.Namespace, 10),
)

if opts.Dir != "" {
Expand Down
5 changes: 0 additions & 5 deletions x/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,24 +84,20 @@ func GalaxyAttr(attr string) string {

// ParseNamespaceAttr returns the namespace and attr from the given value.
func ParseNamespaceAttr(attr string) (uint64, string) {
AssertTrue(len(attr) >= 8)
return binary.BigEndian.Uint64([]byte(attr[:8])), attr[8:]
}

func ParseNamespaceBytes(attr string) ([]byte, string) {
AssertTrue(len(attr) >= 8)
return []byte(attr[:8]), attr[8:]
}

// ParseAttr returns the attr from the given value.
func ParseAttr(attr string) string {
AssertTrue(len(attr) >= 8)
return attr[8:]
}

// ParseNamespace returns the namespace from the given value.
func ParseNamespace(attr string) uint64 {
AssertTrue(len(attr) >= 8)
return binary.BigEndian.Uint64([]byte(attr[:8]))
}

Expand All @@ -114,7 +110,6 @@ func ParseAttrList(attrs []string) []string {
}

func IsReverseAttr(attr string) bool {
AssertTrue(len(attr) >= 8)
return attr[8] == '~'
}

Expand Down