Skip to content

Commit c4f4964

Browse files
authored
fix(bulk): upsert guardian/groot for all existing namespaces (#7759)
Issue: If data was loaded into a namespace using the bulk loader with --force-namespace, that data was not actually accessible. This is because the guardian group and the groot user were not created for that namespace unless they were already present in the RDF. Solution: This PR fixes that by upserting the guardian group and the groot user (with the default password) for every existing namespace when the alpha starts (to be precise, whenever ACL is reset).
1 parent 018517b commit c4f4964

File tree

12 files changed

+277
-114
lines changed

12 files changed

+277
-114
lines changed

dgraph/cmd/bulk/run.go

+2
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,7 @@ func run() {
328328

329329
loader.prog.mapEdgeCount = bulkMeta.EdgeCount
330330
loader.schema.schemaMap = bulkMeta.SchemaMap
331+
loader.schema.types = bulkMeta.Types
331332
} else {
332333
loader.mapStage()
333334
mergeMapShardsIntoReduceShards(&opt)
@@ -336,6 +337,7 @@ func run() {
336337
bulkMeta := pb.BulkMeta{
337338
EdgeCount: loader.prog.mapEdgeCount,
338339
SchemaMap: loader.schema.schemaMap,
340+
Types: loader.schema.types,
339341
}
340342
bulkMetaData, err := bulkMeta.Marshal()
341343
if err != nil {

dgraph/cmd/bulk/schema.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ func newSchemaStore(initial *schema.ParsedSchema, opt *options, state *state) *s
5151
// whenever we see data for a new namespace.
5252
s.checkAndSetInitialSchema(x.GalaxyNamespace)
5353

54+
s.types = initial.Types
5455
// This is from the schema read from the schema file.
5556
for _, sch := range initial.Preds {
5657
p := sch.Predicate
@@ -63,8 +64,6 @@ func newSchemaStore(initial *schema.ParsedSchema, opt *options, state *state) *s
6364
s.schemaMap[p] = sch
6465
}
6566

66-
s.types = initial.Types
67-
6867
return s
6968
}
7069

@@ -102,6 +101,7 @@ func (s *schemaStore) checkAndSetInitialSchema(namespace uint64) {
102101
for _, update := range schema.CompleteInitialSchema(namespace) {
103102
s.schemaMap[update.Predicate] = update
104103
}
104+
s.types = append(s.types, schema.CompleteInitialTypes(namespace)...)
105105

106106
if s.opt.StoreXids {
107107
s.schemaMap[x.NamespaceAttr(namespace, "xid")] = &pb.SchemaUpdate{

edgraph/access_ee.go

+26-19
Original file line numberDiff line numberDiff line change
@@ -413,28 +413,35 @@ func ResetAcl(closer *z.Closer) {
413413
// The acl feature is not turned on.
414414
return
415415
}
416-
for closer.Ctx().Err() == nil {
417-
ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute)
418-
defer cancel()
419-
ctx = x.AttachNamespace(ctx, x.GalaxyNamespace)
420-
if err := upsertGuardian(ctx); err != nil {
421-
glog.Infof("Unable to upsert the guardian group. Error: %v", err)
422-
time.Sleep(100 * time.Millisecond)
423-
continue
416+
417+
upsertGuardianAndGroot := func(ns uint64) {
418+
for closer.Ctx().Err() == nil {
419+
ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute)
420+
defer cancel()
421+
ctx = x.AttachNamespace(ctx, ns)
422+
if err := upsertGuardian(ctx); err != nil {
423+
glog.Infof("Unable to upsert the guardian group. Error: %v", err)
424+
time.Sleep(100 * time.Millisecond)
425+
continue
426+
}
427+
break
424428
}
425-
break
426-
}
427429

428-
for closer.Ctx().Err() == nil {
429-
ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute)
430-
defer cancel()
431-
ctx = x.AttachNamespace(ctx, x.GalaxyNamespace)
432-
if err := upsertGroot(ctx, "password"); err != nil {
433-
glog.Infof("Unable to upsert the groot account. Error: %v", err)
434-
time.Sleep(100 * time.Millisecond)
435-
continue
430+
for closer.Ctx().Err() == nil {
431+
ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute)
432+
defer cancel()
433+
ctx = x.AttachNamespace(ctx, ns)
434+
if err := upsertGroot(ctx, "password"); err != nil {
435+
glog.Infof("Unable to upsert the groot account. Error: %v", err)
436+
time.Sleep(100 * time.Millisecond)
437+
continue
438+
}
439+
break
436440
}
437-
break
441+
}
442+
443+
for ns := range schema.State().Namespaces() {
444+
upsertGuardianAndGroot(ns)
438445
}
439446
}
440447

protos/pb.proto

+1
Original file line numberDiff line numberDiff line change
@@ -725,6 +725,7 @@ message UpdateGraphQLSchemaResponse {
725725
message BulkMeta {
726726
int64 edge_count = 1;
727727
map<string, SchemaUpdate> schema_map = 2;
728+
repeated TypeUpdate types = 3;
728729
}
729730

730731
message DeleteNsRequest {

protos/pb/pb.pb.go

+144-82
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

schema/schema.go

+16
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,22 @@ func (s *state) DeleteType(typeName string) error {
138138
return nil
139139
}
140140

141+
// Namespaces returns the active namespaces based on the current types.
142+
func (s *state) Namespaces() map[uint64]struct{} {
143+
if s == nil {
144+
return nil
145+
}
146+
147+
s.RLock()
148+
defer s.RUnlock()
149+
150+
ns := make(map[uint64]struct{})
151+
for typ := range s.types {
152+
ns[x.ParseNamespace(typ)] = struct{}{}
153+
}
154+
return ns
155+
}
156+
141157
// DeletePredsForNs deletes the predicate information for the namespace from the schema.
142158
func (s *state) DeletePredsForNs(delNs uint64) {
143159
if s == nil {

systest/bulk_live/bulk/alpha_acl.yml

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
version: "3.5"
2+
services:
3+
alpha1:
4+
image: dgraph/dgraph:latest
5+
working_dir: /data/alpha1
6+
labels:
7+
cluster: test
8+
ports:
9+
- "8080"
10+
- "9080"
11+
volumes:
12+
- type: bind
13+
source: $GOPATH/bin
14+
target: /gobin
15+
read_only: true
16+
- type: bind
17+
source: ./data/out/0/p
18+
target: /posting
19+
read_only: false
20+
- type: bind
21+
source: ../../../ee/acl/hmac-secret
22+
target: /dgraph-acl/hmac-secret
23+
read_only: true
24+
command: /gobin/dgraph alpha --my=alpha1:7080 --zero=zero1:5080 --logtostderr -v=2 -p=/posting
25+
--security "whitelist=0.0.0.0/0;" --acl "secret-file=/dgraph-acl/hmac-secret; "

systest/bulk_live/bulk/bulk_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,7 @@ import (
2525
func TestBulkCases(t *testing.T) {
2626
t.Run("bulk test cases", common.RunBulkCases)
2727
}
28+
29+
func TestBulkCasesAcl(t *testing.T) {
30+
t.Run("bulk test cases with acl", common.RunBulkCasesAcl)
31+
}

systest/bulk_live/common/bulk_live_cases.go

+25
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"encoding/json"
2323
"fmt"
2424
"io/ioutil"
25+
"math"
2526
"net/http"
2627
"os"
2728
"os/exec"
@@ -77,6 +78,29 @@ func RunBulkCases(t *testing.T) {
7778
suite.cleanup(t)
7879
}
7980

81+
func RunBulkCasesAcl(t *testing.T) {
82+
opts := suiteOpts{
83+
schema: helloWorldSchema,
84+
gqlSchema: "",
85+
rdfs: helloWorldData,
86+
bulkSuite: true,
87+
bulkOpts: bulkOpts{alpha: "../bulk/alpha_acl.yml", forceNs: 0x10},
88+
}
89+
suite := newSuiteInternal(t, opts)
90+
91+
t.Run("Pan and Jackson", testCaseWithAcl(`
92+
{q(func: anyofterms(name, "Peter")) {
93+
name
94+
}}
95+
`, `
96+
{"q": [
97+
{ "name": "Peter Pan" },
98+
{ "name": "Peter Jackson" }
99+
]}
100+
`, "groot", "password", 0x10))
101+
suite.cleanup(t)
102+
}
103+
80104
// run this in sequential order. cleanup is necessary for live loader to work
81105
func RunLiveCases(t *testing.T) {
82106
suite := helloWorldSetup(t, false)
@@ -130,6 +154,7 @@ func remoteHelloWorldSetup(t *testing.T, isBulkLoader bool) *suite {
130154
rdfs: helloWorldData,
131155
bulkSuite: isBulkLoader,
132156
remote: true,
157+
bulkOpts: bulkOpts{alpha: "../bulk/alpha.yml", forceNs: math.MaxUint64},
133158
})
134159
}
135160

systest/bulk_live/common/bulk_live_fixture.go

+27-3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package common
1919
import (
2020
"context"
2121
"io/ioutil"
22+
"math"
2223
"os"
2324
"path/filepath"
2425
"testing"
@@ -42,9 +43,15 @@ type suiteOpts struct {
4243
gqlSchema string
4344
rdfs string
4445
bulkSuite bool
46+
bulkOpts bulkOpts
4547
remote bool
4648
}
4749

50+
type bulkOpts struct {
51+
alpha string
52+
forceNs uint64
53+
}
54+
4855
func newSuiteInternal(t *testing.T, opts suiteOpts) *suite {
4956
if testing.Short() {
5057
t.Skip("Skipping system test with long runtime.")
@@ -54,7 +61,6 @@ func newSuiteInternal(t *testing.T, opts suiteOpts) *suite {
5461
t: t,
5562
opts: opts,
5663
}
57-
5864
require.NoError(s.t, makeDirEmpty(rootDir))
5965
rdfFile := filepath.Join(rootDir, "rdfs.rdf")
6066
require.NoError(s.t, ioutil.WriteFile(rdfFile, []byte(opts.rdfs), 0644))
@@ -95,6 +101,7 @@ func newBulkOnlySuite(t *testing.T, schema, rdfs, gqlSchema string) *suite {
95101
gqlSchema: gqlSchema,
96102
rdfs: rdfs,
97103
bulkSuite: true,
104+
bulkOpts: bulkOpts{alpha: "../bulk/alpha.yml", forceNs: math.MaxUint64}, // preserve ns
98105
}
99106
return newSuiteInternal(t, opts)
100107
}
@@ -125,10 +132,11 @@ func (s *suite) setup(t *testing.T, schemaFile, rdfFile, gqlSchemaFile string) {
125132
GQLSchemaFile: gqlSchemaFile,
126133
Dir: rootDir,
127134
Env: env,
135+
Namespace: s.opts.bulkOpts.forceNs,
128136
})
129137

130138
require.NoError(t, err)
131-
err = testutil.StartAlphas("../bulk/alpha.yml")
139+
err = testutil.StartAlphas(s.opts.bulkOpts.alpha)
132140
require.NoError(t, err)
133141
return
134142
}
@@ -157,7 +165,7 @@ func (s *suite) cleanup(t *testing.T) {
157165
// NOTE: Shouldn't raise any errors here or fail a test, since this is
158166
// called when we detect an error (don't want to mask the original problem).
159167
if s.opts.bulkSuite {
160-
isRace := testutil.StopAlphasAndDetectRace("../bulk/alpha.yml")
168+
isRace := testutil.StopAlphasAndDetectRace(s.opts.bulkOpts.alpha)
161169
_ = os.RemoveAll(rootDir)
162170
if isRace {
163171
t.Fatalf("Failing because race condition is detected. " +
@@ -188,3 +196,19 @@ func testCase(query, wantResult string) func(*testing.T) {
188196
testutil.CompareJSON(t, wantResult, string(resp.GetJson()))
189197
}
190198
}
199+
200+
func testCaseWithAcl(query, wantResult, user, password string, ns uint64) func(*testing.T) {
201+
return func(t *testing.T) {
202+
// Check results of the bulk loader.
203+
dg, err := testutil.DgraphClient(testutil.ContainerAddr("alpha1", 9080))
204+
require.NoError(t, err)
205+
ctx2, cancel2 := context.WithTimeout(context.Background(), time.Minute)
206+
defer cancel2()
207+
require.NoError(t, dg.LoginIntoNamespace(ctx2, user, password, ns))
208+
209+
txn := dg.NewTxn()
210+
resp, err := txn.Query(ctx2, query)
211+
require.NoError(t, err)
212+
testutil.CompareJSON(t, wantResult, string(resp.GetJson()))
213+
}
214+
}

testutil/bulk.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,10 @@ func LiveLoad(opts LiveOpts) error {
4848
"--alpha", opts.Alpha,
4949
"--zero", opts.Zero,
5050
}
51-
if opts.Creds.Namespace == x.GalaxyNamespace || opts.ForceNs != 0 {
52-
args = append(args, "--force-namespace", strconv.FormatInt(opts.ForceNs, 10))
53-
}
5451
if opts.Creds != nil {
52+
if opts.Creds.Namespace == x.GalaxyNamespace || opts.ForceNs != 0 {
53+
args = append(args, "--force-namespace", strconv.FormatInt(opts.ForceNs, 10))
54+
}
5555
args = append(args, "--creds")
5656
args = append(args, fmt.Sprintf("user=%s;password=%s;namespace=%d",
5757
opts.Creds.UserID, opts.Creds.Passwd, opts.Creds.Namespace))
@@ -85,6 +85,7 @@ type BulkOpts struct {
8585
GQLSchemaFile string
8686
Dir string
8787
Env []string
88+
Namespace uint64
8889
}
8990

9091
func BulkLoad(opts BulkOpts) error {
@@ -97,6 +98,7 @@ func BulkLoad(opts BulkOpts) error {
9798
"--map_shards="+strconv.Itoa(opts.Shards),
9899
"--store_xids=true",
99100
"--zero", opts.Zero,
101+
"--force-namespace", strconv.FormatUint(opts.Namespace, 10),
100102
)
101103

102104
if opts.Dir != "" {

x/keys.go

-5
Original file line numberDiff line numberDiff line change
@@ -84,24 +84,20 @@ func GalaxyAttr(attr string) string {
8484

8585
// ParseNamespaceAttr returns the namespace and attr from the given value.
8686
func ParseNamespaceAttr(attr string) (uint64, string) {
87-
AssertTrue(len(attr) >= 8)
8887
return binary.BigEndian.Uint64([]byte(attr[:8])), attr[8:]
8988
}
9089

9190
func ParseNamespaceBytes(attr string) ([]byte, string) {
92-
AssertTrue(len(attr) >= 8)
9391
return []byte(attr[:8]), attr[8:]
9492
}
9593

9694
// ParseAttr returns the attr from the given value.
9795
func ParseAttr(attr string) string {
98-
AssertTrue(len(attr) >= 8)
9996
return attr[8:]
10097
}
10198

10299
// ParseNamespace returns the namespace from the given value.
103100
func ParseNamespace(attr string) uint64 {
104-
AssertTrue(len(attr) >= 8)
105101
return binary.BigEndian.Uint64([]byte(attr[:8]))
106102
}
107103

@@ -114,7 +110,6 @@ func ParseAttrList(attrs []string) []string {
114110
}
115111

116112
func IsReverseAttr(attr string) bool {
117-
AssertTrue(len(attr) >= 8)
118113
return attr[8] == '~'
119114
}
120115

0 commit comments

Comments
 (0)