Skip to content

Commit

Permalink
refactor on ACL (#4640)
Browse files Browse the repository at this point in the history
  • Loading branch information
davies authored Apr 8, 2024
1 parent 72de033 commit 732b7a5
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 231 deletions.
9 changes: 9 additions & 0 deletions pkg/acl/acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@ func (r *Rule) String() string {
r.Owner, r.Group, r.Mask, r.Other, r.NamedUsers, r.NamedGroups)
}

func (r *Rule) Dup() *Rule {
if r != nil {
newRule := *r
// NamedUsers and NamedGroups are never modified
return &newRule
}
return nil
}

func (r *Rule) Encode() []byte {
w := utils.NewBuffer(uint32(16 + (len(r.NamedUsers)+len(r.NamedGroups))*6))
w.Put16(r.Owner)
Expand Down
42 changes: 4 additions & 38 deletions pkg/acl/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
package acl

import (
"sort"
"sync"
"sync/atomic"
)

const None = 0
Expand All @@ -37,7 +35,6 @@ type Cache interface {
Size() int
GetMissIds() []uint32
Clear()
GetOrPut(r *Rule, currId *uint32) (id uint32, got bool)
}

func NewCache() Cache {
Expand Down Expand Up @@ -67,22 +64,6 @@ func (c *cache) GetAll() map[uint32]*Rule {
return cpy
}

// GetOrPut returns id for the Rule r if exists.
// Otherwise, it stores r with a new id (atomically increment curId) and returns the new id.
// The got result is true if the Rule was already exists, false if stored.
func (c *cache) GetOrPut(r *Rule, curId *uint32) (id uint32, got bool) {
c.lock.Lock()
defer c.lock.Unlock()

if id = c.getId(r); id != None {
return id, true
}

id = atomic.AddUint32(curId, 1)
c.put(id, r)
return id, false
}

func (c *cache) Clear() {
c.lock.Lock()
defer c.lock.Unlock()
Expand Down Expand Up @@ -129,10 +110,6 @@ func (c *cache) Put(id uint32, r *Rule) {
c.lock.Lock()
defer c.lock.Unlock()

c.put(id, r)
}

func (c *cache) put(id uint32, r *Rule) {
if _, ok := c.id2Rule[id]; ok {
return
}
Expand All @@ -148,29 +125,18 @@ func (c *cache) put(id uint32, r *Rule) {
return
}

sort.Sort(&c.id2Rule[id].NamedUsers)
sort.Sort(&c.id2Rule[id].NamedGroups)

cksum := r.Checksum()
if _, ok := c.cksum2Id[cksum]; ok {
c.cksum2Id[cksum] = append(c.cksum2Id[cksum], id)
} else {
c.cksum2Id[r.Checksum()] = []uint32{id}
}
c.cksum2Id[cksum] = append(c.cksum2Id[cksum], id)
}

func (c *cache) GetId(r *Rule) uint32 {
c.lock.RLock()
defer c.lock.RUnlock()

return c.getId(r)
}

func (c *cache) getId(r *Rule) uint32 {
if r == nil {
return None
}

c.lock.RLock()
defer c.lock.RUnlock()

if ids, ok := c.cksum2Id[r.Checksum()]; ok {
for _, id := range ids {
if r.IsEqual(c.id2Rule[id]) {
Expand Down
1 change: 0 additions & 1 deletion pkg/acl/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ func TestCache(t *testing.T) {
assert.True(t, rule.IsEqual(c.Get(1)))
assert.True(t, rule.IsEqual(c.Get(2)))
assert.Equal(t, uint32(1), c.GetId(rule))
assert.Equal(t, uint32(1), c.Get(1).NamedUsers[0].Id) // sorted

rule2 := &Rule{}
*rule2 = *rule
Expand Down
13 changes: 13 additions & 0 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -2726,6 +2726,19 @@ func setXAttrACL(xattrs *[]byte, accessACL, defaultACL uint32) {
}
}

func (m *baseMeta) saveACL(rule *aclAPI.Rule, aclMaxId *uint32) uint32 {
if rule == nil {
return aclAPI.None
}
id := m.aclCache.GetId(rule)
if id == aclAPI.None {
(*aclMaxId)++
id = *aclMaxId
m.aclCache.Put(id, rule)
}
return id
}

func (m *baseMeta) SetFacl(ctx Context, ino Ino, aclType uint8, rule *aclAPI.Rule) syscall.Errno {
if aclType != aclAPI.TypeAccess && aclType != aclAPI.TypeDefault {
return syscall.EINVAL
Expand Down
3 changes: 3 additions & 0 deletions pkg/meta/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,9 @@ func dumpACLEntries(entries aclAPI.Entries) []DumpedACLEntry {
}

func loadACL(dumped *DumpedACL) *aclAPI.Rule {
if dumped == nil {
return nil
}
return &aclAPI.Rule{
Owner: dumped.Owner,
Group: dumped.Group,
Expand Down
101 changes: 35 additions & 66 deletions pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,14 +829,6 @@ func (m *redisMeta) doGetAttr(ctx Context, inode Ino, attr *Attr) syscall.Errno
return err
}
m.parseAttr(val, attr)

if attr != nil && attr.AccessACL != aclAPI.None {
rule, err := m.getACL(ctx, tx, attr.AccessACL)
if err != nil {
return err
}
attr.Mode = (rule.GetMode() & 0777) | (attr.Mode & 07000)
}
return nil
}, m.inodeKey(inode)))
}
Expand Down Expand Up @@ -1129,17 +1121,12 @@ func (m *redisMeta) doSetAttr(ctx Context, inode Ino, set uint16, sugidclearmode
}
now := time.Now()

// get acl
var rule *aclAPI.Rule
if cur.AccessACL != aclAPI.None {
oldRule, err := m.getACL(ctx, tx, cur.AccessACL)
if err != nil {
return err
}
rule = &aclAPI.Rule{}
*rule = *oldRule
rule, err := m.getACL(ctx, tx, cur.AccessACL)
if err != nil {
return err
}

rule = rule.Dup()
dirtyAttr, st := m.mergeAttr(ctx, inode, set, &cur, attr, now, rule)
if st != 0 {
return st
Expand All @@ -1148,17 +1135,9 @@ func (m *redisMeta) doSetAttr(ctx Context, inode Ino, set uint16, sugidclearmode
return nil
}

// set acl
if rule != nil {
if err = m.tryLoadMissACLs(ctx, tx); err != nil {
logger.Warnf("SetAttr: load miss acls error: %s", err)
}

aclId, err := m.insertACL(ctx, tx, rule)
if err != nil {
return err
}
setAttrACLId(dirtyAttr, aclAPI.TypeAccess, aclId)
dirtyAttr.AccessACL, err = m.insertACL(ctx, tx, rule)
if err != nil {
return err
}

dirtyAttr.Ctime = now.Unix()
Expand Down Expand Up @@ -1278,10 +1257,6 @@ func (m *redisMeta) doMknod(ctx Context, parent Ino, name string, _type uint8, m
// simple acl as default
attr.Mode = (mode & 0xFE00) | rule.GetMode()
} else {
if err = m.tryLoadMissACLs(ctx, tx); err != nil {
logger.Warnf("Mknode: load miss acls error: %s", err)
}

cRule := rule.ChildAccessACL(mode)
id, err := m.insertACL(ctx, tx, cRule)
if err != nil {
Expand Down Expand Up @@ -3633,20 +3608,16 @@ func (m *redisMeta) dumpEntries(es ...*DumpedEntry) error {
e.Xattrs = xattrs
}

if attr.AccessACL != aclAPI.None {
accessACl, err := m.getACL(ctx, tx, attr.AccessACL)
if err != nil {
return err
}
e.AccessACL = dumpACL(accessACl)
accessACl, err := m.getACL(ctx, tx, attr.AccessACL)
if err != nil {
return err
}
if attr.DefaultACL != aclAPI.None {
defaultACL, err := m.getACL(ctx, tx, attr.DefaultACL)
if err != nil {
return err
}
e.DefaultACL = dumpACL(defaultACL)
e.AccessACL = dumpACL(accessACl)
defaultACL, err := m.getACL(ctx, tx, attr.DefaultACL)
if err != nil {
return err
}
e.DefaultACL = dumpACL(defaultACL)

switch typ {
case TypeFile:
Expand Down Expand Up @@ -4052,15 +4023,8 @@ func (m *redisMeta) loadEntry(e *DumpedEntry, p redis.Pipeliner, tryExec func(),
p.HSet(ctx, m.xattrKey(inode), xattrs)
}

if e.AccessACL != nil {
r := loadACL(e.AccessACL)
attr.AccessACL, _ = m.aclCache.GetOrPut(r, aclMaxId)
}

if e.DefaultACL != nil {
r := loadACL(e.DefaultACL)
attr.DefaultACL, _ = m.aclCache.GetOrPut(r, aclMaxId)
}
attr.AccessACL = m.saveACL(loadACL(e.AccessACL), aclMaxId)
attr.DefaultACL = m.saveACL(loadACL(e.DefaultACL), aclMaxId)

p.Set(ctx, m.inodeKey(inode), m.marshal(attr), 0)
tryExec()
Expand Down Expand Up @@ -4449,11 +4413,6 @@ func (m *redisMeta) doSetFacl(ctx Context, ino Ino, aclType uint8, rule *aclAPI.
attr.Mode &= 07000
attr.Mode |= ((rule.Owner & 7) << 6) | ((rule.Group & 7) << 3) | (rule.Other & 7)
} else {
if err = m.tryLoadMissACLs(ctx, tx); err != nil {
logger.Warnf("SetFacl: load miss acls error: %s", err)
}

// set acl
rule.InheritPerms(attr.Mode)
aclId, err := m.insertACL(ctx, tx, rule)
if err != nil {
Expand Down Expand Up @@ -4497,19 +4456,22 @@ func (m *redisMeta) doGetFacl(ctx Context, ino Ino, aclType uint8, aclId uint32,
aclId = getAttrACLId(attr, aclType)
}

if aclId == aclAPI.None {
return ENOATTR
}
a, err := m.getACL(ctx, tx, aclId)
if err != nil {
return err
}
if a == nil {
return ENOATTR
}
*rule = *a
return nil
}, m.inodeKey(ino)))
}

func (m *redisMeta) getACL(ctx Context, tx *redis.Tx, id uint32) (*aclAPI.Rule, error) {
if id == aclAPI.None {
return nil, nil
}
if cRule := m.aclCache.Get(id); cRule != nil {
return cRule, nil
}
Expand All @@ -4527,7 +4489,7 @@ func (m *redisMeta) getACL(ctx Context, tx *redis.Tx, id uint32) (*aclAPI.Rule,
return nil, err
}
if val == nil {
return nil, ENOATTR
return nil, syscall.EIO
}

rule := &aclAPI.Rule{}
Expand All @@ -4537,6 +4499,15 @@ func (m *redisMeta) getACL(ctx Context, tx *redis.Tx, id uint32) (*aclAPI.Rule,
}

func (m *redisMeta) insertACL(ctx Context, tx *redis.Tx, rule *aclAPI.Rule) (uint32, error) {
if rule == nil || rule.IsEmpty() {
return aclAPI.None, nil
}

if err := m.tryLoadMissACLs(ctx, tx); err != nil {
logger.Warnf("SetFacl: load miss acls error: %s", err)
}

// set acl
var aclId uint32
if aclId = m.aclCache.GetId(rule); aclId == aclAPI.None {
// TODO failures may result in some id wastage.
Expand Down Expand Up @@ -4567,13 +4538,11 @@ func (m *redisMeta) tryLoadMissACLs(ctx Context, tx *redis.Tx) error {
return err
}
for i, data := range vals {
var rule *aclAPI.Rule
var rule aclAPI.Rule
if data != nil {
rule = &aclAPI.Rule{}
rule.Decode([]byte(data.(string)))
}
// may have empty slot
m.aclCache.Put(missIds[i], rule)
m.aclCache.Put(missIds[i], &rule)
}
}
return nil
Expand Down
Loading

0 comments on commit 732b7a5

Please sign in to comment.