Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

meta: use FastGetSummary to get recursive usage for quota #3371

Merged
merged 2 commits into from
Mar 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 85 additions & 8 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ type engine interface {
doCleanupDelayedSlices(edge int64) (int, error)
doDeleteSlice(id uint64, size uint32) error

doGetQuota(ctx Context, inode Ino) (*Quota, error)
doSetQuota(ctx Context, inode Ino, quota *Quota, create bool) error
doDelQuota(ctx Context, inode Ino) error
doLoadQuotas(ctx Context) (map[Ino]*Quota, error)
doFlushQuota(ctx Context, inode Ino, space, inodes int64) error

Expand Down Expand Up @@ -572,10 +575,6 @@ func (m *baseMeta) checkQuota(ctx Context, space, inodes int64, parent Ino) bool
return m.checkDirQuota(ctx, parent, space, inodes)
}

func (m *baseMeta) GetDirRecStat(ctx Context, inode Ino) (space, inodes int64, err error) {
return 0, 0, nil // FIXME: use FastGetSummary
}

func (m *baseMeta) loadQuotas() {
quotas, err := m.en.doLoadQuotas(Background)
if err == nil {
Expand Down Expand Up @@ -714,6 +713,82 @@ func (m *baseMeta) flushQuotas() {
}
}

func (m *baseMeta) HandleQuota(ctx Context, cmd uint8, dpath string, quotas map[string]*Quota) error {
var inode Ino
if cmd != QuotaList {
if st := m.resolve(ctx, dpath, &inode); st != 0 {
return st
}
if isTrash(inode) {
return errors.New("no quota for any trash directory")
}
}

switch cmd {
case QuotaSet:
q, err := m.en.doGetQuota(ctx, inode)
if err != nil {
return err
}
quota := quotas[dpath]
if q == nil {
var sum Summary
if st := m.FastGetSummary(ctx, inode, &sum, true); st != 0 {
return st
}
quota.UsedSpace = int64(sum.Size) - align4K(0)
quota.UsedInodes = int64(sum.Dirs+sum.Files) - 1
if quota.MaxSpace < 0 {
quota.MaxSpace = 0
}
if quota.MaxInodes < 0 {
quota.MaxInodes = 0
}
return m.en.doSetQuota(ctx, inode, quota, true)
} else {
quota.UsedSpace, quota.UsedInodes = q.UsedSpace, q.UsedInodes
if quota.MaxSpace < 0 {
quota.MaxSpace = q.MaxSpace
}
if quota.MaxInodes < 0 {
quota.MaxInodes = q.MaxInodes
}
if quota.MaxSpace == q.MaxSpace && quota.MaxInodes == q.MaxInodes {
return nil // nothing to update
}
return m.en.doSetQuota(ctx, inode, quota, false)
}
case QuotaGet:
q, err := m.en.doGetQuota(ctx, inode)
if err != nil {
return err
}
if q == nil {
return fmt.Errorf("no quota for inode %d path %s", inode, dpath)
}
quotas[dpath] = q
case QuotaDel:
return m.en.doDelQuota(ctx, inode)
case QuotaList:
quotaMap, err := m.en.doLoadQuotas(ctx)
if err != nil {
return err
}
var p string
for ino, quota := range quotaMap {
if ps := m.GetPaths(ctx, ino); len(ps) > 0 {
p = ps[0]
} else {
p = fmt.Sprintf("inode:%d", ino)
}
quotas[p] = quota
}
default: // FIXME: QuotaCheck
return fmt.Errorf("invalid quota command: %d", cmd)
}
return nil
}

func (m *baseMeta) cleanupDeletedFiles() {
for {
utils.SleepWithJitter(time.Minute)
Expand Down Expand Up @@ -1229,12 +1304,14 @@ func (m *baseMeta) Rename(ctx Context, parentSrc Ino, nameSrc string, parentDst
if st := m.Lookup(ctx, parentSrc, nameSrc, inode, attr); st != 0 {
return st
}
var err error
if attr.Typ == TypeDirectory {
space, inodes, err = m.GetDirRecStat(ctx, *inode)
if err != nil {
return errno(err)
var sum Summary
logger.Debugf("Start to get summary of inode %d", *inode)
if st := m.FastGetSummary(ctx, *inode, &sum, true); st != 0 {
logger.Warnf("Get summary of inode %d: %s", *inode, st)
return st
}
space, inodes = int64(sum.Size), int64(sum.Dirs+sum.Files)
} else {
space, inodes = align4K(attr.Length), 1
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/meta/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,6 @@ type Meta interface {
GetParents(ctx Context, inode Ino) map[Ino]int
// GetDirStat returns the space and inodes usage of a directory.
GetDirStat(ctx Context, inode Ino) (st *dirStat, err error)
// GetDirRecStat returns the space and inodes usage (recursive) of a directory.
GetDirRecStat(ctx Context, inode Ino) (space, inodes int64, err error)

// GetXattr returns the value of extended attribute for given name.
GetXattr(ctx Context, inode Ino, name string, vbuff *[]byte) syscall.Errno
Expand Down
143 changes: 47 additions & 96 deletions pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -3285,109 +3285,60 @@ func (m *redisMeta) doRemoveXattr(ctx Context, inode Ino, name string) syscall.E
}
}

func (m *redisMeta) HandleQuota(ctx Context, cmd uint8, dpath string, quotas map[string]*Quota) error {
var inode Ino
if cmd != QuotaList {
if st := m.resolve(ctx, dpath, &inode); st != 0 {
return st
}
if isTrash(inode) {
return errors.New("no quota for any trash directory")
}
func (m *redisMeta) doGetQuota(ctx Context, inode Ino) (*Quota, error) {
field := inode.String()
cmds, err := m.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.HGet(ctx, m.dirQuotaKey(), field)
pipe.HGet(ctx, m.dirQuotaUsedSpaceKey(), field)
pipe.HGet(ctx, m.dirQuotaUsedInodesKey(), field)
return nil
})
if err == redis.Nil {
return nil, nil
} else if err != nil {
return nil, err
}

var err error
field := inode.String()
switch cmd {
case QuotaSet:
quota := quotas[dpath]
err = m.txn(ctx, func(tx *redis.Tx) error {
rawQ, e := tx.HGet(ctx, m.dirQuotaKey(), field).Bytes()
if e != nil && e != redis.Nil {
return e
}
if len(rawQ) != 0 && len(rawQ) != 16 {
return fmt.Errorf("invalid quota value: %v", rawQ)
}
maxSpace, maxInodes := m.parseQuota(rawQ)
if quota.MaxSpace < 0 {
quota.MaxSpace = maxSpace
}
if quota.MaxInodes < 0 {
quota.MaxInodes = maxInodes
}
if maxSpace == quota.MaxSpace && maxInodes == quota.MaxInodes {
return nil // nothing to update
}
create := e == redis.Nil
var usedSpace, usedInodes int64
if create {
usedSpace, usedInodes, e = m.GetDirRecStat(ctx, inode)
if e != nil {
return e
}
}
_, e = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
buf, _ := cmds[0].(*redis.StringCmd).Bytes()
if len(buf) != 16 {
return nil, fmt.Errorf("invalid quota value: %v", buf)
}
var quota Quota
quota.MaxSpace, quota.MaxInodes = m.parseQuota(buf)
if quota.UsedSpace, err = cmds[1].(*redis.StringCmd).Int64(); err != nil {
return nil, err
}
if quota.UsedInodes, err = cmds[2].(*redis.StringCmd).Int64(); err != nil {
return nil, err
}
return &quota, nil
}

func (m *redisMeta) doSetQuota(ctx Context, inode Ino, quota *Quota, create bool) error {
return m.txn(ctx, func(tx *redis.Tx) error {
field := inode.String()
if create {
_, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.HSet(ctx, m.dirQuotaKey(), field, m.packQuota(quota.MaxSpace, quota.MaxInodes))
if create {
pipe.HSet(ctx, m.dirQuotaUsedSpaceKey(), field, usedSpace)
pipe.HSet(ctx, m.dirQuotaUsedInodesKey(), field, usedInodes)
}
pipe.HSet(ctx, m.dirQuotaUsedSpaceKey(), field, quota.UsedSpace)
pipe.HSet(ctx, m.dirQuotaUsedInodesKey(), field, quota.UsedInodes)
return nil
})
return e
}, m.dirQuotaKey())
case QuotaGet:
var quota Quota
var cmds []redis.Cmder
cmds, err = m.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.HGet(ctx, m.dirQuotaKey(), field)
pipe.HGet(ctx, m.dirQuotaUsedSpaceKey(), field)
pipe.HGet(ctx, m.dirQuotaUsedInodesKey(), field)
return nil
})
if err == redis.Nil {
err = errors.New("no quota")
}
if err != nil {
return err
} else {
return tx.HSet(ctx, m.dirQuotaKey(), field, m.packQuota(quota.MaxSpace, quota.MaxInodes)).Err()
}
rawQ, _ := cmds[0].(*redis.StringCmd).Bytes()
if len(rawQ) != 16 {
return fmt.Errorf("invalid quota value: %v", rawQ)
}
quota.MaxSpace, quota.MaxInodes = m.parseQuota(rawQ)
if quota.UsedSpace, err = cmds[1].(*redis.StringCmd).Int64(); err != nil {
return err
}
if quota.UsedInodes, err = cmds[2].(*redis.StringCmd).Int64(); err != nil {
return err
}
quotas[dpath] = &quota
case QuotaDel:
_, err = m.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.HDel(ctx, m.dirQuotaKey(), field)
pipe.HDel(ctx, m.dirQuotaUsedSpaceKey(), field)
pipe.HDel(ctx, m.dirQuotaUsedInodesKey(), field)
return nil
})
case QuotaList:
var quotaMap map[Ino]*Quota
quotaMap, err = m.doLoadQuotas(ctx)
if err == nil {
var p string
for ino, quota := range quotaMap {
if ps := m.GetPaths(ctx, ino); len(ps) > 0 {
p = ps[0]
} else {
p = fmt.Sprintf("inode:%d", ino)
}
quotas[p] = quota
}
}
default: // FIXME: QuotaCheck
err = fmt.Errorf("invalid quota command: %d", cmd)
}
}, m.dirQuotaKey())
}

func (m *redisMeta) doDelQuota(ctx Context, inode Ino) error {
field := inode.String()
_, err := m.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.HDel(ctx, m.dirQuotaKey(), field)
pipe.HDel(ctx, m.dirQuotaUsedSpaceKey(), field)
pipe.HDel(ctx, m.dirQuotaUsedInodesKey(), field)
return nil
})
return err
}

Expand Down
Loading