Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cherry pick: release/v21.03: The Acl cache should be updated on restart and restore. #7964

Merged
merged 6 commits into from
Jul 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,8 +814,9 @@ func run() {

// initialization of the admin account can only be done after raft nodes are running
// and health check passes
edgraph.ResetAcl(updaters)
edgraph.RefreshAcls(updaters)
edgraph.InitializeAcl(updaters)
edgraph.RefreshACLs(updaters.Ctx())
edgraph.SubscribeForAclUpdates(updaters)
}()

// Graphql subscribes to alpha to get schema updates. We need to close that before we
Expand Down
15 changes: 12 additions & 3 deletions edgraph/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,26 @@ func (s *Server) Login(ctx context.Context,
}

// ResetAcl is an empty method since ACL is only supported in the enterprise version.
func ResetAcl(closer *z.Closer) {
func InitializeAcl(closer *z.Closer) {
// do nothing
}

// ResetAcls is an empty method since ACL is only supported in the enterprise version.
func RefreshAcls(closer *z.Closer) {
func upsertGuardianAndGroot(closer *z.Closer, ns uint64) {
// do nothing
}

// SubscribeForAclUpdates is an empty method since ACL is only supported in the enterprise version.
func SubscribeForAclUpdates(closer *z.Closer) {
// do nothing
<-closer.HasBeenClosed()
closer.Done()
}

// RefreshACLs is an empty method since ACL is only supported in the enterprise version.
func RefreshACLs(ctx context.Context) {
return
}

func authorizeAlter(ctx context.Context, op *api.Operation) error {
return nil
}
Expand Down
90 changes: 51 additions & 39 deletions edgraph/access_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,43 @@ func authorizeUser(ctx context.Context, userid string, password string) (
return user, nil
}

// RefreshAcls queries for the ACL triples and refreshes the ACLs accordingly.
func RefreshAcls(closer *z.Closer) {
func refreshAclCache(ctx context.Context, ns, refreshTs uint64) error {
req := &Request{
req: &api.Request{
Query: queryAcls,
ReadOnly: true,
StartTs: refreshTs,
},
doAuth: NoAuthorize,
}

ctx = x.AttachNamespace(ctx, ns)
queryResp, err := (&Server{}).doQuery(ctx, req)
if err != nil {
return errors.Errorf("unable to retrieve acls: %v", err)
}
groups, err := acl.UnmarshalGroups(queryResp.GetJson(), "allAcls")
if err != nil {
return err
}

worker.AclCachePtr.Update(ns, groups)
glog.V(2).Infof("Updated the ACL cache for namespace: %#x", ns)
return nil

}

func RefreshACLs(ctx context.Context) {
for ns := range schema.State().Namespaces() {
if err := refreshAclCache(ctx, ns, 0); err != nil {
glog.Errorf("Error while retrieving acls for namespace %#x: %v", ns, err)
}
}
worker.AclCachePtr.Set()
}

// SubscribeForAclUpdates subscribes for ACL predicates and updates the acl cache.
func SubscribeForAclUpdates(closer *z.Closer) {
defer func() {
glog.Infoln("RefreshAcls closed")
closer.Done()
Expand All @@ -323,38 +358,13 @@ func RefreshAcls(closer *z.Closer) {
return
}

// retrieve the full data set of ACLs from the corresponding alpha server, and update the
// aclCachePtr
var maxRefreshTs uint64
retrieveAcls := func(ns uint64, refreshTs uint64) error {
if refreshTs <= maxRefreshTs {
return nil
}
maxRefreshTs = refreshTs

glog.V(3).Infof("Refreshing ACLs")
req := &Request{
req: &api.Request{
Query: queryAcls,
ReadOnly: true,
StartTs: refreshTs,
},
doAuth: NoAuthorize,
}

ctx := x.AttachNamespace(closer.Ctx(), ns)
queryResp, err := (&Server{}).doQuery(ctx, req)
if err != nil {
return errors.Errorf("unable to retrieve acls: %v", err)
}
groups, err := acl.UnmarshalGroups(queryResp.GetJson(), "allAcls")
if err != nil {
return err
}

aclCachePtr.update(ns, groups)
glog.V(3).Infof("Updated the ACL cache")
return nil
return refreshAclCache(closer.Ctx(), ns, refreshTs)
}

closer.AddRunning(1)
Expand Down Expand Up @@ -402,10 +412,10 @@ var aclPrefixes = [][]byte{
x.PredicatePrefix(x.GalaxyAttr("dgraph.xid")),
}

// clears the aclCachePtr and upserts the Groot account.
func ResetAcl(closer *z.Closer) {
// upserts the Groot account.
func InitializeAcl(closer *z.Closer) {
defer func() {
glog.Infof("ResetAcl closed")
glog.Infof("InitializeAcl closed")
closer.Done()
}()

Expand Down Expand Up @@ -611,12 +621,16 @@ func authorizePreds(ctx context.Context, userData *userData, preds []string,
if err != nil {
return nil, errors.Wrapf(err, "While authorizing preds")
}
if !worker.AclCachePtr.Loaded() {
RefreshACLs(ctx)
}

userId := userData.userId
groupIds := userData.groupIds
blockedPreds := make(map[string]struct{})
for _, pred := range preds {
nsPred := x.NamespaceAttr(ns, pred)
if err := aclCachePtr.authorizePredicate(groupIds, nsPred, aclOp); err != nil {
if err := worker.AclCachePtr.AuthorizePredicate(groupIds, nsPred, aclOp); err != nil {
logAccess(&accessEntry{
userId: userId,
groups: groupIds,
Expand All @@ -628,22 +642,20 @@ func authorizePreds(ctx context.Context, userData *userData, preds []string,
blockedPreds[pred] = struct{}{}
}
}
aclCachePtr.RLock()
allowedPreds := make([]string, len(aclCachePtr.userPredPerms[userId]))
// User can have multiple permission for same predicate, add predicate
allowedPreds := make([]string, len(worker.AclCachePtr.GetUserPredPerms(userId)))
// only if the acl.Op is covered in the set of permissions for the user
for predicate, perm := range aclCachePtr.userPredPerms[userId] {
for predicate, perm := range worker.AclCachePtr.GetUserPredPerms(userId) {
if (perm & aclOp.Code) > 0 {
allowedPreds = append(allowedPreds, predicate)
}
}
aclCachePtr.RUnlock()
return &authPredResult{allowed: allowedPreds, blocked: blockedPreds}, nil
}

// authorizeAlter parses the Schema in the operation and authorizes the operation
// using the aclCachePtr. It will return error if any one of the predicates specified in alter
// are not authorized.
// using the worker.AclCachePtr. It will return error if any one of the predicates
// specified in alter are not authorized.
func authorizeAlter(ctx context.Context, op *api.Operation) error {
if len(worker.Config.HmacSecret) == 0 {
// the user has not turned on the acl feature
Expand Down Expand Up @@ -768,7 +780,7 @@ func isAclPredMutation(nquads []*api.NQuad) bool {
return false
}

// authorizeMutation authorizes the mutation using the aclCachePtr. It will return permission
// authorizeMutation authorizes the mutation using the worker.AclCachePtr. It will return permission
// denied error if any one of the predicates in mutation(set or delete) is unauthorized.
// At this stage, namespace is not attached in the predicates.
func authorizeMutation(ctx context.Context, gmu *gql.Mutation) error {
Expand Down
4 changes: 2 additions & 2 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er
// reset their in-memory GraphQL schema
_, err = UpdateGQLSchema(ctx, "", "")
// recreate the admin account after a drop all operation
ResetAcl(nil)
InitializeAcl(nil)
return empty, err
}

Expand Down Expand Up @@ -447,7 +447,7 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er
// just reinsert the GraphQL schema, no need to alter dgraph schema as this was drop_data
_, err = UpdateGQLSchema(ctx, graphQLSchema, "")
// recreate the admin account after a drop data operation
ResetAcl(nil)
InitializeAcl(nil)
return empty, err
}

Expand Down
2 changes: 1 addition & 1 deletion graphql/admin/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func resolveRestore(ctx context.Context, m schema.Mutation) (*resolve.Resolved,

go func() {
wg.Wait()
edgraph.ResetAcl(nil)
edgraph.InitializeAcl(nil)
}()

return resolve.DataResult(
Expand Down
1 change: 1 addition & 0 deletions systest/acl/restore/acl-secret
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
12345678901234567890123456789012
124 changes: 124 additions & 0 deletions systest/acl/restore/acl_restore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package main

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"testing"

"github.com/dgraph-io/dgo/v210"
"github.com/dgraph-io/dgo/v210/protos/api"
"github.com/dgraph-io/dgraph/graphql/e2e/common"
"github.com/dgraph-io/dgraph/testutil"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)

// disableDraining disables draining mode before each test for increased reliability.
func disableDraining(t *testing.T) {
drainRequest := `mutation draining {
draining(enable: false) {
response {
code
message
}
}
}`

params := testutil.GraphQLParams{
Query: drainRequest,
}
b, err := json.Marshal(params)
require.NoError(t, err)

token := testutil.Login(t, &testutil.LoginParams{UserID: "groot", Passwd: "password", Namespace: 0})

client := &http.Client{}
req, err := http.NewRequest("POST", testutil.AdminUrl(), bytes.NewBuffer(b))
require.Nil(t, err)
req.Header.Add("content-type", "application/json")
req.Header.Add("X-Dgraph-AccessToken", token.AccessJwt)

resp, err := client.Do(req)
require.NoError(t, err)
buf, err := ioutil.ReadAll(resp.Body)
fmt.Println(string(buf))
require.NoError(t, err)
require.Contains(t, string(buf), "draining mode has been set to false")
}

func sendRestoreRequest(t *testing.T, location, backupId string, backupNum int) {
if location == "" {
location = "/data/backup2"
}
params := &testutil.GraphQLParams{
Query: `mutation restore($location: String!, $backupId: String, $backupNum: Int) {
restore(input: {location: $location, backupId: $backupId, backupNum: $backupNum}) {
code
message
}
}`,
Variables: map[string]interface{}{
"location": location,
"backupId": backupId,
"backupNum": backupNum,
},
}

token := testutil.Login(t, &testutil.LoginParams{UserID: "groot", Passwd: "password", Namespace: 0})
resp := testutil.MakeGQLRequestWithAccessJwt(t, params, token.AccessJwt)
resp.RequireNoGraphQLErrors(t)

var restoreResp struct {
Restore struct {
Code string
Message string
RestoreId int
}
}
require.NoError(t, json.Unmarshal(resp.Data, &restoreResp))
require.Equal(t, restoreResp.Restore.Code, "Success")
}

func TestAclCacheRestore(t *testing.T) {
disableDraining(t)
conn, err := grpc.Dial(testutil.SockAddr, grpc.WithInsecure())
require.NoError(t, err)
dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))
dg.Login(context.Background(), "groot", "password")

sendRestoreRequest(t, "/backups", "vibrant_euclid5", 1)
testutil.WaitForRestore(t, dg)

token := testutil.Login(t,
&testutil.LoginParams{UserID: "alice1", Passwd: "password", Namespace: 0})
params := &common.GraphQLParams{
Query: `query{
queryPerson{
name
age
}
}`,

Headers: make(http.Header),
}
params.Headers.Set("X-Dgraph-AccessToken", token.AccessJwt)

resp := params.ExecuteAsPost(t, common.GraphqlURL)
require.Nil(t, resp.Errors)

expected := `
{
"queryPerson": [
{
"name": "MinhajSh",
"age": 20
}
]
}
`
require.JSONEq(t, expected, string(resp.Data))
}
Binary file not shown.
33 changes: 33 additions & 0 deletions systest/acl/restore/data/backups/manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"Manifests":[
{
"type":"full",
"since":0,
"read_ts":21,
"groups":{
"1":[
"\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.password",
"\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.p_query",
"\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.acl.rule",
"\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.drop.op",
"\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.xid",
"\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.xid",
"\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.rule.predicate",
"\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.schema",
"\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.rule.permission",
"\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000Person.name",
"\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.user.group",
"\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000Person.age",
"\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.type"
]
},
"backup_id":"vibrant_euclid5",
"backup_num":1,
"version":2103,
"path":"dgraph.20210730.124449.146",
"encrypted":false,
"drop_operations":null,
"compression":"snappy"
}
]
}
Loading