Skip to content

Commit

Permalink
meta: use FastGetSummary to get recursive usage for quota (#3371)
Browse files Browse the repository at this point in the history
  • Loading branch information
SandyXSD authored Mar 24, 2023
1 parent 27c8b7a commit e7ed009
Show file tree
Hide file tree
Showing 5 changed files with 197 additions and 264 deletions.
93 changes: 85 additions & 8 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ type engine interface {
doCleanupDelayedSlices(edge int64) (int, error)
doDeleteSlice(id uint64, size uint32) error

doGetQuota(ctx Context, inode Ino) (*Quota, error)
doSetQuota(ctx Context, inode Ino, quota *Quota, create bool) error
doDelQuota(ctx Context, inode Ino) error
doLoadQuotas(ctx Context) (map[Ino]*Quota, error)
doFlushQuota(ctx Context, inode Ino, space, inodes int64) error

Expand Down Expand Up @@ -572,10 +575,6 @@ func (m *baseMeta) checkQuota(ctx Context, space, inodes int64, parent Ino) bool
return m.checkDirQuota(ctx, parent, space, inodes)
}

func (m *baseMeta) GetDirRecStat(ctx Context, inode Ino) (space, inodes int64, err error) {
return 0, 0, nil // FIXME: use FastGetSummary
}

func (m *baseMeta) loadQuotas() {
quotas, err := m.en.doLoadQuotas(Background)
if err == nil {
Expand Down Expand Up @@ -714,6 +713,82 @@ func (m *baseMeta) flushQuotas() {
}
}

func (m *baseMeta) HandleQuota(ctx Context, cmd uint8, dpath string, quotas map[string]*Quota) error {
var inode Ino
if cmd != QuotaList {
if st := m.resolve(ctx, dpath, &inode); st != 0 {
return st
}
if isTrash(inode) {
return errors.New("no quota for any trash directory")
}
}

switch cmd {
case QuotaSet:
q, err := m.en.doGetQuota(ctx, inode)
if err != nil {
return err
}
quota := quotas[dpath]
if q == nil {
var sum Summary
if st := m.FastGetSummary(ctx, inode, &sum, true); st != 0 {
return st
}
quota.UsedSpace = int64(sum.Size) - align4K(0)
quota.UsedInodes = int64(sum.Dirs+sum.Files) - 1
if quota.MaxSpace < 0 {
quota.MaxSpace = 0
}
if quota.MaxInodes < 0 {
quota.MaxInodes = 0
}
return m.en.doSetQuota(ctx, inode, quota, true)
} else {
quota.UsedSpace, quota.UsedInodes = q.UsedSpace, q.UsedInodes
if quota.MaxSpace < 0 {
quota.MaxSpace = q.MaxSpace
}
if quota.MaxInodes < 0 {
quota.MaxInodes = q.MaxInodes
}
if quota.MaxSpace == q.MaxSpace && quota.MaxInodes == q.MaxInodes {
return nil // nothing to update
}
return m.en.doSetQuota(ctx, inode, quota, false)
}
case QuotaGet:
q, err := m.en.doGetQuota(ctx, inode)
if err != nil {
return err
}
if q == nil {
return fmt.Errorf("no quota for inode %d path %s", inode, dpath)
}
quotas[dpath] = q
case QuotaDel:
return m.en.doDelQuota(ctx, inode)
case QuotaList:
quotaMap, err := m.en.doLoadQuotas(ctx)
if err != nil {
return err
}
var p string
for ino, quota := range quotaMap {
if ps := m.GetPaths(ctx, ino); len(ps) > 0 {
p = ps[0]
} else {
p = fmt.Sprintf("inode:%d", ino)
}
quotas[p] = quota
}
default: // FIXME: QuotaCheck
return fmt.Errorf("invalid quota command: %d", cmd)
}
return nil
}

func (m *baseMeta) cleanupDeletedFiles() {
for {
utils.SleepWithJitter(time.Minute)
Expand Down Expand Up @@ -1229,12 +1304,14 @@ func (m *baseMeta) Rename(ctx Context, parentSrc Ino, nameSrc string, parentDst
if st := m.Lookup(ctx, parentSrc, nameSrc, inode, attr); st != 0 {
return st
}
var err error
if attr.Typ == TypeDirectory {
space, inodes, err = m.GetDirRecStat(ctx, *inode)
if err != nil {
return errno(err)
var sum Summary
logger.Debugf("Start to get summary of inode %d", *inode)
if st := m.FastGetSummary(ctx, *inode, &sum, true); st != 0 {
logger.Warnf("Get summary of inode %d: %s", *inode, st)
return st
}
space, inodes = int64(sum.Size), int64(sum.Dirs+sum.Files)
} else {
space, inodes = align4K(attr.Length), 1
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/meta/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,6 @@ type Meta interface {
GetParents(ctx Context, inode Ino) map[Ino]int
// GetDirStat returns the space and inodes usage of a directory.
GetDirStat(ctx Context, inode Ino) (st *dirStat, err error)
// GetDirRecStat returns the space and inodes usage (recursive) of a directory.
GetDirRecStat(ctx Context, inode Ino) (space, inodes int64, err error)

// GetXattr returns the value of extended attribute for given name.
GetXattr(ctx Context, inode Ino, name string, vbuff *[]byte) syscall.Errno
Expand Down
143 changes: 47 additions & 96 deletions pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -3285,109 +3285,60 @@ func (m *redisMeta) doRemoveXattr(ctx Context, inode Ino, name string) syscall.E
}
}

func (m *redisMeta) HandleQuota(ctx Context, cmd uint8, dpath string, quotas map[string]*Quota) error {
var inode Ino
if cmd != QuotaList {
if st := m.resolve(ctx, dpath, &inode); st != 0 {
return st
}
if isTrash(inode) {
return errors.New("no quota for any trash directory")
}
func (m *redisMeta) doGetQuota(ctx Context, inode Ino) (*Quota, error) {
field := inode.String()
cmds, err := m.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.HGet(ctx, m.dirQuotaKey(), field)
pipe.HGet(ctx, m.dirQuotaUsedSpaceKey(), field)
pipe.HGet(ctx, m.dirQuotaUsedInodesKey(), field)
return nil
})
if err == redis.Nil {
return nil, nil
} else if err != nil {
return nil, err
}

var err error
field := inode.String()
switch cmd {
case QuotaSet:
quota := quotas[dpath]
err = m.txn(ctx, func(tx *redis.Tx) error {
rawQ, e := tx.HGet(ctx, m.dirQuotaKey(), field).Bytes()
if e != nil && e != redis.Nil {
return e
}
if len(rawQ) != 0 && len(rawQ) != 16 {
return fmt.Errorf("invalid quota value: %v", rawQ)
}
maxSpace, maxInodes := m.parseQuota(rawQ)
if quota.MaxSpace < 0 {
quota.MaxSpace = maxSpace
}
if quota.MaxInodes < 0 {
quota.MaxInodes = maxInodes
}
if maxSpace == quota.MaxSpace && maxInodes == quota.MaxInodes {
return nil // nothing to update
}
create := e == redis.Nil
var usedSpace, usedInodes int64
if create {
usedSpace, usedInodes, e = m.GetDirRecStat(ctx, inode)
if e != nil {
return e
}
}
_, e = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
buf, _ := cmds[0].(*redis.StringCmd).Bytes()
if len(buf) != 16 {
return nil, fmt.Errorf("invalid quota value: %v", buf)
}
var quota Quota
quota.MaxSpace, quota.MaxInodes = m.parseQuota(buf)
if quota.UsedSpace, err = cmds[1].(*redis.StringCmd).Int64(); err != nil {
return nil, err
}
if quota.UsedInodes, err = cmds[2].(*redis.StringCmd).Int64(); err != nil {
return nil, err
}
return &quota, nil
}

func (m *redisMeta) doSetQuota(ctx Context, inode Ino, quota *Quota, create bool) error {
return m.txn(ctx, func(tx *redis.Tx) error {
field := inode.String()
if create {
_, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.HSet(ctx, m.dirQuotaKey(), field, m.packQuota(quota.MaxSpace, quota.MaxInodes))
if create {
pipe.HSet(ctx, m.dirQuotaUsedSpaceKey(), field, usedSpace)
pipe.HSet(ctx, m.dirQuotaUsedInodesKey(), field, usedInodes)
}
pipe.HSet(ctx, m.dirQuotaUsedSpaceKey(), field, quota.UsedSpace)
pipe.HSet(ctx, m.dirQuotaUsedInodesKey(), field, quota.UsedInodes)
return nil
})
return e
}, m.dirQuotaKey())
case QuotaGet:
var quota Quota
var cmds []redis.Cmder
cmds, err = m.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.HGet(ctx, m.dirQuotaKey(), field)
pipe.HGet(ctx, m.dirQuotaUsedSpaceKey(), field)
pipe.HGet(ctx, m.dirQuotaUsedInodesKey(), field)
return nil
})
if err == redis.Nil {
err = errors.New("no quota")
}
if err != nil {
return err
} else {
return tx.HSet(ctx, m.dirQuotaKey(), field, m.packQuota(quota.MaxSpace, quota.MaxInodes)).Err()
}
rawQ, _ := cmds[0].(*redis.StringCmd).Bytes()
if len(rawQ) != 16 {
return fmt.Errorf("invalid quota value: %v", rawQ)
}
quota.MaxSpace, quota.MaxInodes = m.parseQuota(rawQ)
if quota.UsedSpace, err = cmds[1].(*redis.StringCmd).Int64(); err != nil {
return err
}
if quota.UsedInodes, err = cmds[2].(*redis.StringCmd).Int64(); err != nil {
return err
}
quotas[dpath] = &quota
case QuotaDel:
_, err = m.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.HDel(ctx, m.dirQuotaKey(), field)
pipe.HDel(ctx, m.dirQuotaUsedSpaceKey(), field)
pipe.HDel(ctx, m.dirQuotaUsedInodesKey(), field)
return nil
})
case QuotaList:
var quotaMap map[Ino]*Quota
quotaMap, err = m.doLoadQuotas(ctx)
if err == nil {
var p string
for ino, quota := range quotaMap {
if ps := m.GetPaths(ctx, ino); len(ps) > 0 {
p = ps[0]
} else {
p = fmt.Sprintf("inode:%d", ino)
}
quotas[p] = quota
}
}
default: // FIXME: QuotaCheck
err = fmt.Errorf("invalid quota command: %d", cmd)
}
}, m.dirQuotaKey())
}

func (m *redisMeta) doDelQuota(ctx Context, inode Ino) error {
field := inode.String()
_, err := m.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.HDel(ctx, m.dirQuotaKey(), field)
pipe.HDel(ctx, m.dirQuotaUsedSpaceKey(), field)
pipe.HDel(ctx, m.dirQuotaUsedInodesKey(), field)
return nil
})
return err
}

Expand Down
Loading

0 comments on commit e7ed009

Please sign in to comment.