Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd/clone: clean the detached nodes in the gc command #3366

Merged
merged 11 commits into from
Mar 28, 2023
8 changes: 6 additions & 2 deletions cmd/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ $ juicefs gc redis://localhost
# Trigger compaction of all slices
$ juicefs gc redis://localhost --compact

# Delete leaked objects and delayed deleted slices or files
# Delete leaked objects or metadata and delayed deleted slices or files
$ juicefs gc redis://localhost --delete`,
Flags: []cli.Flag{
&cli.BoolFlag{
Expand All @@ -61,7 +61,7 @@ $ juicefs gc redis://localhost --delete`,
},
&cli.BoolFlag{
Name: "delete",
Usage: "delete leaked objects and delayed deleted slices or files",
Usage: "delete leaked objects or metadata and delayed deleted slices or files",
},
&cli.IntFlag{
Name: "threads",
Expand Down Expand Up @@ -144,6 +144,10 @@ func gc(ctx *cli.Context) error {
cleanTrashSpin := progress.AddCountSpinner("Cleaned trash")
m.CleanupTrashBefore(c, edge, cleanTrashSpin.Increment)
cleanTrashSpin.Done()

cleanDetachedNodeSpin := progress.AddCountSpinner("Cleaned detached nodes")
m.CleanupDetachedNodesBefore(c, time.Now().Add(-time.Hour*24), cleanDetachedNodeSpin.Increment)
cleanDetachedNodeSpin.Done()
}

err = m.ScanDeletedObject(
Expand Down
14 changes: 14 additions & 0 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ type engine interface {
doCleanupSlices()
doCleanupDelayedSlices(edge int64) (int, error)
doDeleteSlice(id uint64, size uint32) error
doFindDetachedNodes(t time.Time) []Ino
zhijian-pro marked this conversation as resolved.
Show resolved Hide resolved
doCleanupDetachedNode(ctx Context, detachedNode Ino) syscall.Errno

doGetQuota(ctx Context, inode Ino) (*Quota, error)
doSetQuota(ctx Context, inode Ino, quota *Quota, create bool) error
Expand Down Expand Up @@ -1920,6 +1922,18 @@ func (m *baseMeta) cleanupTrash() {
}
}

func (m *baseMeta) CleanupDetachedNodesBefore(ctx Context, edge time.Time, increProgress func()) {
for _, inode := range m.en.doFindDetachedNodes(edge) {
if eno := m.en.doCleanupDetachedNode(Background, inode); eno != 0 {
logger.Errorf("cleanupDetachedNode: remove detached tree (%d) error: %s", inode, eno)
} else {
if increProgress != nil {
increProgress()
}
}
}
}

func (m *baseMeta) CleanupTrashBefore(ctx Context, edge time.Time, increProgress func()) {
logger.Debugf("cleanup trash: started")
now := time.Now()
Expand Down
2 changes: 2 additions & 0 deletions pkg/meta/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ type Meta interface {
CleanStaleSessions()
// CleanupTrashBefore deletes all files in trash before the given time.
CleanupTrashBefore(ctx Context, edge time.Time, increProgress func())
// CleanupDetachedNodesBefore deletes all detached nodes before the given time.
CleanupDetachedNodesBefore(ctx Context, edge time.Time, increProgress func())

// StatFS returns summary statistics of a volume.
StatFS(ctx Context, totalspace, availspace, iused, iavail *uint64) syscall.Errno
Expand Down
66 changes: 45 additions & 21 deletions pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import (
locked: locked$sid -> { lockf$inode or lockp$inode }

Removed files: delfiles -> [$inode:$length -> seconds]
detached nodes: detachedNodes -> [$inode -> seconds]
Slices refs: k$sliceId_$size -> refcount

Dir data length: dirDataLength -> { $inode -> length }
Expand Down Expand Up @@ -578,6 +579,10 @@ func (m *redisMeta) delfiles() string {
return m.prefix + "delfiles"
}

func (m *redisMeta) detachedNodes() string {
return m.prefix + "detachedNodes"
}

func (r *redisMeta) delSlices() string {
return r.prefix + "delSlices"
}
Expand Down Expand Up @@ -4055,28 +4060,10 @@ func (m *redisMeta) Clone(ctx Context, srcIno, dstParentIno Ino, dstName string,
}
}
// delete the dst tree if clone failed
if eno != 0 || err != nil {
// todo: store dstIno and delete it in the background
attr := &Attr{}
eno := m.doGetAttr(ctx, dstIno, attr)
if eno == syscall.ENOENT {
return cloneEno
}
if eno != 0 {
logger.Errorf("clone: remove tree error rootInode %v", dstIno)
return eno
if cloneEno != 0 {
if eno := m.doCleanupDetachedNode(ctx, dstIno); eno != 0 {
logger.Errorf("doCleanupDetachedNode: remove detached tree (%d) error: %s", dstIno, eno)
}
rmConcurrent := make(chan int, 10)
if eno := m.emptyDir(ctx, dstIno, true, nil, rmConcurrent); eno != 0 {
logger.Errorf("clone: remove tree error rootInode %v", dstIno)
return eno
}

return errno(m.txn(ctx, func(tx *redis.Tx) error {
tx.Del(ctx, m.inodeKey(dstIno))
tx.Del(ctx, m.xattrKey(dstIno))
return nil
}, m.inodeKey(dstIno), m.xattrKey(dstIno)))
}
} else {
cloneEno = m.cloneEntry(ctx, srcIno, srcAttr.Typ, dstParentIno, dstName, &dstIno, cmode, cumask, count, total, true, concurrent)
Expand Down Expand Up @@ -4299,6 +4286,9 @@ func (m *redisMeta) mkNodeWithAttr(ctx Context, tx *redis.Tx, srcIno Ino, srcAtt

_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Set(ctx, m.inodeKey(*dstIno), m.marshal(srcAttr), 0)
if !attach {
pipe.ZAdd(ctx, m.detachedNodes(), redis.Z{Member: dstIno.String(), Score: float64(time.Now().Unix())})
zhijian-pro marked this conversation as resolved.
Show resolved Hide resolved
}
if len(srcXattr) > 0 {
pipe.HMSet(ctx, m.xattrKey(*dstIno), srcXattr)
}
Expand All @@ -4311,3 +4301,37 @@ func (m *redisMeta) mkNodeWithAttr(ctx Context, tx *redis.Tx, srcIno Ino, srcAtt
})
return err
}

func (m *redisMeta) doCleanupDetachedNode(ctx Context, detachedNode Ino) syscall.Errno {
exists, err := m.rdb.Exists(ctx, m.inodeKey(detachedNode)).Result()
if err != nil {
return errno(err)
}
if exists == 1 {
rmConcurrent := make(chan int, 10)
if eno := m.emptyDir(ctx, detachedNode, true, nil, rmConcurrent); eno != 0 {
return eno
}
if err := m.txn(ctx, func(tx *redis.Tx) error {
davies marked this conversation as resolved.
Show resolved Hide resolved
tx.Del(ctx, m.inodeKey(detachedNode))
tx.Del(ctx, m.xattrKey(detachedNode))
return nil
}, m.inodeKey(detachedNode), m.xattrKey(detachedNode)); err != nil {
return errno(err)
}
}
return errno(m.rdb.ZRem(ctx, m.detachedNodes(), detachedNode.String()).Err())
}

func (m *redisMeta) doFindDetachedNodes(t time.Time) []Ino {
var detachedInos []Ino
detachedNodes, err := m.rdb.ZRangeByScore(Background, m.detachedNodes(), &redis.ZRangeBy{Max: strconv.FormatInt(t.Add(-time.Hour*24).Unix(), 10)}).Result()
if err != nil {
logger.Errorf("Scan detached nodes error: %s", err)
}
for _, node := range detachedNodes {
inode, _ := strconv.ParseUint(node, 10, 64)
detachedInos = append(detachedInos, Ino(inode))
}
return detachedInos
}
92 changes: 64 additions & 28 deletions pkg/meta/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ type dirStats struct {
UsedInodes int64 `xorm:"notnull"`
}

type detachedNode struct {
Inode Ino `xorm:"pk notnull"`
Expire int64 `xorm:"notnull"`
zhijian-pro marked this conversation as resolved.
Show resolved Hide resolved
}

type dirQuota struct {
Inode Ino `xorm:"pk"`
MaxSpace int64 `xorm:"notnull"`
Expand Down Expand Up @@ -294,6 +299,9 @@ func (m *dbMeta) Init(format *Format, force bool) error {
if err := m.syncTable(new(dirStats)); err != nil {
return fmt.Errorf("create table dirStats: %s", err)
}
if err := m.syncTable(new(detachedNode)); err != nil {
return fmt.Errorf("create table detachedNode: %s", err)
}

var s = setting{Name: "format"}
var ok bool
Expand Down Expand Up @@ -377,7 +385,7 @@ func (m *dbMeta) Reset() error {
&node{}, &edge{}, &symlink{}, &xattr{},
&chunk{}, &sliceRef{}, &delslices{},
&session{}, &session2{}, &sustained{}, &delfile{},
&flock{}, &plock{}, &dirStats{}, &dirQuota{})
&flock{}, &plock{}, &dirStats{}, &dirQuota{}, &detachedNode{})
}

func (m *dbMeta) doLoad() (data []byte, err error) {
Expand All @@ -399,9 +407,9 @@ func (m *dbMeta) doLoad() (data []byte, err error) {

func (m *dbMeta) doNewSession(sinfo []byte) error {
// add new table
err := m.syncTable(new(session2), new(delslices), new(dirStats), new(dirQuota))
err := m.syncTable(new(session2), new(delslices), new(dirStats), new(detachedNode), new(dirQuota))
if err != nil {
return fmt.Errorf("update table session2, delslices, dirstats, dirQuota: %s", err)
return fmt.Errorf("update table session2, delslices, dirstats, detachedNode, dirQuota: %s", err)
}
// add primary key
if err = m.syncTable(new(edge), new(chunk), new(xattr), new(sustained)); err != nil {
Expand Down Expand Up @@ -3555,7 +3563,9 @@ func (m *dbMeta) LoadMeta(r io.Reader) error {
if err := m.syncTable(new(dirStats)); err != nil {
return fmt.Errorf("create table dirStats: %s", err)
}

if err := m.syncTable(new(detachedNode)); err != nil {
return fmt.Errorf("create table detachedNode: %s", err)
}
var batch int
switch m.Name() {
case "sqlite3":
Expand Down Expand Up @@ -3733,31 +3743,10 @@ func (m *dbMeta) Clone(ctx Context, srcIno, dstParentIno Ino, dstName string, cm
}
}
// delete the dst tree if clone failed
if eno != 0 || err != nil {
// todo: store dstIno and delete it in the background
attr := &Attr{}
eno := m.doGetAttr(ctx, dstIno, attr)
if eno == syscall.ENOENT {
return cloneEno
}
if eno != 0 {
logger.Errorf("clone: remove tree error rootInode %v", dstIno)
return eno
if cloneEno != 0 {
if eno = m.doCleanupDetachedNode(ctx, dstIno); eno != 0 {
logger.Errorf("doCleanupDetachedNode: remove detached tree (%d) error: %s", dstIno, eno)
}
rmConcurrent := make(chan int, 10)
if eno := m.emptyDir(ctx, dstIno, true, nil, rmConcurrent); eno != 0 {
logger.Errorf("clone: remove tree error rootInode %v", dstIno)
return eno
}
return errno(m.txn(func(s *xorm.Session) error {
if _, err := s.Delete(&node{Inode: dstIno}); err != nil {
return err
}
if _, err := s.Delete(&xattr{Inode: dstIno}); err != nil {
return err
}
return nil
}))
}
} else {
cloneEno = m.cloneEntry(ctx, srcIno, srcAttr.Typ, dstParentIno, dstName, &dstIno, cmode, cumask, count, total, true, concurrent)
Expand Down Expand Up @@ -4007,9 +3996,56 @@ func (m *dbMeta) mkNodeWithAttr(ctx Context, s *xorm.Session, srcIno Ino, srcNod
}
}

if !attach {
return mustInsert(s, &detachedNode{Inode: *dstIno, Expire: time.Now().Unix()})
}
if attach {
// set edge
return mustInsert(s, &edge{Parent: dstParentIno, Name: []byte(dstName), Inode: *dstIno, Type: srcNode.Type})
}
return nil
}

func (m *dbMeta) doFindDetachedNodes(t time.Time) []Ino {
var detachedNodes []Ino
if err := m.roTxn(func(s *xorm.Session) error {
var nodes []detachedNode
err := s.Where("expire < ?", t).Find(&nodes)
if err != nil {
return err
}
for _, n := range nodes {
detachedNodes = append(detachedNodes, n.Inode)
}
return nil
}); err != nil {
logger.Errorf("Scan detached nodes error: %s", err)
}
return detachedNodes
}

func (m *dbMeta) doCleanupDetachedNode(ctx Context, detachedNode Ino) syscall.Errno {
exist, err := m.db.Exist(&node{Inode: detachedNode})
if err != nil {
return errno(err)
}
if exist {
rmConcurrent := make(chan int, 10)
if eno := m.emptyDir(ctx, detachedNode, true, nil, rmConcurrent); eno != 0 {
return eno
}
if err := m.txn(func(s *xorm.Session) error {
if _, err := s.Delete(&node{Inode: detachedNode}); err != nil {
return err
}
if _, err := s.Delete(&xattr{Inode: detachedNode}); err != nil {
return err
}
return nil
}); err != nil {
return errno(err)
}
}
_, err = m.db.Delete(&node{Inode: detachedNode})
zhijian-pro marked this conversation as resolved.
Show resolved Hide resolved
return errno(err)
}
Loading