Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor on ACL #4640

Merged
merged 3 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 4 additions & 38 deletions pkg/acl/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
package acl

import (
"sort"
"sync"
"sync/atomic"
)

const None = 0
Expand All @@ -37,7 +35,6 @@ type Cache interface {
Size() int
GetMissIds() []uint32
Clear()
GetOrPut(r *Rule, currId *uint32) (id uint32, got bool)
}

func NewCache() Cache {
Expand Down Expand Up @@ -67,22 +64,6 @@ func (c *cache) GetAll() map[uint32]*Rule {
return cpy
}

// GetOrPut returns id for the Rule r if exists.
// Otherwise, it stores r with a new id (atomically increment curId) and returns the new id.
// The got result is true if the Rule was already exists, false if stored.
func (c *cache) GetOrPut(r *Rule, curId *uint32) (id uint32, got bool) {
c.lock.Lock()
defer c.lock.Unlock()

if id = c.getId(r); id != None {
return id, true
}

id = atomic.AddUint32(curId, 1)
c.put(id, r)
return id, false
}

func (c *cache) Clear() {
c.lock.Lock()
defer c.lock.Unlock()
Expand Down Expand Up @@ -129,10 +110,6 @@ func (c *cache) Put(id uint32, r *Rule) {
c.lock.Lock()
defer c.lock.Unlock()

c.put(id, r)
}

func (c *cache) put(id uint32, r *Rule) {
if _, ok := c.id2Rule[id]; ok {
return
}
Expand All @@ -148,29 +125,18 @@ func (c *cache) put(id uint32, r *Rule) {
return
}

sort.Sort(&c.id2Rule[id].NamedUsers)
jiefenghuang marked this conversation as resolved.
Show resolved Hide resolved
sort.Sort(&c.id2Rule[id].NamedGroups)

cksum := r.Checksum()
if _, ok := c.cksum2Id[cksum]; ok {
c.cksum2Id[cksum] = append(c.cksum2Id[cksum], id)
} else {
c.cksum2Id[r.Checksum()] = []uint32{id}
}
c.cksum2Id[cksum] = append(c.cksum2Id[cksum], id)
}

func (c *cache) GetId(r *Rule) uint32 {
c.lock.RLock()
defer c.lock.RUnlock()

return c.getId(r)
}

func (c *cache) getId(r *Rule) uint32 {
if r == nil {
return None
}

c.lock.RLock()
defer c.lock.RUnlock()

if ids, ok := c.cksum2Id[r.Checksum()]; ok {
for _, id := range ids {
if r.IsEqual(c.id2Rule[id]) {
Expand Down
13 changes: 13 additions & 0 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -2582,6 +2582,19 @@ func setXAttrACL(xattrs *[]byte, accessACL, defaultACL uint32) {
}
}

func (m *baseMeta) saveACL(rule *aclAPI.Rule, aclMaxId *uint32) uint32 {
if rule == nil {
return aclAPI.None
}
id := m.aclCache.GetId(rule)
if id == aclAPI.None {
(*aclMaxId)++
id = *aclMaxId
m.aclCache.Put(id, rule)
}
return id
}

func (m *baseMeta) SetFacl(ctx Context, ino Ino, aclType uint8, rule *aclAPI.Rule) syscall.Errno {
if aclType != aclAPI.TypeAccess && aclType != aclAPI.TypeDefault {
return syscall.EINVAL
Expand Down
3 changes: 3 additions & 0 deletions pkg/meta/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,9 @@ func dumpACLEntries(entries aclAPI.Entries) []DumpedACLEntry {
}

func loadACL(dumped *DumpedACL) *aclAPI.Rule {
if dumped == nil {
return nil
}
return &aclAPI.Rule{
Owner: dumped.Owner,
Group: dumped.Group,
Expand Down
101 changes: 35 additions & 66 deletions pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,14 +829,6 @@
return err
}
m.parseAttr(val, attr)

if attr != nil && attr.AccessACL != aclAPI.None {
rule, err := m.getACL(ctx, tx, attr.AccessACL)
if err != nil {
return err
}
attr.Mode = (rule.GetMode() & 0777) | (attr.Mode & 07000)
}
return nil
}, m.inodeKey(inode)))
}
Expand Down Expand Up @@ -1129,17 +1121,12 @@
}
now := time.Now()

// get acl
var rule *aclAPI.Rule
if cur.AccessACL != aclAPI.None {
oldRule, err := m.getACL(ctx, tx, cur.AccessACL)
if err != nil {
return err
}
rule = &aclAPI.Rule{}
*rule = *oldRule
rule, err := m.getACL(ctx, tx, cur.AccessACL)
if err != nil {
return err
}

rule = dupAcl(rule)

Check failure on line 1129 in pkg/meta/redis.go

View workflow job for this annotation

GitHub Actions / build (1.20)

undefined: dupAcl

Check failure on line 1129 in pkg/meta/redis.go

View workflow job for this annotation

GitHub Actions / build (1.21)

undefined: dupAcl

Check failure on line 1129 in pkg/meta/redis.go

View workflow job for this annotation

GitHub Actions / build (1.22)

undefined: dupAcl
dirtyAttr, st := m.mergeAttr(ctx, inode, set, &cur, attr, now, rule)
if st != 0 {
return st
Expand All @@ -1148,17 +1135,9 @@
return nil
}

// set acl
if rule != nil {
if err = m.tryLoadMissACLs(ctx, tx); err != nil {
logger.Warnf("SetAttr: load miss acls error: %s", err)
}

aclId, err := m.insertACL(ctx, tx, rule)
if err != nil {
return err
}
setAttrACLId(dirtyAttr, aclAPI.TypeAccess, aclId)
dirtyAttr.AccessACL, err = m.insertACL(ctx, tx, rule)
if err != nil {
return err
}

dirtyAttr.Ctime = now.Unix()
Expand Down Expand Up @@ -1278,10 +1257,6 @@
// simple acl as default
attr.Mode = (mode & 0xFE00) | rule.GetMode()
} else {
if err = m.tryLoadMissACLs(ctx, tx); err != nil {
logger.Warnf("Mknode: load miss acls error: %s", err)
}

cRule := rule.ChildAccessACL(mode)
id, err := m.insertACL(ctx, tx, cRule)
if err != nil {
Expand Down Expand Up @@ -3750,20 +3725,16 @@
e.Xattrs = xattrs
}

if attr.AccessACL != aclAPI.None {
accessACl, err := m.getACL(ctx, tx, attr.AccessACL)
if err != nil {
return err
}
e.AccessACL = dumpACL(accessACl)
accessACl, err := m.getACL(ctx, tx, attr.AccessACL)
if err != nil {
return err
}
if attr.DefaultACL != aclAPI.None {
defaultACL, err := m.getACL(ctx, tx, attr.DefaultACL)
if err != nil {
return err
}
e.DefaultACL = dumpACL(defaultACL)
e.AccessACL = dumpACL(accessACl)
defaultACL, err := m.getACL(ctx, tx, attr.DefaultACL)
if err != nil {
return err
}
e.DefaultACL = dumpACL(defaultACL)

switch typ {
case TypeFile:
Expand Down Expand Up @@ -4169,15 +4140,8 @@
p.HSet(ctx, m.xattrKey(inode), xattrs)
}

if e.AccessACL != nil {
r := loadACL(e.AccessACL)
attr.AccessACL, _ = m.aclCache.GetOrPut(r, aclMaxId)
}

if e.DefaultACL != nil {
r := loadACL(e.DefaultACL)
attr.DefaultACL, _ = m.aclCache.GetOrPut(r, aclMaxId)
}
attr.AccessACL = m.saveACL(loadACL(e.AccessACL), aclMaxId)
attr.DefaultACL = m.saveACL(loadACL(e.DefaultACL), aclMaxId)

p.Set(ctx, m.inodeKey(inode), m.marshal(attr), 0)
tryExec()
Expand Down Expand Up @@ -4566,11 +4530,6 @@
attr.Mode &= 07000
attr.Mode |= ((rule.Owner & 7) << 6) | ((rule.Group & 7) << 3) | (rule.Other & 7)
} else {
if err = m.tryLoadMissACLs(ctx, tx); err != nil {
logger.Warnf("SetFacl: load miss acls error: %s", err)
}

// set acl
rule.InheritPerms(attr.Mode)
aclId, err := m.insertACL(ctx, tx, rule)
if err != nil {
Expand Down Expand Up @@ -4614,19 +4573,22 @@
aclId = getAttrACLId(attr, aclType)
}

if aclId == aclAPI.None {
return ENOATTR
}
a, err := m.getACL(ctx, tx, aclId)
if err != nil {
return err
}
if a == nil {
return ENOATTR
}
*rule = *a
return nil
}, m.inodeKey(ino)))
}

func (m *redisMeta) getACL(ctx Context, tx *redis.Tx, id uint32) (*aclAPI.Rule, error) {
if id == aclAPI.None {
return nil, nil
}
if cRule := m.aclCache.Get(id); cRule != nil {
return cRule, nil
}
Expand All @@ -4644,7 +4606,7 @@
return nil, err
}
if val == nil {
return nil, ENOATTR
return nil, syscall.EIO
}

rule := &aclAPI.Rule{}
Expand All @@ -4654,6 +4616,15 @@
}

func (m *redisMeta) insertACL(ctx Context, tx *redis.Tx, rule *aclAPI.Rule) (uint32, error) {
if rule == nil || rule.IsEmpty() {
return aclAPI.None, nil
}

if err := m.tryLoadMissACLs(ctx, tx); err != nil {
logger.Warnf("SetFacl: load miss acls error: %s", err)
}

// set acl
var aclId uint32
if aclId = m.aclCache.GetId(rule); aclId == aclAPI.None {
// TODO failures may result in some id wastage.
Expand Down Expand Up @@ -4684,13 +4655,11 @@
return err
}
for i, data := range vals {
var rule *aclAPI.Rule
var rule aclAPI.Rule
if data != nil {
rule = &aclAPI.Rule{}
rule.Decode([]byte(data.(string)))
}
// may have empty slot
m.aclCache.Put(missIds[i], rule)
m.aclCache.Put(missIds[i], &rule)
}
}
return nil
Expand Down
Loading
Loading