Skip to content

Commit 69dac5f

Browse files
NamanJain8mangalaman93
authored andcommitted
fix(backup): make drop data namespace aware
cherry-picks: * 260a789: This commit makes DropData namespace-aware in a multi-tenant system. Now, guardians of a namespace can call drop data on their namespace. Earlier only the guardian of the galaxy was allowed to do cluster-wide drop operation. * 7981a4d: use galaxynamespace when restore from old backup
1 parent 18154c7 commit 69dac5f

File tree

10 files changed

+162
-69
lines changed

10 files changed

+162
-69
lines changed

edgraph/access.go

+1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
//go:build oss
12
// +build oss
23

34
/*

edgraph/access_ee.go

+29-24
Original file line numberDiff line numberDiff line change
@@ -437,34 +437,39 @@ func InitializeAcl(closer *z.Closer) {
437437
return
438438
}
439439

440-
upsertGuardianAndGroot := func(ns uint64) {
441-
for closer.Ctx().Err() == nil {
442-
ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute)
443-
defer cancel()
444-
ctx = x.AttachNamespace(ctx, ns)
445-
if err := upsertGuardian(ctx); err != nil {
446-
glog.Infof("Unable to upsert the guardian group. Error: %v", err)
447-
time.Sleep(100 * time.Millisecond)
448-
continue
449-
}
450-
break
451-
}
440+
for ns := range schema.State().Namespaces() {
441+
upsertGuardianAndGroot(closer, ns)
442+
}
443+
}
452444

453-
for closer.Ctx().Err() == nil {
454-
ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute)
455-
defer cancel()
456-
ctx = x.AttachNamespace(ctx, ns)
457-
if err := upsertGroot(ctx, "password"); err != nil {
458-
glog.Infof("Unable to upsert the groot account. Error: %v", err)
459-
time.Sleep(100 * time.Millisecond)
460-
continue
461-
}
462-
break
445+
// Note: The handling of closer should be done by caller.
446+
func upsertGuardianAndGroot(closer *z.Closer, ns uint64) {
447+
if len(worker.Config.HmacSecret) == 0 {
448+
// The acl feature is not turned on.
449+
return
450+
}
451+
for closer.Ctx().Err() == nil {
452+
ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute)
453+
defer cancel()
454+
ctx = x.AttachNamespace(ctx, ns)
455+
if err := upsertGuardian(ctx); err != nil {
456+
glog.Infof("Unable to upsert the guardian group. Error: %v", err)
457+
time.Sleep(100 * time.Millisecond)
458+
continue
463459
}
460+
break
464461
}
465462

466-
for ns := range schema.State().Namespaces() {
467-
upsertGuardianAndGroot(ns)
463+
for closer.Ctx().Err() == nil {
464+
ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute)
465+
defer cancel()
466+
ctx = x.AttachNamespace(ctx, ns)
467+
if err := upsertGroot(ctx, "password"); err != nil {
468+
glog.Infof("Unable to upsert the groot account. Error: %v", err)
469+
time.Sleep(100 * time.Millisecond)
470+
continue
471+
}
472+
break
468473
}
469474
}
470475

edgraph/server.go

+2-9
Original file line numberDiff line numberDiff line change
@@ -429,14 +429,6 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er
429429
}
430430

431431
if op.DropOp == api.Operation_DATA {
432-
if x.Config.BlockClusterWideDrop {
433-
glog.V(2).Info("Blocked drop-data because it is not permitted.")
434-
return empty, errors.New("Drop data operation is not permitted.")
435-
}
436-
if err := AuthGuardianOfTheGalaxy(ctx); err != nil {
437-
return empty, errors.Wrapf(err, "Drop data can only be called by the guardian of the"+
438-
" galaxy")
439-
}
440432
if len(op.DropValue) > 0 {
441433
return empty, errors.Errorf("If DropOp is set to DATA, DropValue must be empty")
442434
}
@@ -448,13 +440,14 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er
448440
}
449441

450442
m.DropOp = pb.Mutations_DATA
443+
m.DropValue = fmt.Sprintf("%#x", namespace)
451444
_, err = query.ApplyMutations(ctx, m)
452445
if err != nil {
453446
return empty, err
454447
}
455448

456449
// insert a helper record for backup & restore, indicating that drop_data was done
457-
err = InsertDropRecord(ctx, "DROP_DATA;")
450+
err = InsertDropRecord(ctx, fmt.Sprintf("DROP_DATA;%#x", namespace))
458451
if err != nil {
459452
return empty, err
460453
}

posting/index.go

+7-3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package posting
1919
import (
2020
"bytes"
2121
"context"
22+
"encoding/binary"
2223
"encoding/hex"
2324
"fmt"
2425
"io/ioutil"
@@ -1235,9 +1236,12 @@ func DeleteAll() error {
12351236
return pstore.DropAll()
12361237
}
12371238

1238-
// DeleteData deletes all data but leaves types and schema intact.
1239-
func DeleteData() error {
1240-
return pstore.DropPrefix([]byte{x.DefaultPrefix})
1239+
// DeleteData deletes all data for the namespace but leaves types and schema intact.
1240+
func DeleteData(ns uint64) error {
1241+
prefix := make([]byte, 9)
1242+
prefix[0] = x.DefaultPrefix
1243+
binary.BigEndian.PutUint64(prefix[1:], ns)
1244+
return pstore.DropPrefix(prefix)
12411245
}
12421246

12431247
// DeletePredicate deletes all entries and indices for a given predicate.

posting/oracle.go

+18
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package posting
1818

1919
import (
2020
"context"
21+
"encoding/hex"
2122
"math"
2223
"sync"
2324
"sync/atomic"
@@ -248,6 +249,23 @@ func (o *oracle) ResetTxns() {
248249
o.pendingTxns = make(map[uint64]*Txn)
249250
}
250251

252+
// ResetTxnForNs deletes all the pending transactions for a given namespace.
253+
func (o *oracle) ResetTxnsForNs(ns uint64) {
254+
txns := o.IterateTxns(func(key []byte) bool {
255+
pk, err := x.Parse(key)
256+
if err != nil {
257+
glog.Errorf("error %v while parsing key %v", err, hex.EncodeToString(key))
258+
return false
259+
}
260+
return x.ParseNamespace(pk.Attr) == ns
261+
})
262+
o.Lock()
263+
defer o.Unlock()
264+
for _, txn := range txns {
265+
delete(o.pendingTxns, txn)
266+
}
267+
}
268+
251269
func (o *oracle) GetTxn(startTs uint64) *Txn {
252270
o.RLock()
253271
defer o.RUnlock()

systest/backup/multi-tenancy/backup_test.go

+24-6
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,17 @@ func TestBackupMultiTenancy(t *testing.T) {
5050
dg := testutil.DgClientWithLogin(t, "groot", "password", x.GalaxyNamespace)
5151
testutil.DropAll(t, dg)
5252

53-
galaxyCreds := &testutil.LoginParams{UserID: "groot", Passwd: "password", Namespace: x.GalaxyNamespace}
53+
galaxyCreds := &testutil.LoginParams{
54+
UserID: "groot", Passwd: "password", Namespace: x.GalaxyNamespace}
5455
galaxyToken := testutil.Login(t, galaxyCreds)
5556

5657
// Create a new namespace
57-
ns, err := testutil.CreateNamespaceWithRetry(t, galaxyToken)
58+
ns1, err := testutil.CreateNamespaceWithRetry(t, galaxyToken)
5859
require.NoError(t, err)
59-
dg1 := testutil.DgClientWithLogin(t, "groot", "password", ns)
60+
ns2, err := testutil.CreateNamespaceWithRetry(t, galaxyToken)
61+
require.NoError(t, err)
62+
dg1 := testutil.DgClientWithLogin(t, "groot", "password", ns1)
63+
dg2 := testutil.DgClientWithLogin(t, "groot", "password", ns2)
6064

6165
addSchema := func(dg *dgo.Dgraph) {
6266
// Add schema and types.
@@ -69,6 +73,7 @@ func TestBackupMultiTenancy(t *testing.T) {
6973

7074
addSchema(dg)
7175
addSchema(dg1)
76+
addSchema(dg2)
7277

7378
addData := func(dg *dgo.Dgraph, name string) *api.Response {
7479
var buf bytes.Buffer
@@ -96,7 +101,8 @@ func TestBackupMultiTenancy(t *testing.T) {
96101

97102
original := make(map[uint64]*api.Response)
98103
original[x.GalaxyNamespace] = addData(dg, "galaxy")
99-
original[ns] = addData(dg1, "ns")
104+
original[ns1] = addData(dg1, "ns1")
105+
original[ns2] = addData(dg2, "ns2")
100106

101107
// Setup test directories.
102108
common.DirSetup(t)
@@ -111,11 +117,23 @@ func TestBackupMultiTenancy(t *testing.T) {
111117
expectedResponse := `{ "q": [{ "count": 5 }]}`
112118
testutil.VerifyQueryResponse(t, dg, query, expectedResponse)
113119
testutil.VerifyQueryResponse(t, dg1, query, expectedResponse)
120+
testutil.VerifyQueryResponse(t, dg2, query, expectedResponse)
121+
122+
// Call drop data from namespace ns2.
123+
require.NoError(t, dg2.Alter(ctx, &api.Operation{DropOp: api.Operation_DATA}))
124+
// Send backup request.
125+
_ = runBackup(t, galaxyToken, 6, 2)
126+
testutil.DropAll(t, dg)
127+
sendRestoreRequest(t, alphaBackupDir, galaxyToken.AccessJwt)
128+
testutil.WaitForRestore(t, dg)
129+
testutil.VerifyQueryResponse(t, dg, query, expectedResponse)
130+
testutil.VerifyQueryResponse(t, dg1, query, expectedResponse)
131+
testutil.VerifyQueryResponse(t, dg2, query, `{ "q": [{ "count": 0 }]}`)
114132

115133
// After deleting a namespace in incremental backup, we should not be able to get the data from
116134
// banned namespace.
117-
require.NoError(t, testutil.DeleteNamespace(t, galaxyToken, ns))
118-
_ = runBackup(t, galaxyToken, 6, 2)
135+
require.NoError(t, testutil.DeleteNamespace(t, galaxyToken, ns1))
136+
_ = runBackup(t, galaxyToken, 9, 3)
119137
testutil.DropAll(t, dg)
120138
sendRestoreRequest(t, alphaBackupDir, galaxyToken.AccessJwt)
121139
testutil.WaitForRestore(t, dg)

worker/backup_ee.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -669,8 +669,9 @@ func checkAndGetDropOp(key []byte, l *posting.List, readTs uint64) (*pb.DropOper
669669
}
670670
// A dgraph.drop.op record can have values in only one of the following formats:
671671
// * DROP_ALL;
672-
// * DROP_DATA;
672+
// * DROP_DATA;ns
673673
// * DROP_ATTR;attrName
674+
// * DROP_NS;ns
674675
// So, accordingly construct the *pb.DropOperation.
675676
dropOp := &pb.DropOperation{}
676677
dropInfo := strings.Split(string(val), ";")
@@ -682,6 +683,7 @@ func checkAndGetDropOp(key []byte, l *posting.List, readTs uint64) (*pb.DropOper
682683
dropOp.DropOp = pb.DropOperation_ALL
683684
case "DROP_DATA":
684685
dropOp.DropOp = pb.DropOperation_DATA
686+
dropOp.DropValue = dropInfo[1] // contains namespace.
685687
case "DROP_ATTR":
686688
dropOp.DropOp = pb.DropOperation_ATTR
687689
dropOp.DropValue = dropInfo[1]

worker/cdc_ee.go

+39-10
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"encoding/binary"
1919
"encoding/json"
2020
"math"
21+
"strconv"
2122
"strings"
2223
"sync"
2324
"sync/atomic"
@@ -113,6 +114,19 @@ func (cdc *CDC) resetPendingEvents() {
113114
cdc.pendingTxnEvents = make(map[uint64][]CDCEvent)
114115
}
115116

117+
func (cdc *CDC) resetPendingEventsForNs(ns uint64) {
118+
if cdc == nil {
119+
return
120+
}
121+
cdc.Lock()
122+
defer cdc.Unlock()
123+
for ts, events := range cdc.pendingTxnEvents {
124+
if len(events) > 0 && binary.BigEndian.Uint64(events[0].Meta.Namespace) == ns {
125+
delete(cdc.pendingTxnEvents, ts)
126+
}
127+
}
128+
}
129+
116130
func (cdc *CDC) hasPending(attr string) bool {
117131
if cdc == nil {
118132
return false
@@ -243,9 +257,15 @@ func (cdc *CDC) processCDCEvents() {
243257
switch {
244258
case proposal.Mutations.DropOp != pb.Mutations_NONE: // this means its a drop operation
245259
// if there is DROP ALL or DROP DATA operation, clear pending events also.
246-
if proposal.Mutations.DropOp == pb.Mutations_ALL ||
247-
proposal.Mutations.DropOp == pb.Mutations_DATA {
260+
if proposal.Mutations.DropOp == pb.Mutations_ALL {
248261
cdc.resetPendingEvents()
262+
} else if proposal.Mutations.DropOp == pb.Mutations_DATA {
263+
ns, err := strconv.ParseUint(proposal.Mutations.DropValue, 0, 64)
264+
if err != nil {
265+
glog.Warningf("CDC: parsing namespace failed with error %v. Ignoring.", err)
266+
return
267+
}
268+
cdc.resetPendingEventsForNs(ns)
249269
}
250270
if err := sendToSink(events, proposal.Mutations.StartTs); err != nil {
251271
rerr = errors.Wrapf(err, "unable to send messages to sink")
@@ -393,15 +413,24 @@ func toCDCEvent(index uint64, mutation *pb.Mutations) []CDCEvent {
393413
}
394414

395415
// If drop operation
396-
// todo (aman): right now drop all and data operations are still cluster wide.
397-
// Fix these once we have namespace specific operations.
398416
if mutation.DropOp != pb.Mutations_NONE {
399-
ns := make([]byte, 8)
400-
binary.BigEndian.PutUint64(ns, x.GalaxyNamespace)
417+
namespace := make([]byte, 8)
401418
var t string
402-
if mutation.DropOp == pb.Mutations_TYPE {
403-
// drop type are namespace specific.
404-
ns, t = x.ParseNamespaceBytes(mutation.DropValue)
419+
switch mutation.DropOp {
420+
case pb.Mutations_ALL:
421+
// Drop all is cluster wide.
422+
binary.BigEndian.PutUint64(namespace, x.GalaxyNamespace)
423+
case pb.Mutations_DATA:
424+
ns, err := strconv.ParseUint(mutation.DropValue, 0, 64)
425+
if err != nil {
426+
glog.Warningf("CDC: parsing namespace failed with error %v. Ignoring.", err)
427+
return nil
428+
}
429+
binary.BigEndian.PutUint64(namespace, ns)
430+
case pb.Mutations_TYPE:
431+
namespace, t = x.ParseNamespaceBytes(mutation.DropValue)
432+
default:
433+
glog.Error("CDC: got unhandled drop operation")
405434
}
406435

407436
return []CDCEvent{
@@ -413,7 +442,7 @@ func toCDCEvent(index uint64, mutation *pb.Mutations) []CDCEvent {
413442
},
414443
Meta: &EventMeta{
415444
RaftIndex: index,
416-
Namespace: ns,
445+
Namespace: namespace,
417446
},
418447
},
419448
}

worker/draft.go

+10-4
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"fmt"
2525
"math"
2626
"sort"
27+
"strconv"
2728
"sync"
2829
"sync/atomic"
2930
"time"
@@ -351,14 +352,19 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr
351352
span := otrace.FromContext(ctx)
352353

353354
if proposal.Mutations.DropOp == pb.Mutations_DATA {
355+
ns, err := strconv.ParseUint(proposal.Mutations.DropValue, 0, 64)
356+
if err != nil {
357+
return err
358+
}
354359
// Ensures nothing get written to disk due to commit proposals.
355-
posting.Oracle().ResetTxns()
356-
if err := posting.DeleteData(); err != nil {
360+
posting.Oracle().ResetTxnsForNs(ns)
361+
if err := posting.DeleteData(ns); err != nil {
357362
return err
358363
}
359364

360-
// Clear entire cache.
361-
posting.ResetCache()
365+
// TODO: Revisit this when we work on posting cache. Clear entire cache.
366+
// We don't want to drop entire cache, just due to one namespace.
367+
// posting.ResetCache()
362368
return nil
363369
}
364370

0 commit comments

Comments
 (0)