Skip to content

Commit

Permalink
quota: atomically set limitation (#4158)
Browse files Browse the repository at this point in the history
Signed-off-by: xixi <[email protected]>
  • Loading branch information
Hexilee authored and SandyXSD committed Nov 24, 2023
1 parent 4e9183b commit 9eecd16
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 65 deletions.
56 changes: 30 additions & 26 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ type engine interface {
doCleanupDetachedNode(ctx Context, detachedNode Ino) syscall.Errno

doGetQuota(ctx Context, inode Ino) (*Quota, error)
doSetQuota(ctx Context, inode Ino, quota *Quota, create bool) error
// set quota, return true if there is no quota exists before
doSetQuota(ctx Context, inode Ino, quota *Quota) (created bool, err error)
doDelQuota(ctx Context, inode Ino) error
doLoadQuotas(ctx Context) (map[Ino]*Quota, error)
doFlushQuotas(ctx Context, quotas map[Ino]*Quota) error
Expand Down Expand Up @@ -827,37 +828,34 @@ func (m *baseMeta) HandleQuota(ctx Context, cmd uint8, dpath string, quotas map[
return err
}
}
q, err := m.en.doGetQuota(ctx, inode)
quota := quotas[dpath]
created, err := m.en.doSetQuota(ctx, inode, &Quota{
MaxSpace: quota.MaxSpace,
MaxInodes: quota.MaxInodes,
UsedSpace: -1,
UsedInodes: -1,
})
if err != nil {
return err
}
quota := quotas[dpath]
if q == nil {
if created {
wrapErr := func(e error) error {
return errors.Wrapf(e, "set quota usage for file(%s), please repair it later", dpath)
}
var sum Summary
if st := m.GetSummary(ctx, inode, &sum, true, strict); st != 0 {
return st
}
quota.UsedSpace = int64(sum.Size) - align4K(0)
quota.UsedInodes = int64(sum.Dirs+sum.Files) - 1
if quota.MaxSpace < 0 {
quota.MaxSpace = 0
return wrapErr(st)
}
if quota.MaxInodes < 0 {
quota.MaxInodes = 0
}
return m.en.doSetQuota(ctx, inode, quota, true)
} else {
quota.UsedSpace, quota.UsedInodes = q.UsedSpace, q.UsedInodes
if quota.MaxSpace < 0 {
quota.MaxSpace = q.MaxSpace
}
if quota.MaxInodes < 0 {
quota.MaxInodes = q.MaxInodes
}
if quota.MaxSpace == q.MaxSpace && quota.MaxInodes == q.MaxInodes {
return nil // nothing to update
_, err := m.en.doSetQuota(ctx, inode, &Quota{
UsedSpace: int64(sum.Size) - align4K(0),
UsedInodes: int64(sum.Dirs+sum.Files) - 1,
MaxSpace: -1,
MaxInodes: -1,
})
if err != nil {
return wrapErr(err)
}
return m.en.doSetQuota(ctx, inode, quota, false)
return nil
}
case QuotaGet:
q, err := m.en.doGetQuota(ctx, inode)
Expand Down Expand Up @@ -913,7 +911,13 @@ func (m *baseMeta) HandleQuota(ctx Context, cmd uint8, dpath string, quotas map[
q.UsedSpace = usedSpace
quotas[dpath] = q
logger.Info("repairing...")
return m.en.doSetQuota(ctx, inode, q, true)
_, err = m.en.doSetQuota(ctx, inode, &Quota{
MaxInodes: -1,
MaxSpace: -1,
UsedInodes: q.UsedInodes,
UsedSpace: q.UsedSpace,
})
return err
}
return fmt.Errorf("quota of %s is inconsistent, please repair it with --repair flag", dpath)
default:
Expand Down
2 changes: 1 addition & 1 deletion pkg/meta/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (dm *DumpedMeta) writeJsonWithOutTree(w io.Writer) (*bufio.Writer, error) {
func (m *baseMeta) loadDumpedQuotas(ctx Context, quotas map[Ino]*DumpedQuota) {
// update quota
for inode, q := range quotas {
if err := m.en.doSetQuota(ctx, inode, &Quota{q.MaxSpace, q.MaxInodes, q.UsedSpace, q.UsedInodes, 0, 0}, true); err != nil {
if _, err := m.en.doSetQuota(ctx, inode, &Quota{q.MaxSpace, q.MaxInodes, q.UsedSpace, q.UsedInodes, 0, 0}); err != nil {
logger.Warnf("reset quota of %d: %s", inode, err)
continue
}
Expand Down
45 changes: 33 additions & 12 deletions pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -3467,21 +3467,42 @@ func (m *redisMeta) doGetQuota(ctx Context, inode Ino) (*Quota, error) {
return &quota, nil
}

func (m *redisMeta) doSetQuota(ctx Context, inode Ino, quota *Quota, create bool) error {
return m.txn(ctx, func(tx *redis.Tx) error {
func (m *redisMeta) doSetQuota(ctx Context, inode Ino, quota *Quota) (bool, error) {
var created bool
err := m.txn(ctx, func(tx *redis.Tx) error {
origin := new(Quota)
field := inode.String()
if create {
_, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.HSet(ctx, m.dirQuotaKey(), field, m.packQuota(quota.MaxSpace, quota.MaxInodes))
pipe.HSet(ctx, m.dirQuotaUsedSpaceKey(), field, quota.UsedSpace)
pipe.HSet(ctx, m.dirQuotaUsedInodesKey(), field, quota.UsedInodes)
return nil
})
return err
buf, e := tx.HGet(ctx, m.dirQuotaKey(), field).Bytes()
if e == nil {
created = false
origin.MaxSpace, origin.MaxInodes = m.parseQuota(buf)
} else if e == redis.Nil {
created = true
if quota.MaxSpace < 0 && quota.MaxInodes < 0 {
return errors.New("limitation not set or deleted")
}
} else {
return tx.HSet(ctx, m.dirQuotaKey(), field, m.packQuota(quota.MaxSpace, quota.MaxInodes)).Err()
return e
}
}, m.dirQuotaKey())
if quota.MaxSpace >= 0 {
origin.MaxSpace = quota.MaxSpace
}
if quota.MaxInodes >= 0 {
origin.MaxInodes = quota.MaxInodes
}
_, e = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.HSet(ctx, m.dirQuotaKey(), field, m.packQuota(origin.MaxSpace, origin.MaxInodes))
if quota.UsedSpace >= 0 {
pipe.HSet(ctx, m.dirQuotaUsedSpaceKey(), field, quota.UsedSpace)
}
if quota.UsedInodes >= 0 {
pipe.HSet(ctx, m.dirQuotaUsedInodesKey(), field, quota.UsedInodes)
}
return nil
})
return e
}, m.inodeKey(inode))
return created, err
}

func (m *redisMeta) doDelQuota(ctx Context, inode Ino) error {
Expand Down
50 changes: 34 additions & 16 deletions pkg/meta/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -3200,28 +3200,46 @@ func (m *dbMeta) doGetQuota(ctx Context, inode Ino) (*Quota, error) {
})
}

func (m *dbMeta) doSetQuota(ctx Context, inode Ino, quota *Quota, create bool) error {
return m.txn(func(s *xorm.Session) error {
q := dirQuota{Inode: inode}
ok, e := s.ForUpdate().Get(&q)
func (m *dbMeta) doSetQuota(ctx Context, inode Ino, quota *Quota) (bool, error) {
var created bool
err := m.txn(func(s *xorm.Session) error {
origin := dirQuota{Inode: inode}
exist, e := s.ForUpdate().Get(&origin)
if e != nil {
return e
}
q.MaxSpace, q.MaxInodes = quota.MaxSpace, quota.MaxInodes
if ok {
if create {
q.UsedSpace, q.UsedInodes = quota.UsedSpace, quota.UsedInodes
_, e = s.Cols("max_space", "max_inodes", "used_space", "used_inodes").Update(&q, &dirQuota{Inode: inode})
} else {
quota.UsedSpace, quota.UsedInodes = q.UsedSpace, q.UsedInodes
_, e = s.Cols("max_space", "max_inodes").Update(&q, &dirQuota{Inode: inode})
}
} else if create {
q.UsedSpace, q.UsedInodes = quota.UsedSpace, quota.UsedInodes
e = mustInsert(s, &q)
if exist {
created = false
} else if quota.MaxSpace < 0 && quota.MaxInodes < 0 {
return errors.Errorf("limitation not set or deleted")
} else {
created = true
}
updateColumns := make([]string, 0, 4)
if quota.MaxSpace >= 0 {
origin.MaxSpace = quota.MaxSpace
updateColumns = append(updateColumns, "max_space")
}
if quota.MaxInodes >= 0 {
origin.MaxInodes = quota.MaxInodes
updateColumns = append(updateColumns, "max_inodes")
}
if quota.UsedSpace >= 0 {
origin.UsedSpace = quota.UsedSpace
updateColumns = append(updateColumns, "used_space")
}
if quota.UsedInodes >= 0 {
origin.UsedInodes = quota.UsedInodes
updateColumns = append(updateColumns, "used_inodes")
}
if exist {
_, e = s.Cols(updateColumns...).Update(&origin, &dirQuota{Inode: inode})
} else {
e = mustInsert(s, &origin)
}
return e
})
return created, err
}

func (m *dbMeta) doDelQuota(ctx Context, inode Ino) error {
Expand Down
39 changes: 29 additions & 10 deletions pkg/meta/tkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -2748,22 +2748,41 @@ func (m *kvMeta) doGetQuota(ctx Context, inode Ino) (*Quota, error) {
return m.parseQuota(buf), nil
}

func (m *kvMeta) doSetQuota(ctx Context, inode Ino, quota *Quota, create bool) error {
return m.txn(func(tx *kvTxn) error {
if create {
tx.set(m.dirQuotaKey(inode), m.packQuota(quota))
func (m *kvMeta) doSetQuota(ctx Context, inode Ino, quota *Quota) (bool, error) {
var created bool
err := m.txn(func(tx *kvTxn) error {
var origin *Quota
buf := tx.get(m.dirQuotaKey(inode))
if len(buf) == 32 {
origin = m.parseQuota(buf)
created = false
} else if len(buf) != 0 {
return fmt.Errorf("invalid quota value: %v", buf)
} else {
buf := tx.get(m.dirQuotaKey(inode))
if len(buf) != 32 {
return fmt.Errorf("invalid quota value: %v", buf)
if quota.MaxSpace < 0 && quota.MaxInodes < 0 {
return errors.New("limitation not set or deleted")
}
q := m.parseQuota(buf)
quota.UsedSpace, quota.UsedInodes = q.UsedSpace, q.UsedInodes
tx.set(m.dirQuotaKey(inode), m.packQuota(quota))
created = true
origin = new(Quota)
}
if quota.MaxSpace >= 0 {
origin.MaxSpace = quota.MaxSpace
}
if quota.MaxInodes >= 0 {
origin.MaxInodes = quota.MaxInodes
}
if quota.UsedSpace >= 0 {
origin.UsedSpace = quota.UsedSpace
}
if quota.UsedInodes >= 0 {
origin.UsedInodes = quota.UsedInodes
}
tx.set(m.dirQuotaKey(inode), m.packQuota(origin))
return nil
})
return created, err
}

func (m *kvMeta) doDelQuota(ctx Context, inode Ino) error {
return m.deleteKeys(m.dirQuotaKey(inode))
}
Expand Down

0 comments on commit 9eecd16

Please sign in to comment.