Skip to content

Commit

Permalink
flush dir stat in less transactions (#3277)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hexilee authored Mar 3, 2023
1 parent 5e618d6 commit 7162385
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 69 deletions.
56 changes: 43 additions & 13 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ type engine interface {
doRepair(ctx Context, inode Ino, attr *Attr) syscall.Errno

doGetParents(ctx Context, inode Ino) map[Ino]int
doUpdateDirStat(ctx Context, ino Ino, space int64, inodes int64) error
doUpdateDirStat(ctx Context, batch map[Ino]dirStat) error
doGetDirStat(ctx Context, ino Ino) (space, inodes uint64, err error)
doSyncDirStat(ctx Context, ino Ino) (space, inodes uint64, err error)

scanTrashSlices(Context, trashSliceScan) error
scanPendingSlices(Context, pendingSliceScan) error
Expand Down Expand Up @@ -242,6 +243,40 @@ func (m *baseMeta) checkRoot(inode Ino) Ino {
}
}

func (m *baseMeta) parallelSyncDirStat(ctx Context, inos map[Ino]bool) *sync.WaitGroup {
var wg sync.WaitGroup
for i := range inos {
wg.Add(1)
go func(ino Ino) {
defer wg.Done()
_, _, err := m.en.doSyncDirStat(ctx, ino)
if err != nil {
logger.Warnf("sync dir stat for %d: %s", ino, err)
}
}(i)
}
return &wg
}

func (m *baseMeta) groupBatch(batch map[Ino]dirStat, size int) [][]Ino {
var inos []Ino
for ino := range batch {
inos = append(inos, ino)
}
sort.Slice(inos, func(i, j int) bool {
return inos[i] < inos[j]
})
var batches [][]Ino
for i := 0; i < len(inos); i += size {
end := i + size
if end > len(inos) {
end = len(inos)
}
batches = append(batches, inos[i:end])
}
return batches
}

func (m *baseMeta) calcDirStat(ctx Context, ino Ino) (space, inodes uint64, err error) {
var entries []*Entry
if eno := m.en.doReaddir(ctx, ino, 1, &entries, -1); eno != 0 {
Expand Down Expand Up @@ -269,10 +304,10 @@ func (m *baseMeta) updateDirStat(ctx Context, ino Ino, space int64, inodes int64
}
m.dirStatsLock.Lock()
defer m.dirStatsLock.Unlock()
event := m.dirStats[ino]
event.space += space
event.inodes += inodes
m.dirStats[ino] = event
stat := m.dirStats[ino]
stat.space += space
stat.inodes += inodes
m.dirStats[ino] = stat
}

func (m *baseMeta) updateParentStat(ctx Context, inode, parent Ino, space int64) {
Expand Down Expand Up @@ -307,14 +342,9 @@ func (m *baseMeta) doFlushDirStat() {
stats := m.dirStats
m.dirStats = make(map[Ino]dirStat)
m.dirStatsLock.Unlock()
for ino, e := range stats {
if e.space == 0 && e.inodes == 0 {
continue
}
err := m.en.doUpdateDirStat(Background, ino, e.space, e.inodes)
if err != nil {
logger.Errorf("update dir stat failed: %v", err)
}
err := m.en.doUpdateDirStat(Background, stats)
if err != nil {
logger.Errorf("update dir stat failed: %v", err)
}
}

Expand Down
70 changes: 47 additions & 23 deletions pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2242,44 +2242,68 @@ func (m *redisMeta) doGetParents(ctx Context, inode Ino) map[Ino]int {
}

func (m *redisMeta) doSyncDirStat(ctx Context, ino Ino) (space, inodes uint64, err error) {
spaceKey := m.dirUsedSpaceKey()
inodesKey := m.dirUsedInodesKey()
field := strconv.FormatUint(uint64(ino), 16)
space, inodes, err = m.calcDirStat(ctx, ino)
if err != nil {
return
}
err = m.txn(ctx, func(tx *redis.Tx) error {
_, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.HSet(ctx, spaceKey, field, space)
pipe.HSet(ctx, inodesKey, field, inodes)
return nil
})
return err
}, spaceKey, inodesKey)
_, err = m.rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.HSetNX(ctx, m.dirUsedSpaceKey(), field, space)
pipe.HSetNX(ctx, m.dirUsedInodesKey(), field, inodes)
return nil
})
return
}

func (m *redisMeta) doUpdateDirStat(ctx Context, ino Ino, space int64, inodes int64) error {
func (m *redisMeta) doUpdateDirStat(ctx Context, batch map[Ino]dirStat) error {
spaceKey := m.dirUsedSpaceKey()
inodesKey := m.dirUsedInodesKey()
field := strconv.FormatUint(uint64(ino), 10)
if !m.rdb.HExists(ctx, spaceKey, field).Val() {
_, _, err := m.doSyncDirStat(ctx, ino)
nonexist := make(map[Ino]bool, 0)
statList := make([]Ino, 0, len(batch))
pipeline := m.rdb.Pipeline()
for ino := range batch {
pipeline.HExists(ctx, spaceKey, strconv.FormatUint(uint64(ino), 10))
statList = append(statList, ino)
}
rets, err := pipeline.Exec(ctx)
if err != nil {
return err
}
return m.txn(ctx, func(tx *redis.Tx) error {
_, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
if space != 0 {
pipe.HIncrBy(ctx, spaceKey, field, space)
}
if inodes != 0 {
pipe.HIncrBy(ctx, inodesKey, field, inodes)
for i, ret := range rets {
if ret.Err() != nil {
return ret.Err()
}
if exist, _ := ret.(*redis.BoolCmd).Result(); !exist {
nonexist[statList[i]] = true
}
}
if len(nonexist) > 0 {
wg := m.parallelSyncDirStat(ctx, nonexist)
defer wg.Wait()
}

for _, group := range m.groupBatch(batch, 1000) {
_, err := m.rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
for _, ino := range group {
field := strconv.FormatUint(uint64(ino), 10)
if nonexist[ino] {
continue
}
stat := batch[ino]
if stat.space != 0 {
pipe.HIncrBy(ctx, spaceKey, field, stat.space)
}
if stat.inodes != 0 {
pipe.HIncrBy(ctx, inodesKey, field, stat.inodes)
}
}
return nil
})
return err
}, spaceKey, inodesKey)
if err != nil {
return err
}
}
return nil
}

func (m *redisMeta) doGetDirStat(ctx Context, ino Ino) (space, inodes uint64, err error) {
Expand Down
37 changes: 27 additions & 10 deletions pkg/meta/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -2256,7 +2256,7 @@ func (m *dbMeta) doGetParents(ctx Context, inode Ino) map[Ino]int {
return ps
}

func (m *dbMeta) doUpdateDirStat(ctx Context, ino Ino, space int64, inodes int64) error {
func (m *dbMeta) doUpdateDirStat(ctx Context, batch map[Ino]dirStat) error {
table := m.db.GetTableMapper().Obj2Table("dirStats")
usedSpaceColumn := m.db.GetColumnMapper().Obj2Table("UsedSpace")
usedInodeColumn := m.db.GetColumnMapper().Obj2Table("UsedInodes")
Expand All @@ -2266,19 +2266,36 @@ func (m *dbMeta) doUpdateDirStat(ctx Context, ino Ino, space int64, inodes int64
usedSpaceColumn, usedSpaceColumn,
usedInodeColumn, usedInodeColumn,
)
var affected int64
err := m.txn(func(s *xorm.Session) error {
ret, err := s.Exec(sql, space, inodes, ino)

nonexist := make(map[Ino]bool, 0)

for _, group := range m.groupBatch(batch, 1000) {
err := m.txn(func(s *xorm.Session) error {
for _, ino := range group {
stat := batch[ino]
ret, err := s.Exec(sql, stat.space, stat.inodes, ino)
if err != nil {
return err
}
affected, err := ret.RowsAffected()
if err != nil {
return err
}
if affected == 0 {
nonexist[ino] = true
}
}
return nil
})
if err != nil {
return err
}
affected, err = ret.RowsAffected()
return err
})
if err == nil && affected == 0 {
_, _, err = m.doSyncDirStat(ctx, ino)
}
return err

if len(nonexist) > 0 {
m.parallelSyncDirStat(ctx, nonexist).Wait()
}
return nil
}

func (m *dbMeta) doSyncDirStat(ctx Context, ino Ino) (space, inodes uint64, err error) {
Expand Down
71 changes: 48 additions & 23 deletions pkg/meta/tkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -1955,37 +1955,62 @@ func (m *kvMeta) doSyncDirStat(ctx Context, ino Ino) (space, inodes uint64, err
if err != nil {
return
}
err = m.txn(func(tx *kvTxn) error {

if m.conf.ReadOnly {
err = syscall.EROFS
return
}
err = m.client.txn(func(tx *kvTxn) error {
tx.set(m.dirStatKey(ino), m.packDirStat(space, inodes))
return nil
})
if eno, ok := err.(syscall.Errno); ok && eno == 0 {
err = nil
}
if err != nil && m.shouldRetry(err) {
// other clients have synced
err = nil
}
return
}

var errEmptyStat = errors.New("empty stat")
func (m *kvMeta) doUpdateDirStat(ctx Context, batch map[Ino]dirStat) error {
syncMap := make(map[Ino]bool, 0)
for _, group := range m.groupBatch(batch, 20) {
err := m.txn(func(tx *kvTxn) error {
keys := make([][]byte, 0, len(group))
for _, ino := range group {
keys = append(keys, m.dirStatKey(ino))
}
for i, rawStat := range tx.gets(keys...) {
ino := group[i]
if rawStat == nil {
syncMap[ino] = true
continue
}
us, ui := m.parseDirStat(rawStat)
usedSpace, usedInodes := int64(us), int64(ui)
stat := batch[ino]
usedSpace += stat.space
usedInodes += stat.inodes
if usedSpace < 0 || usedInodes < 0 {
logger.Warnf("dir stat of inode %d is invalid: space %d, inodes %d, try to sync", ino, usedSpace, usedInodes)
syncMap[ino] = true
continue
}
tx.set(keys[i], m.packDirStat(uint64(usedSpace), uint64(usedInodes)))
}
return nil
})
if err != nil {
return err
}
}

func (m *kvMeta) doUpdateDirStat(ctx Context, ino Ino, space int64, inodes int64) error {
err := m.txn(func(tx *kvTxn) error {
key := m.dirStatKey(ino)
rawStat := tx.get(key)
if rawStat == nil {
return errEmptyStat
}
us, ui := m.parseDirStat(rawStat)
usedSpace, usedInodes := int64(us), int64(ui)
usedSpace += space
usedInodes += inodes
if usedSpace < 0 || usedInodes < 0 {
logger.Warnf("dir stat of inode %d is invalid: space %d, inodes %d, try to sync", ino, usedSpace, usedInodes)
return errEmptyStat
}
tx.set(key, m.packDirStat(uint64(usedSpace), uint64(usedInodes)))
return nil
})
if err == errEmptyStat {
_, _, err = m.doSyncDirStat(ctx, ino)
if len(syncMap) > 0 {
m.parallelSyncDirStat(ctx, syncMap).Wait()
}
return err
return nil
}

func (m *kvMeta) doGetDirStat(ctx Context, ino Ino) (space, inodes uint64, err error) {
Expand Down

0 comments on commit 7162385

Please sign in to comment.