Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flush dir stat in less transactions #3277

Merged
merged 12 commits into from
Mar 3, 2023
53 changes: 40 additions & 13 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type engine interface {
doRepair(ctx Context, inode Ino, attr *Attr) syscall.Errno

doGetParents(ctx Context, inode Ino) map[Ino]int
doUpdateDirStat(ctx Context, ino Ino, space int64, inodes int64) error
doUpdateDirStat(ctx Context, batch map[Ino]dirStat) error
doGetDirStat(ctx Context, ino Ino) (space, inodes uint64, err error)

scanTrashSlices(Context, trashSliceScan) error
Expand Down Expand Up @@ -242,6 +242,38 @@ func (m *baseMeta) checkRoot(inode Ino) Ino {
}
}

func (m *baseMeta) batchCalcDirStat(ctx Context, stats map[Ino]*dirStat) error {
var eg errgroup.Group
for i, s := range stats {
ino, stat := i, s
eg.Go(func() error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are independent, so we should not cancel the others if one of them fails.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, we want to update the stats right after the calculation rather than wait for other large directories, so it's better to move the parallelization out of the calculation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

space, inodes, err := m.calcDirStat(ctx, ino)
if err != nil {
return err
}
stat.space, stat.inodes = int64(space), int64(inodes)
return nil
})
}
return eg.Wait()
}

func (m *baseMeta) groupBatch(batch map[Ino]dirStat, size int) [][]Ino {
var inos []Ino
for ino := range batch {
inos = append(inos, ino)
}
var batches [][]Ino
for i := 0; i < len(inos); i += size {
davies marked this conversation as resolved.
Show resolved Hide resolved
end := i + size
if end > len(inos) {
end = len(inos)
}
batches = append(batches, inos[i:end])
}
return batches
}

func (m *baseMeta) calcDirStat(ctx Context, ino Ino) (space, inodes uint64, err error) {
var entries []*Entry
if eno := m.en.doReaddir(ctx, ino, 1, &entries, -1); eno != 0 {
Expand Down Expand Up @@ -269,10 +301,10 @@ func (m *baseMeta) updateDirStat(ctx Context, ino Ino, space int64, inodes int64
}
m.dirStatsLock.Lock()
defer m.dirStatsLock.Unlock()
event := m.dirStats[ino]
event.space += space
event.inodes += inodes
m.dirStats[ino] = event
stat := m.dirStats[ino]
stat.space += space
stat.inodes += inodes
m.dirStats[ino] = stat
}

func (m *baseMeta) updateParentStat(ctx Context, inode, parent Ino, space int64) {
Expand Down Expand Up @@ -307,14 +339,9 @@ func (m *baseMeta) doFlushDirStat() {
stats := m.dirStats
m.dirStats = make(map[Ino]dirStat)
m.dirStatsLock.Unlock()
for ino, e := range stats {
if e.space == 0 && e.inodes == 0 {
continue
}
err := m.en.doUpdateDirStat(Background, ino, e.space, e.inodes)
if err != nil {
logger.Errorf("update dir stat failed: %v", err)
}
err := m.en.doUpdateDirStat(Background, stats)
if err != nil {
logger.Errorf("update dir stat failed: %v", err)
}
}

Expand Down
60 changes: 47 additions & 13 deletions pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2260,26 +2260,60 @@ func (m *redisMeta) doSyncDirStat(ctx Context, ino Ino) (space, inodes uint64, e
return
}

func (m *redisMeta) doUpdateDirStat(ctx Context, ino Ino, space int64, inodes int64) error {
func (m *redisMeta) doUpdateDirStat(ctx Context, batch map[Ino]dirStat) error {
spaceKey := m.dirUsedSpaceKey()
inodesKey := m.dirUsedInodesKey()
field := strconv.FormatUint(uint64(ino), 10)
if !m.rdb.HExists(ctx, spaceKey, field).Val() {
_, _, err := m.doSyncDirStat(ctx, ino)
nonexist := make(map[Ino]*dirStat, 0)
statList := make([]Ino, 0, len(batch))

pipeline := m.rdb.Pipeline()
for ino := range batch {
pipeline.HExists(ctx, spaceKey, strconv.FormatUint(uint64(ino), 10))
statList = append(statList, ino)
}
rets, err := pipeline.Exec(ctx)
if err != nil {
return err
}
return m.txn(ctx, func(tx *redis.Tx) error {
_, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
if space != 0 {
pipe.HIncrBy(ctx, spaceKey, field, space)
}
if inodes != 0 {
pipe.HIncrBy(ctx, inodesKey, field, inodes)
for i, ret := range rets {
if ret.Err() != nil {
return ret.Err()
}
if exist, _ := ret.(*redis.BoolCmd).Result(); !exist {
nonexist[statList[i]] = &dirStat{}
}
}

if len(nonexist) > 0 {
if err := m.batchCalcDirStat(ctx, nonexist); err != nil {
return err
}
}

for _, group := range m.groupBatch(batch, 1000) {
davies marked this conversation as resolved.
Show resolved Hide resolved
_, err := m.rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
for _, ino := range group {
field := strconv.FormatUint(uint64(ino), 10)
if stat, ok := nonexist[ino]; ok {
pipe.HSetNX(ctx, spaceKey, field, stat.space)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just ignore it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ignored

pipe.HSetNX(ctx, inodesKey, field, stat.inodes)
continue
}
stat := batch[ino]
if stat.space != 0 {
pipe.HIncrBy(ctx, spaceKey, field, stat.space)
}
if stat.inodes != 0 {
pipe.HIncrBy(ctx, inodesKey, field, stat.inodes)
}
}
return nil
})
return err
}, spaceKey, inodesKey)
if err != nil {
return err
}
}
return nil
}

func (m *redisMeta) doGetDirStat(ctx Context, ino Ino) (space, inodes uint64, err error) {
Expand Down
48 changes: 38 additions & 10 deletions pkg/meta/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -2256,7 +2256,7 @@ func (m *dbMeta) doGetParents(ctx Context, inode Ino) map[Ino]int {
return ps
}

func (m *dbMeta) doUpdateDirStat(ctx Context, ino Ino, space int64, inodes int64) error {
func (m *dbMeta) doUpdateDirStat(ctx Context, batch map[Ino]dirStat) error {
table := m.db.GetTableMapper().Obj2Table("dirStats")
usedSpaceColumn := m.db.GetColumnMapper().Obj2Table("UsedSpace")
usedInodeColumn := m.db.GetColumnMapper().Obj2Table("UsedInodes")
Expand All @@ -2266,19 +2266,47 @@ func (m *dbMeta) doUpdateDirStat(ctx Context, ino Ino, space int64, inodes int64
usedSpaceColumn, usedSpaceColumn,
usedInodeColumn, usedInodeColumn,
)
var affected int64
err := m.txn(func(s *xorm.Session) error {
ret, err := s.Exec(sql, space, inodes, ino)

nonexist := make(map[Ino]*dirStat, 0)

for _, group := range m.groupBatch(batch, 1000) {
err := m.txn(func(s *xorm.Session) error {
for _, ino := range group {
stat := batch[ino]
ret, err := s.Exec(sql, stat.space, stat.inodes, ino)
if err != nil {
return err
}
affected, err := ret.RowsAffected()
if err != nil {
return err
}
if affected == 0 {
nonexist[ino] = new(dirStat)
}
}
return nil
})
if err != nil {
return err
}
affected, err = ret.RowsAffected()
return err
})
if err == nil && affected == 0 {
_, _, err = m.doSyncDirStat(ctx, ino)
}
return err

if len(nonexist) > 0 {
if err := m.batchCalcDirStat(ctx, nonexist); err != nil {
return err
}
}

return m.txn(func(s *xorm.Session) error {
for ino, stat := range nonexist {
_, err := s.Insert(&dirStats{Inode: ino, UsedSpace: int64(stat.space), UsedInodes: int64(stat.inodes)})
if err != nil && !strings.Contains(err.Error(), "UNIQUE constraint failed") {
return err
}
}
return nil
})
}

func (m *dbMeta) doSyncDirStat(ctx Context, ino Ino) (space, inodes uint64, err error) {
Expand Down
65 changes: 44 additions & 21 deletions pkg/meta/tkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -1956,30 +1956,53 @@ func (m *kvMeta) doSyncDirStat(ctx Context, ino Ino) (space, inodes uint64, err
return
}

var errEmptyStat = errors.New("empty stat")
func (m *kvMeta) doUpdateDirStat(ctx Context, batch map[Ino]dirStat) error {
syncMap := make(map[Ino]*dirStat, 0)
for _, group := range m.groupBatch(batch, 20) {
err := m.txn(func(tx *kvTxn) error {
for _, ino := range group {
key := m.dirStatKey(ino)
rawStat := tx.get(key)
davies marked this conversation as resolved.
Show resolved Hide resolved
if rawStat == nil {
syncMap[ino] = new(dirStat)
continue
}
us, ui := m.parseDirStat(rawStat)
usedSpace, usedInodes := int64(us), int64(ui)
stat := batch[ino]
usedSpace += stat.space
usedInodes += stat.inodes
if usedSpace < 0 || usedInodes < 0 {
logger.Warnf("dir stat of inode %d is invalid: space %d, inodes %d, try to sync", ino, usedSpace, usedInodes)
syncMap[ino] = new(dirStat)
continue
}
tx.set(key, m.packDirStat(uint64(usedSpace), uint64(usedInodes)))
}
return nil
})
if err != nil {
return err
}
}

func (m *kvMeta) doUpdateDirStat(ctx Context, ino Ino, space int64, inodes int64) error {
err := m.txn(func(tx *kvTxn) error {
key := m.dirStatKey(ino)
rawStat := tx.get(key)
if rawStat == nil {
return errEmptyStat
}
us, ui := m.parseDirStat(rawStat)
usedSpace, usedInodes := int64(us), int64(ui)
usedSpace += space
usedInodes += inodes
if usedSpace < 0 || usedInodes < 0 {
logger.Warnf("dir stat of inode %d is invalid: space %d, inodes %d, try to sync", ino, usedSpace, usedInodes)
return errEmptyStat
}
tx.set(key, m.packDirStat(uint64(usedSpace), uint64(usedInodes)))
if len(syncMap) > 0 {
if err := m.batchCalcDirStat(ctx, syncMap); err != nil {
return err
}
}

return m.txn(func(tx *kvTxn) error {
for ino, stat := range syncMap {
key := m.dirStatKey(ino)
if tx.exist(key) {
// other clients have synced
continue
}
tx.set(key, m.packDirStat(uint64(stat.space), uint64(stat.inodes)))
}
return nil
})
if err == errEmptyStat {
_, _, err = m.doSyncDirStat(ctx, ino)
}
return err
}

func (m *kvMeta) doGetDirStat(ctx Context, ino Ino) (space, inodes uint64, err error) {
Expand Down