diff --git a/edgraph/access.go b/edgraph/access.go index 88bb5b985b8..6a57d2527d3 100644 --- a/edgraph/access.go +++ b/edgraph/access.go @@ -46,6 +46,10 @@ func ResetAcl(closer *z.Closer) { // do nothing } +func upsertGuardianAndGroot(closer *z.Closer, ns uint64) { + // do nothing +} + // ResetAcls is an empty method since ACL is only supported in the enterprise version. func RefreshAcls(closer *z.Closer) { // do nothing diff --git a/edgraph/access_ee.go b/edgraph/access_ee.go index 58654a64c29..5c735ed7827 100644 --- a/edgraph/access_ee.go +++ b/edgraph/access_ee.go @@ -418,34 +418,39 @@ func ResetAcl(closer *z.Closer) { return } - upsertGuardianAndGroot := func(ns uint64) { - for closer.Ctx().Err() == nil { - ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute) - defer cancel() - ctx = x.AttachNamespace(ctx, ns) - if err := upsertGuardian(ctx); err != nil { - glog.Infof("Unable to upsert the guardian group. Error: %v", err) - time.Sleep(100 * time.Millisecond) - continue - } - break - } + for ns := range schema.State().Namespaces() { + upsertGuardianAndGroot(closer, ns) + } +} - for closer.Ctx().Err() == nil { - ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute) - defer cancel() - ctx = x.AttachNamespace(ctx, ns) - if err := upsertGroot(ctx, "password"); err != nil { - glog.Infof("Unable to upsert the groot account. Error: %v", err) - time.Sleep(100 * time.Millisecond) - continue - } - break +// Note: The handling of closer should be done by caller. +func upsertGuardianAndGroot(closer *z.Closer, ns uint64) { + if len(worker.Config.HmacSecret) == 0 { + // The acl feature is not turned on. + return + } + for closer.Ctx().Err() == nil { + ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute) + defer cancel() + ctx = x.AttachNamespace(ctx, ns) + if err := upsertGuardian(ctx); err != nil { + glog.Infof("Unable to upsert the guardian group. 
Error: %v", err) + time.Sleep(100 * time.Millisecond) + continue } + break } - for ns := range schema.State().Namespaces() { - upsertGuardianAndGroot(ns) + for closer.Ctx().Err() == nil { + ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute) + defer cancel() + ctx = x.AttachNamespace(ctx, ns) + if err := upsertGroot(ctx, "password"); err != nil { + glog.Infof("Unable to upsert the groot account. Error: %v", err) + time.Sleep(100 * time.Millisecond) + continue + } + break } } diff --git a/edgraph/server.go b/edgraph/server.go index 97674cb47a5..e31e7efc60b 100644 --- a/edgraph/server.go +++ b/edgraph/server.go @@ -426,14 +426,6 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er } if op.DropOp == api.Operation_DATA { - if x.Config.BlockClusterWideDrop { - glog.V(2).Info("Blocked drop-data because it is not permitted.") - return empty, errors.New("Drop data operation is not permitted.") - } - if err := AuthGuardianOfTheGalaxy(ctx); err != nil { - return empty, errors.Wrapf(err, "Drop data can only be called by the guardian of the"+ - " galaxy") - } if len(op.DropValue) > 0 { return empty, errors.Errorf("If DropOp is set to DATA, DropValue must be empty") } @@ -445,13 +437,14 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er } m.DropOp = pb.Mutations_DATA + m.DropValue = fmt.Sprintf("%#x", namespace) _, err = query.ApplyMutations(ctx, m) if err != nil { return empty, err } // insert a helper record for backup & restore, indicating that drop_data was done - err = InsertDropRecord(ctx, "DROP_DATA;") + err = InsertDropRecord(ctx, fmt.Sprintf("DROP_DATA;%#x", namespace)) if err != nil { return empty, err } @@ -459,7 +452,7 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er // just reinsert the GraphQL schema, no need to alter dgraph schema as this was drop_data _, err = UpdateGQLSchema(ctx, graphQLSchema, "") // recreate the admin account after a drop data 
operation - ResetAcl(nil) + upsertGuardianAndGroot(nil, namespace) return empty, err } diff --git a/posting/index.go b/posting/index.go index d714524b568..f3a9ae17369 100644 --- a/posting/index.go +++ b/posting/index.go @@ -19,6 +19,7 @@ package posting import ( "bytes" "context" + "encoding/binary" "encoding/hex" "fmt" "io/ioutil" @@ -1222,9 +1223,12 @@ func DeleteAll() error { return pstore.DropAll() } -// DeleteData deletes all data but leaves types and schema intact. -func DeleteData() error { - return pstore.DropPrefix([]byte{x.DefaultPrefix}) +// DeleteData deletes all data for the namespace but leaves types and schema intact. +func DeleteData(ns uint64) error { + prefix := make([]byte, 9) + prefix[0] = x.DefaultPrefix + binary.BigEndian.PutUint64(prefix[1:], ns) + return pstore.DropPrefix(prefix) } // DeletePredicate deletes all entries and indices for a given predicate. The delete may be logical diff --git a/posting/oracle.go b/posting/oracle.go index a55f0813af4..9623888209c 100644 --- a/posting/oracle.go +++ b/posting/oracle.go @@ -18,6 +18,7 @@ package posting import ( "context" + "encoding/hex" "math" "sync" "sync/atomic" @@ -316,6 +317,23 @@ func (o *oracle) ResetTxns() { o.pendingTxns = make(map[uint64]*Txn) } +// ResetTxnsForNs deletes all the pending transactions for a given namespace. 
+func (o *oracle) ResetTxnsForNs(ns uint64) { + txns := o.IterateTxns(func(key []byte) bool { + pk, err := x.Parse(key) + if err != nil { + glog.Errorf("error %v while parsing key %v", err, hex.EncodeToString(key)) + return false + } + return x.ParseNamespace(pk.Attr) == ns + }) + o.Lock() + defer o.Unlock() + for _, txn := range txns { + delete(o.pendingTxns, txn) + } +} + func (o *oracle) GetTxn(startTs uint64) *Txn { o.RLock() defer o.RUnlock() diff --git a/systest/backup/multi-tenancy/backup_test.go b/systest/backup/multi-tenancy/backup_test.go index 5ba453ba572..e466ce9f950 100644 --- a/systest/backup/multi-tenancy/backup_test.go +++ b/systest/backup/multi-tenancy/backup_test.go @@ -50,13 +50,17 @@ func TestBackupMultiTenancy(t *testing.T) { dg := testutil.DgClientWithLogin(t, "groot", "password", x.GalaxyNamespace) testutil.DropAll(t, dg) - galaxyCreds := &testutil.LoginParams{UserID: "groot", Passwd: "password", Namespace: x.GalaxyNamespace} + galaxyCreds := &testutil.LoginParams{ + UserID: "groot", Passwd: "password", Namespace: x.GalaxyNamespace} galaxyToken := testutil.Login(t, galaxyCreds) // Create a new namespace - ns, err := testutil.CreateNamespaceWithRetry(t, galaxyToken) + ns1, err := testutil.CreateNamespaceWithRetry(t, galaxyToken) require.NoError(t, err) - dg1 := testutil.DgClientWithLogin(t, "groot", "password", ns) + ns2, err := testutil.CreateNamespaceWithRetry(t, galaxyToken) + require.NoError(t, err) + dg1 := testutil.DgClientWithLogin(t, "groot", "password", ns1) + dg2 := testutil.DgClientWithLogin(t, "groot", "password", ns2) addSchema := func(dg *dgo.Dgraph) { // Add schema and types. 
@@ -69,6 +73,7 @@ func TestBackupMultiTenancy(t *testing.T) { addSchema(dg) addSchema(dg1) + addSchema(dg2) addData := func(dg *dgo.Dgraph, name string) *api.Response { var buf bytes.Buffer @@ -96,7 +101,8 @@ func TestBackupMultiTenancy(t *testing.T) { original := make(map[uint64]*api.Response) original[x.GalaxyNamespace] = addData(dg, "galaxy") - original[ns] = addData(dg1, "ns") + original[ns1] = addData(dg1, "ns1") + original[ns2] = addData(dg2, "ns2") // Setup test directories. common.DirSetup(t) @@ -111,11 +117,23 @@ func TestBackupMultiTenancy(t *testing.T) { expectedResponse := `{ "q": [{ "count": 5 }]}` testutil.VerifyQueryResponse(t, dg, query, expectedResponse) testutil.VerifyQueryResponse(t, dg1, query, expectedResponse) + testutil.VerifyQueryResponse(t, dg2, query, expectedResponse) + + // Call drop data from namespace ns2. + require.NoError(t, dg2.Alter(ctx, &api.Operation{DropOp: api.Operation_DATA})) + // Send backup request. + _ = runBackup(t, galaxyToken, 6, 2) + testutil.DropAll(t, dg) + sendRestoreRequest(t, alphaBackupDir, galaxyToken.AccessJwt) + testutil.WaitForRestore(t, dg) + testutil.VerifyQueryResponse(t, dg, query, expectedResponse) + testutil.VerifyQueryResponse(t, dg1, query, expectedResponse) + testutil.VerifyQueryResponse(t, dg2, query, `{ "q": [{ "count": 0 }]}`) // After deleting a namespace in incremental backup, we should not be able to get the data from // banned namespace. 
- require.NoError(t, testutil.DeleteNamespace(t, galaxyToken, ns)) - _ = runBackup(t, galaxyToken, 6, 2) + require.NoError(t, testutil.DeleteNamespace(t, galaxyToken, ns1)) + _ = runBackup(t, galaxyToken, 9, 3) testutil.DropAll(t, dg) sendRestoreRequest(t, alphaBackupDir, galaxyToken.AccessJwt) testutil.WaitForRestore(t, dg) diff --git a/worker/backup_ee.go b/worker/backup_ee.go index bcdc8cf5a51..e5fda2a8d39 100644 --- a/worker/backup_ee.go +++ b/worker/backup_ee.go @@ -682,8 +682,9 @@ func checkAndGetDropOp(key []byte, l *posting.List, readTs uint64) (*pb.DropOper } // A dgraph.drop.op record can have values in only one of the following formats: // * DROP_ALL; - // * DROP_DATA; + // * DROP_DATA;ns // * DROP_ATTR;attrName + // * DROP_NS;ns // So, accordingly construct the *pb.DropOperation. dropOp := &pb.DropOperation{} dropInfo := strings.Split(string(val), ";") @@ -695,6 +696,7 @@ func checkAndGetDropOp(key []byte, l *posting.List, readTs uint64) (*pb.DropOper dropOp.DropOp = pb.DropOperation_ALL case "DROP_DATA": dropOp.DropOp = pb.DropOperation_DATA + dropOp.DropValue = dropInfo[1] // contains namespace. 
case "DROP_ATTR": dropOp.DropOp = pb.DropOperation_ATTR dropOp.DropValue = dropInfo[1] diff --git a/worker/cdc_ee.go b/worker/cdc_ee.go index 63d0041b572..652c21db98d 100644 --- a/worker/cdc_ee.go +++ b/worker/cdc_ee.go @@ -16,6 +16,7 @@ import ( "bytes" "encoding/json" "math" + "strconv" "strings" "sync" "sync/atomic" @@ -104,6 +105,19 @@ func (cdc *CDC) resetPendingEvents() { cdc.pendingTxnEvents = make(map[uint64][]CDCEvent) } +func (cdc *CDC) resetPendingEventsForNs(ns uint64) { + if cdc == nil { + return + } + cdc.Lock() + defer cdc.Unlock() + for ts, events := range cdc.pendingTxnEvents { + if len(events) > 0 && events[0].Meta.Namespace == ns { + delete(cdc.pendingTxnEvents, ts) + } + } +} + func (cdc *CDC) hasPending(attr string) bool { if cdc == nil { return false @@ -230,9 +244,15 @@ func (cdc *CDC) processCDCEvents() { switch { case proposal.Mutations.DropOp != pb.Mutations_NONE: // this means its a drop operation // if there is DROP ALL or DROP DATA operation, clear pending events also. - if proposal.Mutations.DropOp == pb.Mutations_ALL || - proposal.Mutations.DropOp == pb.Mutations_DATA { + if proposal.Mutations.DropOp == pb.Mutations_ALL { cdc.resetPendingEvents() + } else if proposal.Mutations.DropOp == pb.Mutations_DATA { + ns, err := strconv.ParseUint(proposal.Mutations.DropValue, 0, 64) + if err != nil { + glog.Warningf("CDC: parsing namespace failed with error %v. Ignoring.", err) + return + } + cdc.resetPendingEventsForNs(ns) } if err := sendToSink(events, proposal.Mutations.StartTs); err != nil { rerr = errors.Wrapf(err, "unable to send messages to sink") @@ -380,14 +400,26 @@ func toCDCEvent(index uint64, mutation *pb.Mutations) []CDCEvent { } // If drop operation - // todo (aman): right now drop all and data operations are still cluster wide. - // Fix these once we have namespace specific operations. + // todo (aman): right now drop all operation is still cluster wide. + // Fix this once we have namespace specific operation. 
if mutation.DropOp != pb.Mutations_NONE { - ns := x.GalaxyNamespace + var ns uint64 var t string - if mutation.DropOp == pb.Mutations_TYPE { - // drop type are namespace specific. + switch mutation.DropOp { + case pb.Mutations_ALL: + // Drop all is cluster wide. + ns = x.GalaxyNamespace + case pb.Mutations_DATA: + var err error + ns, err = strconv.ParseUint(mutation.DropValue, 0, 64) + if err != nil { + glog.Warningf("CDC: parsing namespace failed with error %v. Ignoring.", err) + return nil + } + case pb.Mutations_TYPE: ns, t = x.ParseNamespaceAttr(mutation.DropValue) + default: + glog.Error("CDC: got unhandled drop operation") } return []CDCEvent{ diff --git a/worker/draft.go b/worker/draft.go index e683fa69b01..de824f4af9c 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -24,6 +24,7 @@ import ( "fmt" "math" "sort" + "strconv" "strings" "sync" "sync/atomic" @@ -504,6 +505,10 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr span := otrace.FromContext(ctx) if proposal.Mutations.DropOp == pb.Mutations_DATA { + ns, err := strconv.ParseUint(proposal.Mutations.DropValue, 0, 64) + if err != nil { + return err + } // Ensures nothing get written to disk due to commit proposals. n.keysWritten.rejectBeforeIndex = proposal.Index @@ -511,13 +516,14 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr n.stopTask(opRollup) defer n.startTask(opRollup) - posting.Oracle().ResetTxns() - if err := posting.DeleteData(); err != nil { + posting.Oracle().ResetTxnsForNs(ns) + if err := posting.DeleteData(ns); err != nil { return err } - // Clear entire cache. - posting.ResetCache() + // TODO: Revisit this when we work on posting cache. Clear entire cache. + // We don't want to drop entire cache, just due to one namespace. 
+ // posting.ResetCache() return nil } diff --git a/worker/restore_map.go b/worker/restore_map.go index 931580862c1..c626241f05e 100644 --- a/worker/restore_map.go +++ b/worker/restore_map.go @@ -103,11 +103,10 @@ func (br *backupReader) WithCompression(comp string) *backupReader { } type loadBackupInput struct { - restoreTs uint64 - preds predicateSet - dropOperations []*pb.DropOperation - isOld bool - keepSchema bool + preds predicateSet + dropNs map[uint64]struct{} + isOld bool + keepSchema bool } type listReq struct { @@ -296,7 +295,7 @@ func (m *mapper) processReqCh(ctx context.Context) error { "Unexpected meta: %v for key: %s", kv.UserMeta, hex.Dump(kv.Key)) } - restoreKey, _, err := fromBackupKey(kv.Key) + restoreKey, ns, err := fromBackupKey(kv.Key) if err != nil { return errors.Wrap(err, "fromBackupKey") } @@ -317,6 +316,9 @@ func (m *mapper) processReqCh(ctx context.Context) error { switch kv.GetUserMeta()[0] { case posting.BitEmptyPosting, posting.BitCompletePosting, posting.BitDeltaPosting: + if _, ok := in.dropNs[ns]; ok { + return nil + } backupPl := &pb.BackupPostingList{} if err := backupPl.Unmarshal(kv.Value); err != nil { return errors.Wrapf(err, "while reading backup posting list") @@ -577,6 +579,7 @@ func RunMapper(req *pb.RestoreRequest, mapDir string) error { dropAll := false dropAttr := make(map[string]struct{}) + dropNs := make(map[uint64]struct{}) // manifests are ordered as: latest..full for i, manifest := range manifests { @@ -610,11 +613,14 @@ func RunMapper(req *pb.RestoreRequest, mapDir string) error { delete(predSet, p) } } + localDropNs := make(map[uint64]struct{}) + for ns := range dropNs { + localDropNs[ns] = struct{}{} + } in := &loadBackupInput{ - preds: predSet, - dropOperations: manifest.DropOperations, - isOld: manifest.Version == 0, - restoreTs: req.RestoreTs, + preds: predSet, + dropNs: localDropNs, + isOld: manifest.Version == 0, // Only map the schema keys corresponding to the latest backup. 
keepSchema: i == 0, } @@ -633,12 +639,15 @@ func RunMapper(req *pb.RestoreRequest, mapDir string) error { case pb.DropOperation_ALL: dropAll = true case pb.DropOperation_DATA: - dropAll = true + ns, err := strconv.ParseUint(op.DropValue, 0, 64) + if err != nil { + return errors.Wrap(err, "Map phase failed to parse namespace") + } + dropNs[ns] = struct{}{} case pb.DropOperation_ATTR: dropAttr[op.DropValue] = struct{}{} case pb.DropOperation_NS: // If there is a drop namespace, we just ban the namespace in the pstore. - // TODO: We probably need to propose ban request. ns, err := strconv.ParseUint(op.DropValue, 0, 64) if err != nil { return errors.Wrapf(err, "Map phase failed to parse namespace")