Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flush dir stat in less transactions #3277

Merged
merged 12 commits into from
Mar 3, 2023
56 changes: 43 additions & 13 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ type engine interface {
doRepair(ctx Context, inode Ino, attr *Attr) syscall.Errno

doGetParents(ctx Context, inode Ino) map[Ino]int
doUpdateDirStat(ctx Context, ino Ino, space int64, inodes int64) error
doUpdateDirStat(ctx Context, batch map[Ino]dirStat) error
doGetDirStat(ctx Context, ino Ino) (space, inodes uint64, err error)
doSyncDirStat(ctx Context, ino Ino) (space, inodes uint64, err error)

scanTrashSlices(Context, trashSliceScan) error
scanPendingSlices(Context, pendingSliceScan) error
Expand Down Expand Up @@ -242,6 +243,40 @@ func (m *baseMeta) checkRoot(inode Ino) Ino {
}
}

func (m *baseMeta) parallelSyncDirStat(ctx Context, inos map[Ino]bool) *sync.WaitGroup {
var wg sync.WaitGroup
for i := range inos {
wg.Add(1)
go func(ino Ino) {
defer wg.Done()
_, _, err := m.en.doSyncDirStat(ctx, ino)
if err != nil {
logger.Warnf("sync dir stat for %d: %s", ino, err)
}
}(i)
}
return &wg
}

func (m *baseMeta) groupBatch(batch map[Ino]dirStat, size int) [][]Ino {
var inos []Ino
for ino := range batch {
inos = append(inos, ino)
}
sort.Slice(inos, func(i, j int) bool {
return inos[i] < inos[j]
})
var batches [][]Ino
for i := 0; i < len(inos); i += size {
davies marked this conversation as resolved.
Show resolved Hide resolved
end := i + size
if end > len(inos) {
end = len(inos)
}
batches = append(batches, inos[i:end])
}
return batches
}

func (m *baseMeta) calcDirStat(ctx Context, ino Ino) (space, inodes uint64, err error) {
var entries []*Entry
if eno := m.en.doReaddir(ctx, ino, 1, &entries, -1); eno != 0 {
Expand Down Expand Up @@ -269,10 +304,10 @@ func (m *baseMeta) updateDirStat(ctx Context, ino Ino, space int64, inodes int64
}
m.dirStatsLock.Lock()
defer m.dirStatsLock.Unlock()
event := m.dirStats[ino]
event.space += space
event.inodes += inodes
m.dirStats[ino] = event
stat := m.dirStats[ino]
stat.space += space
stat.inodes += inodes
m.dirStats[ino] = stat
}

func (m *baseMeta) updateParentStat(ctx Context, inode, parent Ino, space int64) {
Expand Down Expand Up @@ -307,14 +342,9 @@ func (m *baseMeta) doFlushDirStat() {
stats := m.dirStats
m.dirStats = make(map[Ino]dirStat)
m.dirStatsLock.Unlock()
for ino, e := range stats {
if e.space == 0 && e.inodes == 0 {
continue
}
err := m.en.doUpdateDirStat(Background, ino, e.space, e.inodes)
if err != nil {
logger.Errorf("update dir stat failed: %v", err)
}
err := m.en.doUpdateDirStat(Background, stats)
if err != nil {
logger.Errorf("update dir stat failed: %v", err)
}
}

Expand Down
70 changes: 47 additions & 23 deletions pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2242,44 +2242,68 @@ func (m *redisMeta) doGetParents(ctx Context, inode Ino) map[Ino]int {
}

func (m *redisMeta) doSyncDirStat(ctx Context, ino Ino) (space, inodes uint64, err error) {
spaceKey := m.dirUsedSpaceKey()
inodesKey := m.dirUsedInodesKey()
field := strconv.FormatUint(uint64(ino), 16)
space, inodes, err = m.calcDirStat(ctx, ino)
if err != nil {
return
}
err = m.txn(ctx, func(tx *redis.Tx) error {
_, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.HSet(ctx, spaceKey, field, space)
pipe.HSet(ctx, inodesKey, field, inodes)
return nil
})
return err
}, spaceKey, inodesKey)
_, err = m.rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.HSetNX(ctx, m.dirUsedSpaceKey(), field, space)
pipe.HSetNX(ctx, m.dirUsedInodesKey(), field, inodes)
return nil
})
return
}

func (m *redisMeta) doUpdateDirStat(ctx Context, ino Ino, space int64, inodes int64) error {
func (m *redisMeta) doUpdateDirStat(ctx Context, batch map[Ino]dirStat) error {
spaceKey := m.dirUsedSpaceKey()
inodesKey := m.dirUsedInodesKey()
field := strconv.FormatUint(uint64(ino), 10)
if !m.rdb.HExists(ctx, spaceKey, field).Val() {
_, _, err := m.doSyncDirStat(ctx, ino)
nonexist := make(map[Ino]bool, 0)
statList := make([]Ino, 0, len(batch))
pipeline := m.rdb.Pipeline()
for ino := range batch {
pipeline.HExists(ctx, spaceKey, strconv.FormatUint(uint64(ino), 10))
statList = append(statList, ino)
}
rets, err := pipeline.Exec(ctx)
if err != nil {
return err
}
return m.txn(ctx, func(tx *redis.Tx) error {
_, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
if space != 0 {
pipe.HIncrBy(ctx, spaceKey, field, space)
}
if inodes != 0 {
pipe.HIncrBy(ctx, inodesKey, field, inodes)
for i, ret := range rets {
if ret.Err() != nil {
return ret.Err()
}
if exist, _ := ret.(*redis.BoolCmd).Result(); !exist {
nonexist[statList[i]] = true
}
}
if len(nonexist) > 0 {
wg := m.parallelSyncDirStat(ctx, nonexist)
defer wg.Wait()
}

for _, group := range m.groupBatch(batch, 1000) {
davies marked this conversation as resolved.
Show resolved Hide resolved
_, err := m.rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
for _, ino := range group {
field := strconv.FormatUint(uint64(ino), 10)
if nonexist[ino] {
continue
}
stat := batch[ino]
if stat.space != 0 {
pipe.HIncrBy(ctx, spaceKey, field, stat.space)
}
if stat.inodes != 0 {
pipe.HIncrBy(ctx, inodesKey, field, stat.inodes)
}
}
return nil
})
return err
}, spaceKey, inodesKey)
if err != nil {
return err
}
}
return nil
}

func (m *redisMeta) doGetDirStat(ctx Context, ino Ino) (space, inodes uint64, err error) {
Expand Down
37 changes: 27 additions & 10 deletions pkg/meta/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -2256,7 +2256,7 @@ func (m *dbMeta) doGetParents(ctx Context, inode Ino) map[Ino]int {
return ps
}

func (m *dbMeta) doUpdateDirStat(ctx Context, ino Ino, space int64, inodes int64) error {
func (m *dbMeta) doUpdateDirStat(ctx Context, batch map[Ino]dirStat) error {
table := m.db.GetTableMapper().Obj2Table("dirStats")
usedSpaceColumn := m.db.GetColumnMapper().Obj2Table("UsedSpace")
usedInodeColumn := m.db.GetColumnMapper().Obj2Table("UsedInodes")
Expand All @@ -2266,19 +2266,36 @@ func (m *dbMeta) doUpdateDirStat(ctx Context, ino Ino, space int64, inodes int64
usedSpaceColumn, usedSpaceColumn,
usedInodeColumn, usedInodeColumn,
)
var affected int64
err := m.txn(func(s *xorm.Session) error {
ret, err := s.Exec(sql, space, inodes, ino)

nonexist := make(map[Ino]bool, 0)

for _, group := range m.groupBatch(batch, 1000) {
err := m.txn(func(s *xorm.Session) error {
for _, ino := range group {
stat := batch[ino]
ret, err := s.Exec(sql, stat.space, stat.inodes, ino)
if err != nil {
return err
}
affected, err := ret.RowsAffected()
if err != nil {
return err
}
if affected == 0 {
nonexist[ino] = true
}
}
return nil
})
if err != nil {
return err
}
affected, err = ret.RowsAffected()
return err
})
if err == nil && affected == 0 {
_, _, err = m.doSyncDirStat(ctx, ino)
}
return err

if len(nonexist) > 0 {
m.parallelSyncDirStat(ctx, nonexist).Wait()
}
return nil
}

func (m *dbMeta) doSyncDirStat(ctx Context, ino Ino) (space, inodes uint64, err error) {
Expand Down
71 changes: 48 additions & 23 deletions pkg/meta/tkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -1950,37 +1950,62 @@ func (m *kvMeta) doSyncDirStat(ctx Context, ino Ino) (space, inodes uint64, err
if err != nil {
return
}
err = m.txn(func(tx *kvTxn) error {

if m.conf.ReadOnly {
err = syscall.EROFS
return
}
err = m.client.txn(func(tx *kvTxn) error {
tx.set(m.dirStatKey(ino), m.packDirStat(space, inodes))
return nil
})
if eno, ok := err.(syscall.Errno); ok && eno == 0 {
err = nil
}
if err != nil && m.shouldRetry(err) {
// other clients have synced
err = nil
}
return
}

var errEmptyStat = errors.New("empty stat")
func (m *kvMeta) doUpdateDirStat(ctx Context, batch map[Ino]dirStat) error {
syncMap := make(map[Ino]bool, 0)
for _, group := range m.groupBatch(batch, 20) {
err := m.txn(func(tx *kvTxn) error {
keys := make([][]byte, 0, len(group))
for _, ino := range group {
keys = append(keys, m.dirStatKey(ino))
}
for i, rawStat := range tx.gets(keys...) {
ino := group[i]
if rawStat == nil {
syncMap[ino] = true
continue
}
us, ui := m.parseDirStat(rawStat)
usedSpace, usedInodes := int64(us), int64(ui)
stat := batch[ino]
usedSpace += stat.space
usedInodes += stat.inodes
if usedSpace < 0 || usedInodes < 0 {
logger.Warnf("dir stat of inode %d is invalid: space %d, inodes %d, try to sync", ino, usedSpace, usedInodes)
syncMap[ino] = true
continue
}
tx.set(keys[i], m.packDirStat(uint64(usedSpace), uint64(usedInodes)))
}
return nil
})
if err != nil {
return err
}
}

func (m *kvMeta) doUpdateDirStat(ctx Context, ino Ino, space int64, inodes int64) error {
err := m.txn(func(tx *kvTxn) error {
key := m.dirStatKey(ino)
rawStat := tx.get(key)
if rawStat == nil {
return errEmptyStat
}
us, ui := m.parseDirStat(rawStat)
usedSpace, usedInodes := int64(us), int64(ui)
usedSpace += space
usedInodes += inodes
if usedSpace < 0 || usedInodes < 0 {
logger.Warnf("dir stat of inode %d is invalid: space %d, inodes %d, try to sync", ino, usedSpace, usedInodes)
return errEmptyStat
}
tx.set(key, m.packDirStat(uint64(usedSpace), uint64(usedInodes)))
return nil
})
if err == errEmptyStat {
_, _, err = m.doSyncDirStat(ctx, ino)
if len(syncMap) > 0 {
m.parallelSyncDirStat(ctx, syncMap).Wait()
}
return err
return nil
}

func (m *kvMeta) doGetDirStat(ctx Context, ino Ino) (space, inodes uint64, err error) {
Expand Down