Skip to content

Commit

Permalink
refactor and do not update dir stats in txn
Browse files Browse the repository at this point in the history
  • Loading branch information
zhijian-pro committed Mar 23, 2023
1 parent 1c3e662 commit df8dd0d
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 219 deletions.
58 changes: 58 additions & 0 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ type engine interface {
doCleanupDelayedSlices(edge int64) (int, error)
doDeleteSlice(id uint64, size uint32) error
doFindDetachedNodes(t time.Time) []Ino
doCloneEntry(ctx Context, srcIno Ino, srcType uint8, dstParentIno Ino, dstName string, dstIno *Ino, cmode uint8, cumask uint16, count, total *uint64, attach bool, concurrent chan struct{}) syscall.Errno
doAttachDirNode(ctx Context, dstParentIno Ino, dstIno Ino, dstName string) syscall.Errno
doCleanupDetachedNode(ctx Context, detachedNode Ino) syscall.Errno

doGetAttr(ctx Context, inode Ino, attr *Attr) syscall.Errno
Expand All @@ -85,6 +87,7 @@ type engine interface {
doSetXattr(ctx Context, inode Ino, name string, value []byte, flags uint32) syscall.Errno
doRemoveXattr(ctx Context, inode Ino, name string) syscall.Errno
doRepair(ctx Context, inode Ino, attr *Attr) syscall.Errno
doCheckEdgeExist(ctx Context, parent Ino, name string) (bool, error)

doGetParents(ctx Context, inode Ino) map[Ino]int
doUpdateDirStat(ctx Context, batch map[Ino]dirStat) error
Expand Down Expand Up @@ -1732,3 +1735,58 @@ func (m *baseMeta) ScanDeletedObject(ctx Context, tss trashSliceScan, pss pendin
}
return eg.Wait()
}

func (m *baseMeta) Clone(ctx Context, srcIno, dstParentIno Ino, dstName string, cmode uint8, cumask uint16, count, total *uint64) syscall.Errno {
srcAttr := &Attr{}
var eno syscall.Errno
if eno = m.en.doGetAttr(ctx, srcIno, srcAttr); eno != 0 {
return eno
}
// total should start from 1
*total = 1
var dstIno Ino
var err error
var cloneEno syscall.Errno
concurrent := make(chan struct{}, 4)
if srcAttr.Typ == TypeDirectory {
// check dst edge
var edgeExist bool
edgeExist, err = m.en.doCheckEdgeExist(ctx, dstParentIno, dstName)
if err != nil {
return errno(err)
}
if edgeExist {
return syscall.EEXIST
}

eno = m.en.doCloneEntry(ctx, srcIno, TypeDirectory, dstParentIno, dstName, &dstIno, cmode, cumask, count, total, false, concurrent)
if eno != 0 {
cloneEno = eno
}
if eno == 0 {
edgeExist, err = m.en.doCheckEdgeExist(ctx, dstParentIno, dstName)
if err != nil {
return errno(err)
}
if edgeExist {
return syscall.EEXIST
}
if eno := m.en.doAttachDirNode(ctx, dstParentIno, dstIno, dstName); eno != 0 {
cloneEno = eno
} else {
newSpace := align4K(0)
m.en.updateStats(newSpace, 1)
m.updateDirStat(ctx, dstParentIno, 0, newSpace, 1)
}
}
// delete the dst tree if clone failed
if cloneEno != 0 {
if eno = m.en.doCleanupDetachedNode(ctx, dstIno); eno != 0 {
logger.Errorf("doCleanupDetachedNode: remove detached tree (%d) error: %s", dstIno, eno)
}
}
} else {
cloneEno = m.en.doCloneEntry(ctx, srcIno, srcAttr.Typ, dstParentIno, dstName, &dstIno, cmode, cumask, count, total, true, concurrent)
}
return cloneEno
}
94 changes: 28 additions & 66 deletions pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -3838,69 +3838,7 @@ func (m *redisMeta) LoadMeta(r io.Reader) (err error) {
return err
}

func (m *redisMeta) Clone(ctx Context, srcIno, dstParentIno Ino, dstName string, cmode uint8, cumask uint16, count, total *uint64) syscall.Errno {
srcAttr := &Attr{}
var eno syscall.Errno
if eno = m.doGetAttr(ctx, srcIno, srcAttr); eno != 0 {
return eno
}
// total should start from 1
*total = 1
var dstIno Ino
var err error
var cloneEno syscall.Errno
concurrent := make(chan struct{}, 4)
if srcAttr.Typ == TypeDirectory {
// check dst edge
if m.rdb.HExists(ctx, m.entryKey(dstParentIno), dstName).Val() {
return syscall.EEXIST
}
eno = m.cloneEntry(ctx, srcIno, TypeDirectory, dstParentIno, dstName, &dstIno, cmode, cumask, count, total, false, concurrent)
if eno != 0 {
cloneEno = eno
}
if eno == 0 {
err = m.txn(ctx, func(tx *redis.Tx) error {
// check dst edge again
if tx.HExists(ctx, m.entryKey(dstParentIno), dstName).Val() {
// fixme: maybe return os.ErrExist rather than syscall.errno
return syscall.EEXIST
}
if err = tx.HSet(ctx, m.entryKey(dstParentIno), dstName, m.packEntry(TypeDirectory, dstIno)).Err(); err != nil {
return err
}

newSpace := align4K(0)
tx.IncrBy(ctx, m.usedSpaceKey(), newSpace)
tx.Incr(ctx, m.totalInodesKey())
m.updateStats(newSpace, 1)
m.updateDirStat(ctx, dstParentIno, 0, newSpace, 1)

// update parent nlink
dstParentAttr := &Attr{}
if eno := m.doGetAttr(ctx, dstParentIno, dstParentAttr); eno != 0 {
return eno
}
dstParentAttr.Nlink++
return m.rdb.Set(ctx, m.inodeKey(dstParentIno), m.marshal(dstParentAttr), 0).Err()
}, m.entryKey(dstParentIno))
if err != nil {
cloneEno = errno(err)
}
}
// delete the dst tree if clone failed
if cloneEno != 0 {
if eno := m.doCleanupDetachedNode(ctx, dstIno); eno != 0 {
logger.Errorf("doCleanupDetachedNode: remove detached tree (%d) error: %s", dstIno, eno)
}
}
} else {
cloneEno = m.cloneEntry(ctx, srcIno, srcAttr.Typ, dstParentIno, dstName, &dstIno, cmode, cumask, count, total, true, concurrent)
}
return cloneEno
}

func (m *redisMeta) cloneEntry(ctx Context, srcIno Ino, srcType uint8, dstParentIno Ino, dstName string, dstIno *Ino, cmode uint8, cumask uint16, count, total *uint64, attach bool, concurrent chan struct{}) syscall.Errno {
func (m *redisMeta) doCloneEntry(ctx Context, srcIno Ino, srcType uint8, dstParentIno Ino, dstName string, dstIno *Ino, cmode uint8, cumask uint16, count, total *uint64, attach bool, concurrent chan struct{}) syscall.Errno {
var err error
switch srcType {
case TypeDirectory:
Expand Down Expand Up @@ -3950,7 +3888,7 @@ func (m *redisMeta) cloneEntry(ctx Context, srcIno Ino, srcType uint8, dstParent
go func(srcIno Ino, dstName string) {
defer wg.Done()
var dstIno2 Ino
eno := m.cloneEntry(ctx, srcIno, TypeDirectory, *dstIno, dstName, &dstIno2, cmode, cumask, count, total, true, concurrent)
eno := m.en.doCloneEntry(ctx, srcIno, TypeDirectory, *dstIno, dstName, &dstIno2, cmode, cumask, count, total, true, concurrent)
if eno == 0 {
atomic.AddUint32(&countDir, 1)
}
Expand All @@ -3961,7 +3899,7 @@ func (m *redisMeta) cloneEntry(ctx Context, srcIno Ino, srcType uint8, dstParent
}(entry.Inode, string(entry.Name))
default:
var dstIno2 Ino
if eno := m.cloneEntry(ctx, entry.Inode, TypeDirectory, *dstIno, string(entry.Name), &dstIno2, cmode, cumask, count, total, true, concurrent); eno != 0 {
if eno := m.en.doCloneEntry(ctx, entry.Inode, TypeDirectory, *dstIno, string(entry.Name), &dstIno2, cmode, cumask, count, total, true, concurrent); eno != 0 {
return eno
} else {
atomic.AddUint32(&countDir, 1)
Expand All @@ -3970,7 +3908,7 @@ func (m *redisMeta) cloneEntry(ctx Context, srcIno Ino, srcType uint8, dstParent

} else {
var dstIno2 Ino
if eno := m.cloneEntry(ctx, entry.Inode, entry.Attr.Typ, *dstIno, string(entry.Name), &dstIno2, cmode, cumask, count, total, true, concurrent); eno != 0 {
if eno := m.en.doCloneEntry(ctx, entry.Inode, entry.Attr.Typ, *dstIno, string(entry.Name), &dstIno2, cmode, cumask, count, total, true, concurrent); eno != 0 {
return eno
}
}
Expand Down Expand Up @@ -4131,6 +4069,30 @@ func (m *redisMeta) mkNodeWithAttr(ctx Context, tx *redis.Tx, srcIno Ino, srcAtt
return err
}

func (m *redisMeta) doCheckEdgeExist(ctx Context, parent Ino, name string) (exist bool, err error) {
return m.rdb.HExists(ctx, m.entryKey(parent), name).Result()
}

func (m *redisMeta) doAttachDirNode(ctx Context, dstParentIno Ino, dstIno Ino, dstName string) syscall.Errno {
return errno(m.txn(ctx, func(tx *redis.Tx) error {
if err := tx.HSet(ctx, m.entryKey(dstParentIno), dstName, m.packEntry(TypeDirectory, dstIno)).Err(); err != nil {
return err
}

newSpace := align4K(0)
tx.IncrBy(ctx, m.usedSpaceKey(), newSpace)
tx.Incr(ctx, m.totalInodesKey())

// update parent nlink
dstParentAttr := &Attr{}
if eno := m.doGetAttr(ctx, dstParentIno, dstParentAttr); eno != 0 {
return eno
}
dstParentAttr.Nlink++
return m.rdb.Set(ctx, m.inodeKey(dstParentIno), m.marshal(dstParentAttr), 0).Err()
}, m.entryKey(dstParentIno)))
}

func (m *redisMeta) doCleanupDetachedNode(ctx Context, detachedNode Ino) syscall.Errno {
exists, err := m.rdb.Exists(ctx, m.inodeKey(detachedNode)).Result()
if err != nil {
Expand Down
115 changes: 34 additions & 81 deletions pkg/meta/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -3572,84 +3572,7 @@ func isDuplicateEntryErr(err error) bool {
return false
}

func (m *dbMeta) Clone(ctx Context, srcIno, dstParentIno Ino, dstName string, cmode uint8, cumask uint16, count, total *uint64) syscall.Errno {
srcAttr := &Attr{}
var eno syscall.Errno
if eno = m.doGetAttr(ctx, srcIno, srcAttr); eno != 0 {
return eno
}
// total should start from 1
*total = 1
var dstIno Ino
var err error
var cloneEno syscall.Errno
concurrent := make(chan struct{}, 4)
if srcAttr.Typ == TypeDirectory {
// check dst edge
var exist bool
if err := m.roTxn(func(s *xorm.Session) error {
exist, err = s.Get(&edge{Parent: dstParentIno, Name: []byte(dstName)})
return err
}); err != nil {
return errno(err)
}
if exist {
return syscall.EEXIST
}

eno = m.cloneEntry(ctx, srcIno, TypeDirectory, dstParentIno, dstName, &dstIno, cmode, cumask, count, total, false, concurrent)
if eno != 0 {
cloneEno = eno
}
if eno == 0 {
err = m.txn(func(s *xorm.Session) error {
// check dst edge again
if exist, err = s.Get(&edge{Parent: dstParentIno, Name: []byte(dstName)}); err != nil {
return errno(err)
} else if exist {
return syscall.EEXIST
}

if err = mustInsert(s, &edge{Parent: dstParentIno, Name: []byte(dstName), Inode: dstIno, Type: TypeDirectory}); err != nil {
if isDuplicateEntryErr(err) {
return syscall.EEXIST
}
return err
}

// update parent nlink
var n = node{Inode: dstParentIno}
ok, err := s.ForUpdate().Get(&n)
if err == nil {
if ok {
_, err = s.Cols("nlink").Update(&node{Nlink: n.Nlink + 1}, &node{Inode: dstParentIno})
} else {
err = syscall.ENOENT
}
}
return err
}, dstParentIno)
if err != nil {
cloneEno = errno(err)
} else {
newSpace := align4K(0)
m.updateStats(newSpace, 1)
m.updateDirStat(ctx, dstParentIno, 0, newSpace, 1)
}
}
// delete the dst tree if clone failed
if cloneEno != 0 {
if eno = m.doCleanupDetachedNode(ctx, dstIno); eno != 0 {
logger.Errorf("doCleanupDetachedNode: remove detached tree (%d) error: %s", dstIno, eno)
}
}
} else {
cloneEno = m.cloneEntry(ctx, srcIno, srcAttr.Typ, dstParentIno, dstName, &dstIno, cmode, cumask, count, total, true, concurrent)
}
return cloneEno
}

func (m *dbMeta) cloneEntry(ctx Context, srcIno Ino, srcType uint8, dstParentIno Ino, dstName string, dstIno *Ino, cmode uint8, cumask uint16, count, total *uint64, attach bool, concurrent chan struct{}) syscall.Errno {
func (m *dbMeta) doCloneEntry(ctx Context, srcIno Ino, srcType uint8, dstParentIno Ino, dstName string, dstIno *Ino, cmode uint8, cumask uint16, count, total *uint64, attach bool, concurrent chan struct{}) syscall.Errno {
var err error
switch srcType {
case TypeDirectory:
Expand Down Expand Up @@ -3702,7 +3625,7 @@ func (m *dbMeta) cloneEntry(ctx Context, srcIno Ino, srcType uint8, dstParentIno
go func(srcIno Ino, dstName string) {
defer wg.Done()
var dstIno2 Ino
eno := m.cloneEntry(ctx, srcIno, TypeDirectory, *dstIno, dstName, &dstIno2, cmode, cumask, count, total, true, concurrent)
eno := m.en.doCloneEntry(ctx, srcIno, TypeDirectory, *dstIno, dstName, &dstIno2, cmode, cumask, count, total, true, concurrent)
if eno == 0 {
atomic.AddUint32(&countDir, 1)
}
Expand All @@ -3713,15 +3636,15 @@ func (m *dbMeta) cloneEntry(ctx Context, srcIno Ino, srcType uint8, dstParentIno
}(entry.Inode, string(entry.Name))
default:
var dstIno2 Ino
if eno := m.cloneEntry(ctx, entry.Inode, TypeDirectory, *dstIno, string(entry.Name), &dstIno2, cmode, cumask, count, total, true, concurrent); eno != 0 {
if eno := m.en.doCloneEntry(ctx, entry.Inode, TypeDirectory, *dstIno, string(entry.Name), &dstIno2, cmode, cumask, count, total, true, concurrent); eno != 0 {
return eno
} else {
atomic.AddUint32(&countDir, 1)
}
}
} else {
var dstIno2 Ino
if eno := m.cloneEntry(ctx, entry.Inode, entry.Attr.Typ, *dstIno, string(entry.Name), &dstIno2, cmode, cumask, count, total, true, concurrent); eno != 0 {
if eno := m.en.doCloneEntry(ctx, entry.Inode, entry.Attr.Typ, *dstIno, string(entry.Name), &dstIno2, cmode, cumask, count, total, true, concurrent); eno != 0 {
return eno
}
}
Expand Down Expand Up @@ -3901,6 +3824,36 @@ func (m *dbMeta) mkNodeWithAttr(ctx Context, s *xorm.Session, srcIno Ino, srcNod
return nil
}

func (m *dbMeta) doCheckEdgeExist(ctx Context, parent Ino, name string) (exist bool, err error) {
err = m.roTxn(func(s *xorm.Session) error {
exist, err = s.Get(&edge{Parent: parent, Name: []byte(name)})
return err
})
return
}

func (m *dbMeta) doAttachDirNode(ctx Context, dstParentIno Ino, dstIno Ino, dstName string) syscall.Errno {
return errno(m.txn(func(s *xorm.Session) error {
// must lock parent node first to avoid deadlock
var n = node{Inode: dstParentIno}
ok, err := s.ForUpdate().Get(&n)
if err == nil {
if ok {
_, err = s.Cols("nlink").Update(&node{Nlink: n.Nlink + 1}, &node{Inode: dstParentIno})
} else {
err = syscall.ENOENT
}
}
if err := mustInsert(s, &edge{Parent: dstParentIno, Name: []byte(dstName), Inode: dstIno, Type: TypeDirectory}); err != nil {
if isDuplicateEntryErr(err) {
return syscall.EEXIST
}
return err
}
return err
}, dstParentIno))
}

func (m *dbMeta) doFindDetachedNodes(t time.Time) []Ino {
var detachedNodes []Ino
if err := m.roTxn(func(s *xorm.Session) error {
Expand Down
Loading

0 comments on commit df8dd0d

Please sign in to comment.