diff --git a/pkg/acl/acl.go b/pkg/acl/acl.go index 459a78363bd0..91aa89aaacb9 100644 --- a/pkg/acl/acl.go +++ b/pkg/acl/acl.go @@ -82,6 +82,15 @@ func (r *Rule) String() string { r.Owner, r.Group, r.Mask, r.Other, r.NamedUsers, r.NamedGroups) } +func (r *Rule) Dup() *Rule { + if r != nil { + newRule := *r + // NamedUsers and NamedGroups are never modified + return &newRule + } + return nil +} + func (r *Rule) Encode() []byte { w := utils.NewBuffer(uint32(16 + (len(r.NamedUsers)+len(r.NamedGroups))*6)) w.Put16(r.Owner) diff --git a/pkg/acl/cache.go b/pkg/acl/cache.go index 1c842a2f5bfe..8cdb9e05a63a 100644 --- a/pkg/acl/cache.go +++ b/pkg/acl/cache.go @@ -17,9 +17,7 @@ package acl import ( - "sort" "sync" - "sync/atomic" ) const None = 0 @@ -37,7 +35,6 @@ type Cache interface { Size() int GetMissIds() []uint32 Clear() - GetOrPut(r *Rule, currId *uint32) (id uint32, got bool) } func NewCache() Cache { @@ -67,22 +64,6 @@ func (c *cache) GetAll() map[uint32]*Rule { return cpy } -// GetOrPut returns id for the Rule r if exists. -// Otherwise, it stores r with a new id (atomically increment curId) and returns the new id. -// The got result is true if the Rule was already exists, false if stored. -func (c *cache) GetOrPut(r *Rule, curId *uint32) (id uint32, got bool) { - c.lock.Lock() - defer c.lock.Unlock() - - if id = c.getId(r); id != None { - return id, true - } - - id = atomic.AddUint32(curId, 1) - c.put(id, r) - return id, false -} - func (c *cache) Clear() { c.lock.Lock() defer c.lock.Unlock() @@ -129,10 +110,6 @@ func (c *cache) Put(id uint32, r *Rule) { c.lock.Lock() defer c.lock.Unlock() - c.put(id, r) -} - -func (c *cache) put(id uint32, r *Rule) { if _, ok := c.id2Rule[id]; ok { return } @@ -148,29 +125,18 @@ func (c *cache) put(id uint32, r *Rule) { return } - sort.Sort(&c.id2Rule[id].NamedUsers) - sort.Sort(&c.id2Rule[id].NamedGroups) - cksum := r.Checksum() - if _, ok := c.cksum2Id[cksum]; ok { - c.cksum2Id[cksum] = append(c.cksum2Id[cksum], id) - } else { - c.cksum2Id[r.Checksum()] = []uint32{id} - } + c.cksum2Id[cksum] = append(c.cksum2Id[cksum], id) } func (c *cache) GetId(r *Rule) uint32 { - c.lock.RLock() - defer c.lock.RUnlock() - - return c.getId(r) -} - -func (c *cache) getId(r *Rule) uint32 { if r == nil { return None } + c.lock.RLock() + defer c.lock.RUnlock() + if ids, ok := c.cksum2Id[r.Checksum()]; ok { for _, id := range ids { if r.IsEqual(c.id2Rule[id]) { diff --git a/pkg/acl/cache_test.go b/pkg/acl/cache_test.go index 5dc267f731c3..2a99d3be9ff4 100644 --- a/pkg/acl/cache_test.go +++ b/pkg/acl/cache_test.go @@ -56,7 +56,6 @@ func TestCache(t *testing.T) { assert.True(t, rule.IsEqual(c.Get(1))) assert.True(t, rule.IsEqual(c.Get(2))) assert.Equal(t, uint32(1), c.GetId(rule)) - assert.Equal(t, uint32(1), c.Get(1).NamedUsers[0].Id) // sorted rule2 := &Rule{} *rule2 = *rule diff --git a/pkg/meta/base.go b/pkg/meta/base.go index e3ee6102a0cc..6cf5c4db88d0 100644 --- a/pkg/meta/base.go +++ b/pkg/meta/base.go @@ -2726,6 +2726,19 @@ func setXAttrACL(xattrs *[]byte, accessACL, defaultACL uint32) { } } +func (m *baseMeta) saveACL(rule *aclAPI.Rule, aclMaxId *uint32) uint32 { + if rule == nil { + return aclAPI.None + } + id := m.aclCache.GetId(rule) + if id == aclAPI.None { + (*aclMaxId)++ + id = *aclMaxId + m.aclCache.Put(id, rule) + } + return id +} + func (m *baseMeta) SetFacl(ctx Context, ino Ino, aclType uint8, rule *aclAPI.Rule) syscall.Errno { if aclType != aclAPI.TypeAccess && aclType != aclAPI.TypeDefault { return syscall.EINVAL diff --git a/pkg/meta/dump.go b/pkg/meta/dump.go index c8b2a103eb9c..1d8ad4e7e01f 100644 --- a/pkg/meta/dump.go +++ b/pkg/meta/dump.go @@ -590,6 +590,9 @@ func dumpACLEntries(entries aclAPI.Entries) []DumpedACLEntry { } func loadACL(dumped *DumpedACL) *aclAPI.Rule { + if dumped == nil { + return nil + } return &aclAPI.Rule{ Owner: dumped.Owner, Group: dumped.Group, diff --git a/pkg/meta/redis.go b/pkg/meta/redis.go index d51cfd72304c..f7cd2569246b 100644 --- a/pkg/meta/redis.go +++ b/pkg/meta/redis.go @@ -829,14 +829,6 @@ func (m *redisMeta) doGetAttr(ctx Context, inode Ino, attr *Attr) syscall.Errno return err } m.parseAttr(val, attr) - - if attr != nil && attr.AccessACL != aclAPI.None { - rule, err := m.getACL(ctx, tx, attr.AccessACL) - if err != nil { - return err - } - attr.Mode = (rule.GetMode() & 0777) | (attr.Mode & 07000) - } return nil }, m.inodeKey(inode))) } @@ -1129,17 +1121,12 @@ func (m *redisMeta) doSetAttr(ctx Context, inode Ino, set uint16, sugidclearmode } now := time.Now() - // get acl - var rule *aclAPI.Rule - if cur.AccessACL != aclAPI.None { - oldRule, err := m.getACL(ctx, tx, cur.AccessACL) - if err != nil { - return err - } - rule = &aclAPI.Rule{} - *rule = *oldRule + rule, err := m.getACL(ctx, tx, cur.AccessACL) + if err != nil { + return err } + rule = rule.Dup() dirtyAttr, st := m.mergeAttr(ctx, inode, set, &cur, attr, now, rule) if st != 0 { return st @@ -1148,17 +1135,9 @@ func (m *redisMeta) doSetAttr(ctx Context, inode Ino, set uint16, sugidclearmode return nil } - // set acl - if rule != nil { - if err = m.tryLoadMissACLs(ctx, tx); err != nil { - logger.Warnf("SetAttr: load miss acls error: %s", err) - } - - aclId, err := m.insertACL(ctx, tx, rule) - if err != nil { - return err - } - setAttrACLId(dirtyAttr, aclAPI.TypeAccess, aclId) + dirtyAttr.AccessACL, err = m.insertACL(ctx, tx, rule) + if err != nil { + return err } dirtyAttr.Ctime = now.Unix() @@ -1278,10 +1257,6 @@ func (m *redisMeta) doMknod(ctx Context, parent Ino, name string, _type uint8, m // simple acl as default attr.Mode = (mode & 0xFE00) | rule.GetMode() } else { - if err = m.tryLoadMissACLs(ctx, tx); err != nil { - logger.Warnf("Mknode: load miss acls error: %s", err) - } - cRule := rule.ChildAccessACL(mode) id, err := m.insertACL(ctx, tx, cRule) if err != nil { @@ -3633,20 +3608,16 @@ func (m *redisMeta) dumpEntries(es ...*DumpedEntry) error { e.Xattrs = xattrs } - if attr.AccessACL != aclAPI.None { - accessACl, err := m.getACL(ctx, tx, attr.AccessACL) - if err != nil { - return err - } - e.AccessACL = dumpACL(accessACl) + accessACl, err := m.getACL(ctx, tx, attr.AccessACL) + if err != nil { + return err } - if attr.DefaultACL != aclAPI.None { - defaultACL, err := m.getACL(ctx, tx, attr.DefaultACL) - if err != nil { - return err - } - e.DefaultACL = dumpACL(defaultACL) + e.AccessACL = dumpACL(accessACl) + defaultACL, err := m.getACL(ctx, tx, attr.DefaultACL) + if err != nil { + return err } + e.DefaultACL = dumpACL(defaultACL) switch typ { case TypeFile: @@ -4052,15 +4023,8 @@ func (m *redisMeta) loadEntry(e *DumpedEntry, p redis.Pipeliner, tryExec func(), p.HSet(ctx, m.xattrKey(inode), xattrs) } - if e.AccessACL != nil { - r := loadACL(e.AccessACL) - attr.AccessACL, _ = m.aclCache.GetOrPut(r, aclMaxId) - } - - if e.DefaultACL != nil { - r := loadACL(e.DefaultACL) - attr.DefaultACL, _ = m.aclCache.GetOrPut(r, aclMaxId) - } + attr.AccessACL = m.saveACL(loadACL(e.AccessACL), aclMaxId) + attr.DefaultACL = m.saveACL(loadACL(e.DefaultACL), aclMaxId) p.Set(ctx, m.inodeKey(inode), m.marshal(attr), 0) tryExec() @@ -4449,11 +4413,6 @@ func (m *redisMeta) doSetFacl(ctx Context, ino Ino, aclType uint8, rule *aclAPI. attr.Mode &= 07000 attr.Mode |= ((rule.Owner & 7) << 6) | ((rule.Group & 7) << 3) | (rule.Other & 7) } else { - if err = m.tryLoadMissACLs(ctx, tx); err != nil { - logger.Warnf("SetFacl: load miss acls error: %s", err) - } - - // set acl rule.InheritPerms(attr.Mode) aclId, err := m.insertACL(ctx, tx, rule) if err != nil { @@ -4497,19 +4456,22 @@ func (m *redisMeta) doGetFacl(ctx Context, ino Ino, aclType uint8, aclId uint32, aclId = getAttrACLId(attr, aclType) } - if aclId == aclAPI.None { - return ENOATTR - } a, err := m.getACL(ctx, tx, aclId) if err != nil { return err } + if a == nil { + return ENOATTR + } *rule = *a return nil }, m.inodeKey(ino))) } func (m *redisMeta) getACL(ctx Context, tx *redis.Tx, id uint32) (*aclAPI.Rule, error) { + if id == aclAPI.None { + return nil, nil + } if cRule := m.aclCache.Get(id); cRule != nil { return cRule, nil } @@ -4527,7 +4489,7 @@ func (m *redisMeta) getACL(ctx Context, tx *redis.Tx, id uint32) (*aclAPI.Rule, return nil, err } if val == nil { - return nil, ENOATTR + return nil, syscall.EIO } rule := &aclAPI.Rule{} @@ -4537,6 +4499,15 @@ func (m *redisMeta) getACL(ctx Context, tx *redis.Tx, id uint32) (*aclAPI.Rule, } func (m *redisMeta) insertACL(ctx Context, tx *redis.Tx, rule *aclAPI.Rule) (uint32, error) { + if rule == nil || rule.IsEmpty() { + return aclAPI.None, nil + } + + if err := m.tryLoadMissACLs(ctx, tx); err != nil { + logger.Warnf("SetFacl: load miss acls error: %s", err) + } + + // set acl var aclId uint32 if aclId = m.aclCache.GetId(rule); aclId == aclAPI.None { // TODO failures may result in some id wastage. @@ -4567,13 +4538,11 @@ func (m *redisMeta) tryLoadMissACLs(ctx Context, tx *redis.Tx) error { return err } for i, data := range vals { - var rule *aclAPI.Rule + var rule aclAPI.Rule if data != nil { - rule = &aclAPI.Rule{} rule.Decode([]byte(data.(string))) } - // may have empty slot - m.aclCache.Put(missIds[i], rule) + m.aclCache.Put(missIds[i], &rule) } } return nil diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go index 05408f834645..b3735ec52c72 100644 --- a/pkg/meta/sql.go +++ b/pkg/meta/sql.go @@ -982,14 +982,6 @@ func (m *dbMeta) doGetAttr(ctx Context, inode Ino, attr *Attr) syscall.Errno { return syscall.ENOENT } m.parseAttr(&n, attr) - - if attr != nil && attr.AccessACL != aclAPI.None { - rule, err := m.getACL(s, attr.AccessACL) - if err != nil { - return err - } - attr.Mode = (rule.GetMode() & 0777) | (attr.Mode & 07000) - } return nil })) } @@ -1011,17 +1003,12 @@ func (m *dbMeta) doSetAttr(ctx Context, inode Ino, set uint16, sugidclearmode ui } now := time.Now() - // get acl - var rule *aclAPI.Rule - if curAttr.AccessACL != aclAPI.None { - oldRule, err := m.getACL(s, curAttr.AccessACL) - if err != nil { - return err - } - rule = &aclAPI.Rule{} - *rule = *oldRule + rule, err := m.getACL(s, curAttr.AccessACL) + if err != nil { + return err } + rule = rule.Dup() dirtyAttr, st := m.mergeAttr(ctx, inode, set, &curAttr, attr, now, rule) if st != 0 { return st @@ -1030,17 +1017,9 @@ func (m *dbMeta) doSetAttr(ctx Context, inode Ino, set uint16, sugidclearmode ui return nil } - // set acl - if rule != nil { - if err = m.tryLoadMissACLs(s); err != nil { - logger.Warnf("SetAttr: load miss acls error: %s", err) - } - - aclId, err := m.insertACL(s, rule) - if err != nil { - return err - } - setAttrACLId(dirtyAttr, aclAPI.TypeAccess, aclId) + dirtyAttr.AccessACL, err = m.insertACL(s, rule) + if err != nil { + return err } var dirtyNode node @@ -1358,10 +1337,6 @@ func (m *dbMeta) doMknod(ctx Context, parent Ino, name string, _type uint8, mode // simple acl as default n.Mode = (mode & 0xFE00) | rule.GetMode() } else { - if err = m.tryLoadMissACLs(s); err != nil { - logger.Warnf("Mknode: load miss acls error: %s", err) - } - cRule := rule.ChildAccessACL(mode) id, err := m.insertACL(s, cRule) if err != nil { @@ -3312,20 +3287,16 @@ func (m *dbMeta) dumpEntry(s *xorm.Session, inode Ino, typ uint8, e *DumpedEntry e.Xattrs = xattrs } - if attr.AccessACL != aclAPI.None { - accessACl, err := m.getACL(s, attr.AccessACL) - if err != nil { - return err - } - e.AccessACL = dumpACL(accessACl) + accessACl, err := m.getACL(s, attr.AccessACL) + if err != nil { + return err } - if attr.DefaultACL != aclAPI.None { - defaultACL, err := m.getACL(s, attr.DefaultACL) - if err != nil { - return err - } - e.DefaultACL = dumpACL(defaultACL) + e.AccessACL = dumpACL(accessACl) + defaultACL, err := m.getACL(s, attr.DefaultACL) + if err != nil { + return err } + e.DefaultACL = dumpACL(defaultACL) if attr.Typ == TypeFile { for indx := uint32(0); uint64(indx)*ChunkSize < attr.Length; indx++ { @@ -3880,15 +3851,8 @@ func (m *dbMeta) loadEntry(e *DumpedEntry, chs []chan interface{}, aclMaxId *uin chs[4] <- &xattr{Inode: inode, Name: x.Name, Value: unescape(x.Value)} } - if e.AccessACL != nil { - r := loadACL(e.AccessACL) - n.AccessACLId, _ = m.aclCache.GetOrPut(r, aclMaxId) - } - - if e.DefaultACL != nil { - r := loadACL(e.DefaultACL) - n.DefaultACLId, _ = m.aclCache.GetOrPut(r, aclMaxId) - } + n.AccessACLId = m.saveACL(loadACL(e.AccessACL), aclMaxId) + n.DefaultACLId = m.saveACL(loadACL(e.DefaultACL), aclMaxId) chs[0] <- n } @@ -4293,6 +4257,12 @@ func (m *dbMeta) doTouchAtime(ctx Context, inode Ino, attr *Attr, now time.Time) } func (m *dbMeta) insertACL(s *xorm.Session, rule *aclAPI.Rule) (uint32, error) { + if rule == nil { + return aclAPI.None, nil + } + if err := m.tryLoadMissACLs(s); err != nil { + logger.Warnf("Mknode: load miss acls error: %s", err) + } var aclId uint32 if aclId = m.aclCache.GetId(rule); aclId == aclAPI.None { // TODO conflicts from multiple clients are rare and result in only minor duplicates, thus not addressed for now. @@ -4314,14 +4284,26 @@ func (m *dbMeta) tryLoadMissACLs(s *xorm.Session) error { return err } + got := make(map[uint32]struct{}, len(acls)) for _, data := range acls { + got[data.Id] = struct{}{} m.aclCache.Put(data.Id, data.toRule()) } + if len(acls) < len(missIds) { + for _, id := range missIds { + if _, ok := got[id]; !ok { + m.aclCache.Put(id, aclAPI.EmptyRule()) + } + } + } } return nil } func (m *dbMeta) getACL(s *xorm.Session, id uint32) (*aclAPI.Rule, error) { + if id == aclAPI.None { + return nil, nil + } if cRule := m.aclCache.Get(id); cRule != nil { return cRule, nil } @@ -4330,7 +4312,7 @@ func (m *dbMeta) getACL(s *xorm.Session, id uint32) (*aclAPI.Rule, error) { if ok, err := s.Get(aclVal); err != nil { return nil, err } else if !ok { - return nil, ENOATTR + return nil, syscall.EIO } r := aclVal.toRule() @@ -4376,10 +4358,6 @@ func (m *dbMeta) doSetFacl(ctx Context, ino Ino, aclType uint8, rule *aclAPI.Rul attr.Mode &= 07000 attr.Mode |= ((rule.Owner & 7) << 6) | ((rule.Group & 7) << 3) | (rule.Other & 7) } else { - if err := m.tryLoadMissACLs(s); err != nil { - logger.Warnf("SetFacl: load miss acls error: %s", err) - } - // set acl rule.InheritPerms(attr.Mode) aclId, err := m.insertACL(s, rule) @@ -4434,13 +4412,13 @@ func (m *dbMeta) doGetFacl(ctx Context, ino Ino, aclType uint8, aclId uint32, ru aclId = getAttrACLId(attr, aclType) } - if aclId == aclAPI.None { - return ENOATTR - } a, err := m.getACL(s, aclId) if err != nil { return err } + if a == nil { + return ENOATTR + } *rule = *a return nil })) diff --git a/pkg/meta/tkv.go b/pkg/meta/tkv.go index 0f6afa804663..613d6569f9f4 100644 --- a/pkg/meta/tkv.go +++ b/pkg/meta/tkv.go @@ -893,14 +893,6 @@ func (m *kvMeta) doGetAttr(ctx Context, inode Ino, attr *Attr) syscall.Errno { return syscall.ENOENT } m.parseAttr(val, attr) - - if attr != nil && attr.AccessACL != aclAPI.None { - rule, err := m.getACL(tx, attr.AccessACL) - if err != nil { - return err - } - attr.Mode = (rule.GetMode() & 0777) | (attr.Mode & 07000) - } return nil }, 0)) } @@ -918,18 +910,12 @@ func (m *kvMeta) doSetAttr(ctx Context, inode Ino, set uint16, sugidclearmode ui } now := time.Now() - // get acl - var rule *aclAPI.Rule - if cur.AccessACL != aclAPI.None { - var err error - oldRule, err := m.getACL(tx, cur.AccessACL) - if err != nil { - return err - } - rule = &aclAPI.Rule{} - *rule = *oldRule + rule, err := m.getACL(tx, cur.AccessACL) + if err != nil { + return err } + rule = rule.Dup() dirtyAttr, st := m.mergeAttr(ctx, inode, set, &cur, attr, now, rule) if st != 0 { return st @@ -938,17 +924,9 @@ func (m *kvMeta) doSetAttr(ctx Context, inode Ino, set uint16, sugidclearmode ui return nil } - // set acl - if rule != nil { - if err := m.tryLoadMissACLs(tx); err != nil { - logger.Warnf("SetAttr: load miss acls error: %s", err) - } - - aclId, err := m.insertACL(tx, rule) - if err != nil { - return err - } - setAttrACLId(dirtyAttr, aclAPI.TypeAccess, aclId) + dirtyAttr.AccessACL, err = m.insertACL(tx, rule) + if err != nil { + return err } dirtyAttr.Ctime = now.Unix() @@ -1171,10 +1149,6 @@ func (m *kvMeta) doMknod(ctx Context, parent Ino, name string, _type uint8, mode // simple acl as default attr.Mode = (mode & 0xFE00) | rule.GetMode() } else { - if err = m.tryLoadMissACLs(tx); err != nil { - logger.Warnf("Mknode: load miss acls error: %s", err) - } - cRule := rule.ChildAccessACL(mode) id, err := m.insertACL(tx, cRule) if err != nil { @@ -2793,20 +2767,16 @@ func (m *kvMeta) dumpEntry(inode Ino, e *DumpedEntry, showProgress func(totalInc e.Xattrs = xattrs } - if attr.AccessACL != aclAPI.None { - accessACl, err := m.getACL(tx, attr.AccessACL) - if err != nil { - return err - } - e.AccessACL = dumpACL(accessACl) + accessACl, err := m.getACL(tx, attr.AccessACL) + if err != nil { + return err } - if attr.DefaultACL != aclAPI.None { - defaultACL, err := m.getACL(tx, attr.DefaultACL) - if err != nil { - return err - } - e.DefaultACL = dumpACL(defaultACL) + e.AccessACL = dumpACL(accessACl) + defaultACL, err := m.getACL(tx, attr.DefaultACL) + if err != nil { + return err } + e.DefaultACL = dumpACL(defaultACL) if attr.Typ == TypeFile { e.Chunks = e.Chunks[:0] @@ -3311,16 +3281,9 @@ func (m *kvMeta) loadEntry(e *DumpedEntry, kv chan *pair, aclMaxId *uint32) { for _, x := range e.Xattrs { kv <- &pair{m.xattrKey(inode, x.Name), []byte(unescape(x.Value))} } - // put dumped acls into cache, then store back to sql - if e.AccessACL != nil { - r := loadACL(e.AccessACL) - attr.AccessACL, _ = m.aclCache.GetOrPut(r, aclMaxId) - } - if e.DefaultACL != nil { - r := loadACL(e.DefaultACL) - attr.DefaultACL, _ = m.aclCache.GetOrPut(r, aclMaxId) - } + attr.AccessACL = m.saveACL(loadACL(e.AccessACL), aclMaxId) + attr.DefaultACL = m.saveACL(loadACL(e.DefaultACL), aclMaxId) kv <- &pair{m.inodeKey(inode), m.marshal(attr)} } @@ -3664,10 +3627,6 @@ func (m *kvMeta) doSetFacl(ctx Context, ino Ino, aclType uint8, rule *aclAPI.Rul attr.Mode &= 07000 attr.Mode |= ((rule.Owner & 7) << 6) | ((rule.Group & 7) << 3) | (rule.Other & 7) } else { - if err := m.tryLoadMissACLs(tx); err != nil { - logger.Warnf("SetFacl: load miss acls error: %s", err) - } - // set acl rule.InheritPerms(attr.Mode) aclId, err := m.insertACL(tx, rule) @@ -3708,19 +3667,27 @@ func (m *kvMeta) doGetFacl(ctx Context, ino Ino, aclType uint8, aclId uint32, ru aclId = getAttrACLId(attr, aclType) } - if aclId == aclAPI.None { - return ENOATTR - } a, err := m.getACL(tx, aclId) if err != nil { return err } + if a == nil { + return ENOATTR + } *rule = *a return nil }, 0)) } func (m *kvMeta) insertACL(tx *kvTxn, rule *aclAPI.Rule) (uint32, error) { + if rule == nil || rule.IsEmpty() { + return aclAPI.None, nil + } + + if err := m.tryLoadMissACLs(tx); err != nil { + logger.Warnf("load miss acls error: %s", err) + } + var aclId uint32 if aclId = m.aclCache.GetId(rule); aclId == aclAPI.None { newId, err := m.incrCounter(aclCounter, 1) @@ -3745,22 +3712,27 @@ func (m *kvMeta) tryLoadMissACLs(tx *kvTxn) error { acls := tx.gets(missKeys...) for i, data := range acls { - r := &aclAPI.Rule{} - r.Decode(data) - m.aclCache.Put(missIds[i], r) + var rule aclAPI.Rule + if len(data) > 0 { + rule.Decode(data) + } + m.aclCache.Put(missIds[i], &rule) } } return nil } func (m *kvMeta) getACL(tx *kvTxn, id uint32) (*aclAPI.Rule, error) { + if id == aclAPI.None { + return nil, nil + } if cRule := m.aclCache.Get(id); cRule != nil { return cRule, nil } val := tx.get(m.aclKey(id)) if val == nil { - return nil, ENOATTR + return nil, syscall.EIO } rule := &aclAPI.Rule{}