Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

quota: atomically set limitation #4158

Merged
merged 19 commits into from
Nov 13, 2023
56 changes: 30 additions & 26 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ type engine interface {
doCleanupDetachedNode(ctx Context, detachedNode Ino) syscall.Errno

doGetQuota(ctx Context, inode Ino) (*Quota, error)
doSetQuota(ctx Context, inode Ino, quota *Quota, create bool) error
// set quota, return true if there is no quota exists before
doSetQuota(ctx Context, inode Ino, quota *Quota) (created bool, err error)
doDelQuota(ctx Context, inode Ino) error
doLoadQuotas(ctx Context) (map[Ino]*Quota, error)
doFlushQuotas(ctx Context, quotas map[Ino]*Quota) error
Expand Down Expand Up @@ -828,37 +829,34 @@ func (m *baseMeta) HandleQuota(ctx Context, cmd uint8, dpath string, quotas map[
return err
}
}
q, err := m.en.doGetQuota(ctx, inode)
quota := quotas[dpath]
created, err := m.en.doSetQuota(ctx, inode, &Quota{
MaxSpace: quota.MaxSpace,
MaxInodes: quota.MaxInodes,
UsedSpace: -1,
UsedInodes: -1,
})
if err != nil {
return err
}
quota := quotas[dpath]
if q == nil {
if created {
wrapErr := func(e error) error {
return errors.Wrapf(e, "set quota usage for file(%s), please repair it later", dpath)
}
var sum Summary
if st := m.GetSummary(ctx, inode, &sum, true, strict); st != 0 {
return st
}
quota.UsedSpace = int64(sum.Size) - align4K(0)
quota.UsedInodes = int64(sum.Dirs+sum.Files) - 1
if quota.MaxSpace < 0 {
quota.MaxSpace = 0
return wrapErr(st)
}
if quota.MaxInodes < 0 {
quota.MaxInodes = 0
}
return m.en.doSetQuota(ctx, inode, quota, true)
} else {
quota.UsedSpace, quota.UsedInodes = q.UsedSpace, q.UsedInodes
if quota.MaxSpace < 0 {
quota.MaxSpace = q.MaxSpace
}
if quota.MaxInodes < 0 {
quota.MaxInodes = q.MaxInodes
}
if quota.MaxSpace == q.MaxSpace && quota.MaxInodes == q.MaxInodes {
return nil // nothing to update
_, err := m.en.doSetQuota(ctx, inode, &Quota{
UsedSpace: int64(sum.Size) - align4K(0),
UsedInodes: int64(sum.Dirs+sum.Files) - 1,
MaxSpace: -1,
MaxInodes: -1,
})
if err != nil {
return wrapErr(err)
}
return m.en.doSetQuota(ctx, inode, quota, false)
return nil
}
case QuotaGet:
q, err := m.en.doGetQuota(ctx, inode)
Expand Down Expand Up @@ -914,7 +912,13 @@ func (m *baseMeta) HandleQuota(ctx Context, cmd uint8, dpath string, quotas map[
q.UsedSpace = usedSpace
quotas[dpath] = q
logger.Info("repairing...")
return m.en.doSetQuota(ctx, inode, q, true)
_, err = m.en.doSetQuota(ctx, inode, &Quota{
MaxInodes: -1,
MaxSpace: -1,
UsedInodes: q.UsedInodes,
UsedSpace: q.UsedSpace,
})
return err
}
return fmt.Errorf("quota of %s is inconsistent, please repair it with --repair flag", dpath)
default:
Expand Down
2 changes: 1 addition & 1 deletion pkg/meta/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (dm *DumpedMeta) writeJsonWithOutTree(w io.Writer) (*bufio.Writer, error) {
func (m *baseMeta) loadDumpedQuotas(ctx Context, quotas map[Ino]*DumpedQuota) {
// update quota
for inode, q := range quotas {
if err := m.en.doSetQuota(ctx, inode, &Quota{q.MaxSpace, q.MaxInodes, q.UsedSpace, q.UsedInodes, 0, 0}, true); err != nil {
if _, err := m.en.doSetQuota(ctx, inode, &Quota{q.MaxSpace, q.MaxInodes, q.UsedSpace, q.UsedInodes, 0, 0}); err != nil {
logger.Warnf("reset quota of %d: %s", inode, err)
continue
}
Expand Down
45 changes: 33 additions & 12 deletions pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -3467,21 +3467,42 @@ func (m *redisMeta) doGetQuota(ctx Context, inode Ino) (*Quota, error) {
return &quota, nil
}

func (m *redisMeta) doSetQuota(ctx Context, inode Ino, quota *Quota, create bool) error {
return m.txn(ctx, func(tx *redis.Tx) error {
func (m *redisMeta) doSetQuota(ctx Context, inode Ino, quota *Quota) (bool, error) {
var created bool
err := m.txn(ctx, func(tx *redis.Tx) error {
origin := new(Quota)
field := inode.String()
if create {
_, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.HSet(ctx, m.dirQuotaKey(), field, m.packQuota(quota.MaxSpace, quota.MaxInodes))
pipe.HSet(ctx, m.dirQuotaUsedSpaceKey(), field, quota.UsedSpace)
pipe.HSet(ctx, m.dirQuotaUsedInodesKey(), field, quota.UsedInodes)
return nil
})
return err
buf, e := tx.HGet(ctx, m.dirQuotaKey(), field).Bytes()
if e == nil {
created = false
origin.MaxSpace, origin.MaxInodes = m.parseQuota(buf)
} else if e == redis.Nil {
created = true
if quota.MaxSpace < 0 && quota.MaxInodes < 0 {
return errors.New("limitation not set or deleted")
}
} else {
return tx.HSet(ctx, m.dirQuotaKey(), field, m.packQuota(quota.MaxSpace, quota.MaxInodes)).Err()
return e
}
}, m.dirQuotaKey())
if quota.MaxSpace >= 0 {
origin.MaxSpace = quota.MaxSpace
}
if quota.MaxInodes >= 0 {
origin.MaxInodes = quota.MaxInodes
}
_, e = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.HSet(ctx, m.dirQuotaKey(), field, m.packQuota(origin.MaxSpace, origin.MaxInodes))
if quota.UsedSpace >= 0 {
pipe.HSet(ctx, m.dirQuotaUsedSpaceKey(), field, quota.UsedSpace)
}
if quota.UsedInodes >= 0 {
pipe.HSet(ctx, m.dirQuotaUsedInodesKey(), field, quota.UsedInodes)
}
return nil
})
return e
}, m.inodeKey(inode))
return created, err
}

func (m *redisMeta) doDelQuota(ctx Context, inode Ino) error {
Expand Down
50 changes: 34 additions & 16 deletions pkg/meta/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -3201,28 +3201,46 @@ func (m *dbMeta) doGetQuota(ctx Context, inode Ino) (*Quota, error) {
})
}

func (m *dbMeta) doSetQuota(ctx Context, inode Ino, quota *Quota, create bool) error {
return m.txn(func(s *xorm.Session) error {
q := dirQuota{Inode: inode}
ok, e := s.ForUpdate().Get(&q)
func (m *dbMeta) doSetQuota(ctx Context, inode Ino, quota *Quota) (bool, error) {
var created bool
err := m.txn(func(s *xorm.Session) error {
origin := dirQuota{Inode: inode}
exist, e := s.ForUpdate().Get(&origin)
if e != nil {
return e
}
q.MaxSpace, q.MaxInodes = quota.MaxSpace, quota.MaxInodes
if ok {
if create {
q.UsedSpace, q.UsedInodes = quota.UsedSpace, quota.UsedInodes
_, e = s.Cols("max_space", "max_inodes", "used_space", "used_inodes").Update(&q, &dirQuota{Inode: inode})
} else {
quota.UsedSpace, quota.UsedInodes = q.UsedSpace, q.UsedInodes
_, e = s.Cols("max_space", "max_inodes").Update(&q, &dirQuota{Inode: inode})
}
} else if create {
q.UsedSpace, q.UsedInodes = quota.UsedSpace, quota.UsedInodes
e = mustInsert(s, &q)
if exist {
created = false
} else if quota.MaxSpace < 0 && quota.MaxInodes < 0 {
return errors.Errorf("limitation not set or deleted")
} else {
created = true
}
updateColumns := make([]string, 0, 4)
if quota.MaxSpace >= 0 {
origin.MaxSpace = quota.MaxSpace
updateColumns = append(updateColumns, "max_space")
}
if quota.MaxInodes >= 0 {
origin.MaxInodes = quota.MaxInodes
updateColumns = append(updateColumns, "max_inodes")
}
if quota.UsedSpace >= 0 {
origin.UsedSpace = quota.UsedSpace
updateColumns = append(updateColumns, "used_space")
}
if quota.UsedInodes >= 0 {
origin.UsedInodes = quota.UsedInodes
updateColumns = append(updateColumns, "used_inodes")
}
if exist {
_, e = s.Cols(updateColumns...).Update(&origin, &dirQuota{Inode: inode})
} else {
e = mustInsert(s, &origin)
}
return e
})
return created, err
}

func (m *dbMeta) doDelQuota(ctx Context, inode Ino) error {
Expand Down
39 changes: 29 additions & 10 deletions pkg/meta/tkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -2748,22 +2748,41 @@ func (m *kvMeta) doGetQuota(ctx Context, inode Ino) (*Quota, error) {
return m.parseQuota(buf), nil
}

func (m *kvMeta) doSetQuota(ctx Context, inode Ino, quota *Quota, create bool) error {
return m.txn(func(tx *kvTxn) error {
if create {
tx.set(m.dirQuotaKey(inode), m.packQuota(quota))
func (m *kvMeta) doSetQuota(ctx Context, inode Ino, quota *Quota) (bool, error) {
var created bool
err := m.txn(func(tx *kvTxn) error {
var origin *Quota
buf := tx.get(m.dirQuotaKey(inode))
if len(buf) == 32 {
origin = m.parseQuota(buf)
created = false
} else if len(buf) != 0 {
return fmt.Errorf("invalid quota value: %v", buf)
} else {
buf := tx.get(m.dirQuotaKey(inode))
if len(buf) != 32 {
return fmt.Errorf("invalid quota value: %v", buf)
if quota.MaxSpace < 0 && quota.MaxInodes < 0 {
return errors.New("limitation not set or deleted")
}
q := m.parseQuota(buf)
quota.UsedSpace, quota.UsedInodes = q.UsedSpace, q.UsedInodes
tx.set(m.dirQuotaKey(inode), m.packQuota(quota))
created = true
origin = new(Quota)
}
if quota.MaxSpace >= 0 {
origin.MaxSpace = quota.MaxSpace
}
if quota.MaxInodes >= 0 {
origin.MaxInodes = quota.MaxInodes
}
if quota.UsedSpace >= 0 {
origin.UsedSpace = quota.UsedSpace
}
if quota.UsedInodes >= 0 {
origin.UsedInodes = quota.UsedInodes
}
tx.set(m.dirQuotaKey(inode), m.packQuota(origin))
return nil
})
return created, err
}

func (m *kvMeta) doDelQuota(ctx Context, inode Ino) error {
return m.deleteKeys(m.dirQuotaKey(inode))
}
Expand Down