diff --git a/pkg/acl/cache.go b/pkg/acl/cache.go index d39fbc82b1f9..5a1640ee3b8b 100644 --- a/pkg/acl/cache.go +++ b/pkg/acl/cache.go @@ -33,7 +33,7 @@ type Cache interface { Get(id uint32) *Rule GetId(r *Rule) uint32 Size() int - GetMissIds(maxId uint32) []uint32 + GetMissIds() []uint32 } func NewCache() Cache { @@ -52,20 +52,16 @@ type cache struct { cksum2Id map[uint32][]uint32 } -// GetMissIds return all miss ids from 1 to max(maxId, c.maxId) -func (c *cache) GetMissIds(maxId uint32) []uint32 { +// GetMissIds return all miss ids from 1 to c.maxId +func (c *cache) GetMissIds() []uint32 { c.lock.RLock() defer c.lock.RUnlock() - if c.maxId == maxId && uint32(len(c.id2Rule)) == maxId { + if uint32(len(c.id2Rule)) == c.maxId { return nil } - if c.maxId > maxId { - maxId = c.maxId - } - - n := maxId + 1 + n := c.maxId + 1 mark := make([]bool, n) for i := uint32(1); i < n; i++ { if _, ok := c.id2Rule[i]; ok { diff --git a/pkg/acl/cache_test.go b/pkg/acl/cache_test.go index 760d5d529185..5dc267f731c3 100644 --- a/pkg/acl/cache_test.go +++ b/pkg/acl/cache_test.go @@ -66,8 +66,7 @@ func TestCache(t *testing.T) { assert.Equal(t, uint32(3), c.GetId(rule2)) c.Put(8, rule2) - assert.Equal(t, []uint32{4, 5, 6, 7, 9, 10}, c.GetMissIds(10)) - assert.Equal(t, []uint32{4, 5, 6, 7}, c.GetMissIds(6)) + assert.Equal(t, []uint32{4, 5, 6, 7}, c.GetMissIds()) assert.NotPanics(t, func() { c.Put(10, nil) diff --git a/pkg/meta/base.go b/pkg/meta/base.go index b92175dfa730..95ff1bd93d60 100644 --- a/pkg/meta/base.go +++ b/pkg/meta/base.go @@ -1163,7 +1163,7 @@ func (m *baseMeta) parseAttr(buf []byte, attr *Attr) { attr.Nlink = rb.Get32() attr.Length = rb.Get64() attr.Rdev = rb.Get32() - if rb.Left() >= 16 { + if rb.Left() >= 8 { attr.Parent = Ino(rb.Get64()) } attr.Full = true @@ -1175,7 +1175,11 @@ func (m *baseMeta) parseAttr(buf []byte, attr *Attr) { } func (m *baseMeta) marshal(attr *Attr) []byte { - w := utils.NewBuffer(36 + 24 + 4 + 8 + 8) + size := uint32(36 + 24 + 4 + 8) + if attr.AccessACLId+attr.DefaultACLId > 0 { + size += 8 + } + w := utils.NewBuffer(size) w.Put8(attr.Flags) w.Put16((uint16(attr.Typ) << 12) | (attr.Mode & 0xfff)) w.Put32(attr.Uid) @@ -1190,8 +1194,10 @@ func (m *baseMeta) marshal(attr *Attr) []byte { w.Put64(attr.Length) w.Put32(attr.Rdev) w.Put64(uint64(attr.Parent)) - w.Put32(attr.AccessACLId) - w.Put32(attr.DefaultACLId) + if attr.AccessACLId+attr.DefaultACLId > 0 { + w.Put32(attr.AccessACLId) + w.Put32(attr.DefaultACLId) + } logger.Tracef("attr: %+v -> %+v", attr, w.Bytes()) return w.Bytes() } @@ -2811,7 +2817,7 @@ func (m *baseMeta) CheckSetAttr(ctx Context, inode Ino, set uint16, attr Attr) s return st } -const ACLCounterName = "acl" +const aclCounter = "acl" var errACLNotInCache = errors.New("acl not in cache") diff --git a/pkg/meta/redis.go b/pkg/meta/redis.go index 8ca1e6467126..b23ca43f37c7 100644 --- a/pkg/meta/redis.go +++ b/pkg/meta/redis.go @@ -323,18 +323,18 @@ func (m *redisMeta) doInit(format *Format, force bool) error { } // cache all acls - maxId, err := m.getCounter(ACLCounterName) + maxId, err := m.getCounter(aclCounter) if err != nil { return err } if maxId > 0 { - missKeys := make([]string, maxId) + allKeys := make([]string, maxId) for i := 0; i < int(maxId); i++ { - missKeys[i] = m.aclKey(uint32(i) + 1) + allKeys[i] = m.aclKey(uint32(i) + 1) } - acls, err := m.rdb.MGet(ctx, missKeys...).Result() + acls, err := m.rdb.MGet(ctx, allKeys...).Result() if err != nil { return err } @@ -342,7 +342,7 @@ func (m *redisMeta) doInit(format *Format, force bool) error { var tmpRule *aclAPI.Rule if val != nil { tmpRule = &aclAPI.Rule{} - tmpRule.Decode(val.([]byte)) + tmpRule.Decode(([]byte)(val.(string))) } // may have empty slot m.aclCache.Put(uint32(i)+1, tmpRule) @@ -663,6 +663,10 @@ func (m *redisMeta) sliceRefs() string { return m.prefix + "sliceRef" } +func (m *redisMeta) counterName(name string) string { + return m.prefix + name +} + func (m *redisMeta) packQuota(space, inodes int64) []byte { wb := utils.NewBuffer(16) wb.Put64(uint64(space)) @@ -841,8 +845,8 @@ func (m *redisMeta) doGetAttr(ctx Context, inode Ino, attr *Attr) syscall.Errno m.parseAttr(val, attr) if attr != nil && attr.AccessACLId != aclAPI.None { - rule := &aclAPI.Rule{} - if err := m.getACL(ctx, tx, attr.AccessACLId, rule); err != nil { + rule, err := m.getACL(ctx, tx, attr.AccessACLId) + if err != nil { return err } attr.Mode = (rule.GetMode() & 0777) | (attr.Mode & 07000) @@ -1190,8 +1194,8 @@ func (m *redisMeta) doSetAttr(ctx Context, inode Ino, set uint16, sugidclearmode // get acl var rule *aclAPI.Rule if cur.AccessACLId != aclAPI.None { - rule = &aclAPI.Rule{} - if err := m.getACL(ctx, tx, cur.AccessACLId, rule); err != nil { + rule, err = m.getACL(ctx, tx, cur.AccessACLId) + if err != nil { return err } } @@ -1211,6 +1215,10 @@ func (m *redisMeta) doSetAttr(ctx Context, inode Ino, set uint16, sugidclearmode return err } setAttrACLId(dirtyAttr, aclAPI.TypeAccess, aclId) + + if err = m.tryLoadMissACLs(ctx, tx); err != nil { + logger.Warnf("SetAttr: load miss acls error: %s", err) + } } dirtyAttr.Ctime = now.Unix() @@ -1359,8 +1367,8 @@ func (m *redisMeta) doMknod(ctx Context, parent Ino, name string, _type uint8, m } // set access acl by parent's default acl - rule := &aclAPI.Rule{} - if err = m.getACL(ctx, tx, pattr.DefaultACLId, rule); err != nil { + rule, err := m.getACL(ctx, tx, pattr.DefaultACLId) + if err != nil { return err } @@ -1373,6 +1381,10 @@ func (m *redisMeta) doMknod(ctx Context, parent Ino, name string, _type uint8, m if err != nil { return err } + if err = m.tryLoadMissACLs(ctx, tx); err != nil { + logger.Warnf("Mknode: load miss acls error: %s", err) + } + attr.AccessACLId = id attr.Mode = (mode & 0xFE00) | cRule.GetMode() } @@ -4587,6 +4599,10 @@ func (m *redisMeta) SetFacl(ctx Context, ino Ino, aclType uint8, rule *aclAPI.Ru } setAttrACLId(attr, aclType, aclId) + if err = m.tryLoadMissACLs(ctx, tx); err != nil { + logger.Warnf("SetFacl: load miss acls error: %s", err) + } + // set mode if aclType == aclAPI.TypeAccess { attr.Mode &= 07000 @@ -4631,14 +4647,18 @@ func (m *redisMeta) GetFacl(ctx Context, ino Ino, aclType uint8, rule *aclAPI.Ru return ENOATTR } - return m.getACL(ctx, tx, aclId, rule) + a, err := m.getACL(ctx, tx, aclId) + if err != nil { + return err + } + *rule = *a + return nil }, m.inodeKey(ino))) } -func (m *redisMeta) getACL(ctx Context, tx *redis.Tx, id uint32, rule *aclAPI.Rule) error { +func (m *redisMeta) getACL(ctx Context, tx *redis.Tx, id uint32) (*aclAPI.Rule, error) { if cRule := m.aclCache.Get(id); cRule != nil { - *rule = *cRule - return nil + return cRule, nil } cmds, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error { @@ -4646,27 +4666,28 @@ func (m *redisMeta) getACL(ctx Context, tx *redis.Tx, id uint32, rule *aclAPI.Ru return nil }) if err != nil { - return err + return nil, err } val, err := cmds[0].(*redis.StringCmd).Bytes() if err != nil { - return err + return nil, err } if val == nil { - return ENOATTR + return nil, ENOATTR } + rule := &aclAPI.Rule{} rule.Decode(val) m.aclCache.Put(id, rule) - return nil + return rule, nil } func (m *redisMeta) insertACL(ctx Context, tx *redis.Tx, rule *aclAPI.Rule) (uint32, error) { var aclId uint32 if aclId = m.aclCache.GetId(rule); aclId == aclAPI.None { // TODO failures may result in some id wastage. - newId, err := m.incrCounter(ACLCounterName, 1) + newId, err := m.incrCounter(aclCounter, 1) if err != nil { return aclAPI.None, err } @@ -4676,29 +4697,32 @@ func (m *redisMeta) insertACL(ctx Context, tx *redis.Tx, rule *aclAPI.Rule) (uin return aclAPI.None, err } m.aclCache.Put(aclId, rule) + } + return aclId, nil +} - // try load miss - missIds := m.aclCache.GetMissIds(aclId) - if len(missIds) > 0 { - missKeys := make([]string, len(missIds)) - for i, id := range missIds { - missKeys[i] = m.aclKey(id) - } +func (m *redisMeta) tryLoadMissACLs(ctx Context, tx *redis.Tx) error { + // try load miss + missIds := m.aclCache.GetMissIds() + if len(missIds) > 0 { + missKeys := make([]string, len(missIds)) + for i, id := range missIds { + missKeys[i] = m.aclKey(id) + } - acls, err := tx.MGet(ctx, missKeys...).Result() - if err != nil { - return aclId, nil - } - for i, data := range acls { - var tmpRule *aclAPI.Rule - if data != nil { - tmpRule = &aclAPI.Rule{} - tmpRule.Decode(data.([]byte)) - } - // may have empty slot - m.aclCache.Put(missIds[i], tmpRule) + acls, err := tx.MGet(ctx, missKeys...).Result() + if err != nil { + return err + } + for i, data := range acls { + var rule *aclAPI.Rule + if data != nil { + rule = &aclAPI.Rule{} + rule.Decode(data.([]byte)) } + // may have empty slot + m.aclCache.Put(missIds[i], rule) } } - return aclId, nil + return nil } diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go index 81500b40b359..4c4842ccab28 100644 --- a/pkg/meta/sql.go +++ b/pkg/meta/sql.go @@ -45,7 +45,7 @@ import ( "github.com/sirupsen/logrus" ) -const MaxFieldsCountOfTable = 16 // node table +const MaxFieldsCountOfTable = 18 // node table type setting struct { Name string `xorm:"pk"` @@ -118,16 +118,15 @@ func newSQLAcl(r *aclAPI.Rule) *acl { return a } -func (a *acl) ToRule(r *aclAPI.Rule) { - if r == nil { - *r = aclAPI.Rule{} - } +func (a *acl) toRule() *aclAPI.Rule { + r := &aclAPI.Rule{} r.Owner = a.Owner r.Group = a.Group r.Other = a.Other r.Mask = a.Mask r.NamedUsers.Decode(a.NamedUsers) r.NamedGroups.Decode(a.NamedGroups) + return r } type namedNode struct { @@ -472,9 +471,7 @@ func (m *dbMeta) doInit(format *Format, force bool) error { return err } for _, val := range acls { - tmpRule := &aclAPI.Rule{} - val.ToRule(tmpRule) - m.aclCache.Put(val.Id, tmpRule) + m.aclCache.Put(val.Id, val.toRule()) } return nil }) @@ -485,7 +482,7 @@ func (m *dbMeta) Reset() error { &node{}, &edge{}, &symlink{}, &xattr{}, &chunk{}, &sliceRef{}, &delslices{}, &session{}, &session2{}, &sustained{}, &delfile{}, - &flock{}, &plock{}, &dirStats{}, &dirQuota{}, &detachedNode{}) + &flock{}, &plock{}, &dirStats{}, &dirQuota{}, &detachedNode{}, &acl{}) } func (m *dbMeta) doLoad() (data []byte, err error) { @@ -975,8 +972,8 @@ func (m *dbMeta) doGetAttr(ctx Context, inode Ino, attr *Attr) syscall.Errno { m.parseAttr(&n, attr) if attr != nil && attr.AccessACLId != aclAPI.None { - rule := &aclAPI.Rule{} - if err = m.getACL(s, attr.AccessACLId, rule); err != nil { + rule, err := m.getACL(s, attr.AccessACLId) + if err != nil { return err } attr.Mode = (rule.GetMode() & 0777) | (attr.Mode & 07000) @@ -1005,8 +1002,8 @@ func (m *dbMeta) doSetAttr(ctx Context, inode Ino, set uint16, sugidclearmode ui // get acl var rule *aclAPI.Rule if curAttr.AccessACLId != aclAPI.None { - rule = &aclAPI.Rule{} - if err = m.getACL(s, curAttr.AccessACLId, rule); err != nil { + rule, err = m.getACL(s, curAttr.AccessACLId) + if err != nil { return err } } @@ -1026,6 +1023,10 @@ func (m *dbMeta) doSetAttr(ctx Context, inode Ino, set uint16, sugidclearmode ui return err } setAttrACLId(dirtyAttr, aclAPI.TypeAccess, aclId) + + if err = m.tryLoadMissACLs(s); err != nil { + logger.Warnf("SetAttr: load miss acls error: %s", err) + } } var dirtyNode node @@ -1392,8 +1393,8 @@ func (m *dbMeta) doMknod(ctx Context, parent Ino, name string, _type uint8, mode } // set access acl by parent's default acl - rule := &aclAPI.Rule{} - if err = m.getACL(s, pattr.DefaultACLId, rule); err != nil { + rule, err := m.getACL(s, pattr.DefaultACLId) + if err != nil { return err } @@ -1406,6 +1407,10 @@ func (m *dbMeta) doMknod(ctx Context, parent Ino, name string, _type uint8, mode if err != nil { return err } + if err = m.tryLoadMissACLs(s); err != nil { + logger.Warnf("Mknode: load miss acls error: %s", err) + } + n.AccessACLId = id n.Mode = (mode & 0xFE00) | cRule.GetMode() } @@ -3938,7 +3943,7 @@ func (m *dbMeta) LoadMeta(r io.Reader) error { if err := m.syncTable(new(detachedNode)); err != nil { return fmt.Errorf("create table detachedNode: %s", err) } - if err := m.syncTable(new(acl)); err != nil { + if err = m.syncTable(new(acl)); err != nil { return fmt.Errorf("create table acl: %s", err) } var batch int @@ -4310,41 +4315,40 @@ func (m *dbMeta) insertACL(s *xorm.Session, rule *aclAPI.Rule) (uint32, error) { } aclId = val.Id m.aclCache.Put(aclId, rule) + } + return aclId, nil +} - // try load miss - missIds := m.aclCache.GetMissIds(val.Id) - if len(missIds) > 0 { - var acls []acl - if err := s.In("id", missIds).Find(&acls); err != nil { - return aclId, err - } +func (m *dbMeta) tryLoadMissACLs(s *xorm.Session) error { + missIds := m.aclCache.GetMissIds() + if len(missIds) > 0 { + var acls []acl + if err := s.In("id", missIds).Find(&acls); err != nil { + return err + } - for _, data := range acls { - tmpRule := &aclAPI.Rule{} - data.ToRule(tmpRule) - m.aclCache.Put(data.Id, tmpRule) - } + for _, data := range acls { + m.aclCache.Put(data.Id, data.toRule()) } } - return aclId, nil + return nil } -func (m *dbMeta) getACL(s *xorm.Session, id uint32, rule *aclAPI.Rule) error { +func (m *dbMeta) getACL(s *xorm.Session, id uint32) (*aclAPI.Rule, error) { if cRule := m.aclCache.Get(id); cRule != nil { - *rule = *cRule - return nil + return cRule, nil } var aclVal = &acl{Id: id} if ok, err := s.Get(aclVal); err != nil { - return err + return nil, err } else if !ok { - return ENOATTR + return nil, ENOATTR } - aclVal.ToRule(rule) - m.aclCache.Put(id, rule) - return nil + r := aclVal.toRule() + m.aclCache.Put(id, r) + return r, nil } func (m *dbMeta) SetFacl(ctx Context, ino Ino, aclType uint8, rule *aclAPI.Rule) syscall.Errno { @@ -4395,6 +4399,10 @@ func (m *dbMeta) SetFacl(ctx Context, ino Ino, aclType uint8, rule *aclAPI.Rule) } setAttrACLId(attr, aclType, aclId) + if err = m.tryLoadMissACLs(s); err != nil { + logger.Warnf("SetFacl: load miss acls error: %s", err) + } + // set mode if aclType == aclAPI.TypeAccess { attr.Mode &= 07000 @@ -4433,7 +4441,7 @@ func (m *dbMeta) GetFacl(ctx Context, ino Ino, aclType uint8, rule *aclAPI.Rule) return errno(m.roTxn(func(s *xorm.Session) error { attr := &Attr{} n := &node{Inode: ino} - if ok, err := s.ForUpdate().Get(n); err != nil { + if ok, err := s.Get(n); err != nil { return err } else if !ok { return syscall.ENOENT @@ -4446,6 +4454,11 @@ func (m *dbMeta) GetFacl(ctx Context, ino Ino, aclType uint8, rule *aclAPI.Rule) return ENOATTR } - return m.getACL(s, aclId, rule) + a, err := m.getACL(s, aclId) + if err != nil { + return err + } + *rule = *a + return nil })) } diff --git a/pkg/meta/tkv.go b/pkg/meta/tkv.go index 4fb545c48fa9..74c2d0e38d0e 100644 --- a/pkg/meta/tkv.go +++ b/pkg/meta/tkv.go @@ -256,6 +256,12 @@ func (m *kvMeta) aclKey(id uint32) []byte { return m.fmtKey("R", id) } +func (m *kvMeta) parseACLId(key string) uint32 { + // trim "R" + rb := utils.ReadBuffer([]byte(key[1:])) + return rb.Get32() +} + func (m *kvMeta) parseSid(key string) uint64 { buf := []byte(key[2:]) // "SE" or "SH" if len(buf) != 8 { @@ -477,22 +483,14 @@ func (m *kvMeta) doInit(format *Format, force bool) error { } // cache all acls - maxId, err := m.getCounter(ACLCounterName) + acls, err := m.scanValues(m.fmtKey("R"), -1, nil) if err != nil { return err } - - if maxId > 0 { - missKeys := make([][]byte, maxId) - for i := 0; i < int(maxId); i++ { - missKeys[i] = m.aclKey(uint32(i) + 1) - } - acls := tx.gets(missKeys...) - for i, val := range acls { - tmpRule := &aclAPI.Rule{} - tmpRule.Decode(val) - m.aclCache.Put(uint32(i)+1, tmpRule) - } + for key, val := range acls { + tmpRule := &aclAPI.Rule{} + tmpRule.Decode(val) + m.aclCache.Put(m.parseACLId(key), tmpRule) } return nil }) @@ -888,8 +886,8 @@ func (m *kvMeta) doGetAttr(ctx Context, inode Ino, attr *Attr) syscall.Errno { m.parseAttr(val, attr) if attr != nil && attr.AccessACLId != aclAPI.None { - rule := &aclAPI.Rule{} - if err := m.getACL(tx, attr.AccessACLId, rule); err != nil { + rule, err := m.getACL(tx, attr.AccessACLId) + if err != nil { return err } attr.Mode = (rule.GetMode() & 0777) | (attr.Mode & 07000) @@ -914,8 +912,9 @@ func (m *kvMeta) doSetAttr(ctx Context, inode Ino, set uint16, sugidclearmode ui // get acl var rule *aclAPI.Rule if cur.AccessACLId != aclAPI.None { - rule = &aclAPI.Rule{} - if err := m.getACL(tx, cur.AccessACLId, rule); err != nil { + var err error + rule, err = m.getACL(tx, cur.AccessACLId) + if err != nil { return err } } @@ -935,6 +934,10 @@ func (m *kvMeta) doSetAttr(ctx Context, inode Ino, set uint16, sugidclearmode ui return err } setAttrACLId(dirtyAttr, aclAPI.TypeAccess, aclId) + + if err = m.tryLoadMissACLs(tx); err != nil { + logger.Warnf("SetAttr: load miss acls error: %s", err) + } } dirtyAttr.Ctime = now.Unix() @@ -1233,8 +1236,8 @@ func (m *kvMeta) doMknod(ctx Context, parent Ino, name string, _type uint8, mode } // set access acl by parent's default acl - rule := &aclAPI.Rule{} - if err = m.getACL(tx, pattr.DefaultACLId, rule); err != nil { + rule, err := m.getACL(tx, pattr.DefaultACLId) + if err != nil { return err } @@ -1247,6 +1250,10 @@ func (m *kvMeta) doMknod(ctx Context, parent Ino, name string, _type uint8, mode if err != nil { return err } + if err = m.tryLoadMissACLs(tx); err != nil { + logger.Warnf("Mknode: load miss acls error: %s", err) + } + attr.AccessACLId = id attr.Mode = (mode & 0xFE00) | cRule.GetMode() } @@ -3716,6 +3723,10 @@ func (m *kvMeta) SetFacl(ctx Context, ino Ino, aclType uint8, rule *aclAPI.Rule) } setAttrACLId(attr, aclType, aclId) + if err = m.tryLoadMissACLs(tx); err != nil { + logger.Warnf("SetFacl: load miss acls error: %s", err) + } + // set mode if aclType == aclAPI.TypeAccess { attr.Mode &= 07000 @@ -3757,14 +3768,19 @@ func (m *kvMeta) GetFacl(ctx Context, ino Ino, aclType uint8, rule *aclAPI.Rule) return ENOATTR } - return m.getACL(tx, aclId, rule) + a, err := m.getACL(tx, aclId) + if err != nil { + return err + } + *rule = *a + return nil })) } func (m *kvMeta) insertACL(tx *kvTxn, rule *aclAPI.Rule) (uint32, error) { var aclId uint32 if aclId = m.aclCache.GetId(rule); aclId == aclAPI.None { - newId, err := m.incrCounter(ACLCounterName, 1) + newId, err := m.incrCounter(aclCounter, 1) if err != nil { return aclAPI.None, err } @@ -3772,38 +3788,40 @@ func (m *kvMeta) insertACL(tx *kvTxn, rule *aclAPI.Rule) (uint32, error) { tx.set(m.aclKey(aclId), rule.Encode()) m.aclCache.Put(aclId, rule) + } + return aclId, nil +} - // try load miss - missIds := m.aclCache.GetMissIds(aclId) - if len(missIds) > 0 { - missKeys := make([][]byte, len(missIds)) - for i, id := range missIds { - missKeys[i] = m.aclKey(id) - } +func (m *kvMeta) tryLoadMissACLs(tx *kvTxn) error { + missIds := m.aclCache.GetMissIds() + if len(missIds) > 0 { + missKeys := make([][]byte, len(missIds)) + for i, id := range missIds { + missKeys[i] = m.aclKey(id) + } - acls := tx.gets(missKeys...) - for i, data := range acls { - tmpRule := &aclAPI.Rule{} - tmpRule.Decode(data) - m.aclCache.Put(missIds[i], tmpRule) - } + acls := tx.gets(missKeys...) + for i, data := range acls { + r := &aclAPI.Rule{} + r.Decode(data) + m.aclCache.Put(missIds[i], r) } } - return aclId, nil + return nil } -func (m *kvMeta) getACL(tx *kvTxn, id uint32, rule *aclAPI.Rule) error { +func (m *kvMeta) getACL(tx *kvTxn, id uint32) (*aclAPI.Rule, error) { if cRule := m.aclCache.Get(id); cRule != nil { - *rule = *cRule - return nil + return cRule, nil } val := tx.get(m.aclKey(id)) if val == nil { - return ENOATTR + return nil, ENOATTR } + rule := &aclAPI.Rule{} rule.Decode(val) m.aclCache.Put(id, rule) - return nil + return rule, nil }