Skip to content

Commit

Permalink
cmd/clone: clean the detached nodes in the gc command (#3366)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhijian-pro authored Mar 28, 2023
1 parent 8b8d8b3 commit c159d9e
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 85 deletions.
8 changes: 6 additions & 2 deletions cmd/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ $ juicefs gc redis://localhost
# Trigger compaction of all slices
$ juicefs gc redis://localhost --compact
# Delete leaked objects and delayed deleted slices or files
# Delete leaked objects or metadata and delayed deleted slices or files
$ juicefs gc redis://localhost --delete`,
Flags: []cli.Flag{
&cli.BoolFlag{
Expand All @@ -61,7 +61,7 @@ $ juicefs gc redis://localhost --delete`,
},
&cli.BoolFlag{
Name: "delete",
Usage: "delete leaked objects and delayed deleted slices or files",
Usage: "delete leaked objects or metadata and delayed deleted slices or files",
},
&cli.IntFlag{
Name: "threads",
Expand Down Expand Up @@ -144,6 +144,10 @@ func gc(ctx *cli.Context) error {
cleanTrashSpin := progress.AddCountSpinner("Cleaned trash")
m.CleanupTrashBefore(c, edge, cleanTrashSpin.Increment)
cleanTrashSpin.Done()

cleanDetachedNodeSpin := progress.AddCountSpinner("Cleaned detached nodes")
m.CleanupDetachedNodesBefore(c, time.Now().Add(-time.Hour*24), cleanDetachedNodeSpin.Increment)
cleanDetachedNodeSpin.Done()
}

err = m.ScanDeletedObject(
Expand Down
14 changes: 14 additions & 0 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ type engine interface {
doCleanupSlices()
doCleanupDelayedSlices(edge int64) (int, error)
doDeleteSlice(id uint64, size uint32) error
doFindDetachedNodes(t time.Time) []Ino
doCleanupDetachedNode(ctx Context, detachedNode Ino) syscall.Errno

doGetQuota(ctx Context, inode Ino) (*Quota, error)
doSetQuota(ctx Context, inode Ino, quota *Quota, create bool) error
Expand Down Expand Up @@ -1977,6 +1979,18 @@ func (m *baseMeta) cleanupTrash() {
}
}

func (m *baseMeta) CleanupDetachedNodesBefore(ctx Context, edge time.Time, increProgress func()) {
for _, inode := range m.en.doFindDetachedNodes(edge) {
if eno := m.en.doCleanupDetachedNode(Background, inode); eno != 0 {
logger.Errorf("cleanupDetachedNode: remove detached tree (%d) error: %s", inode, eno)
} else {
if increProgress != nil {
increProgress()
}
}
}
}

func (m *baseMeta) CleanupTrashBefore(ctx Context, edge time.Time, increProgress func()) {
logger.Debugf("cleanup trash: started")
now := time.Now()
Expand Down
38 changes: 33 additions & 5 deletions pkg/meta/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2223,59 +2223,87 @@ func testClone(t *testing.T, m Meta) {
return nil
})
// check remove tree
var dNode1, dNode2, dNode3, dNode4 Ino = 101, 102, 103, 104
switch m := m.(type) {
case *redisMeta:
// del edge first
if err := m.rdb.HDel(Background, m.entryKey(cloneDstAttr.Parent), cloneDstName).Err(); err != nil {
t.Fatalf("del edge error: %v", err)
}
// check remove tree
if eno := m.emptyDir(Background, cloneDstIno, true, nil, make(chan int, 10)); eno != 0 {
if eno := m.doCleanupDetachedNode(Background, cloneDstIno); eno != 0 {
t.Fatalf("remove tree error rootInode: %v", cloneDstIno)
}
time.Sleep(1 * time.Second)
removedKeysStr := make([]string, len(removedItem))
for i, key := range removedItem {
removedKeysStr[i] = key.(string)
}
removedKeysStr = append(removedKeysStr, m.detachedNodes())
if exists := m.rdb.Exists(Background, removedKeysStr...).Val(); exists != 0 {
t.Fatalf("has keys not removed: %v", removedItem)
}
// check detached node
m.rdb.ZAdd(Background, m.detachedNodes(), redis.Z{Member: dNode1.String(), Score: float64(time.Now().Add(-1 * time.Minute).Unix())}).Err()
m.rdb.ZAdd(Background, m.detachedNodes(), redis.Z{Member: dNode2.String(), Score: float64(time.Now().Add(-5 * time.Minute).Unix())}).Err()
m.rdb.ZAdd(Background, m.detachedNodes(), redis.Z{Member: dNode3.String(), Score: float64(time.Now().Add(-48 * time.Hour).Unix())}).Err()
m.rdb.ZAdd(Background, m.detachedNodes(), redis.Z{Member: dNode4.String(), Score: float64(time.Now().Add(-48 * time.Hour).Unix())}).Err()
case *dbMeta:
if n, err := m.db.Delete(&edge{Parent: cloneDstAttr.Parent, Name: []byte(cloneDstName)}); err != nil || n != 1 {
t.Fatalf("del edge error: %v", err)
}
// check remove tree
if eno := m.emptyDir(Background, cloneDstIno, true, nil, make(chan int, 10)); eno != 0 {
if eno := m.doCleanupDetachedNode(Background, cloneDstIno); eno != 0 {
t.Fatalf("remove tree error rootInode: %v", cloneDstIno)
}
removedItem = append(removedItem, &detachedNode{Inode: cloneDstIno})
time.Sleep(1 * time.Second)
if exists, err := m.db.Exist(removedItem...); err != nil || exists {
t.Fatalf("has keys not removed: %v", removedItem)
}
m.txn(func(s *xorm.Session) error {
return mustInsert(s,
&detachedNode{Inode: dNode1, Added: time.Now().Add(-1 * time.Minute).Unix()},
&detachedNode{Inode: dNode2, Added: time.Now().Add(-5 * time.Minute).Unix()},
&detachedNode{Inode: dNode3, Added: time.Now().Add(-48 * time.Hour).Unix()},
&detachedNode{Inode: dNode4, Added: time.Now().Add(-48 * time.Hour).Unix()},
)
})
case *kvMeta:
// del edge first
if err := m.deleteKeys(m.entryKey(cloneDstAttr.Parent, cloneDstName)); err != nil {
t.Fatalf("del edge error: %v", err)
}
// check remove tree
if eno := m.emptyDir(Background, cloneDstIno, true, nil, make(chan int, 10)); eno != 0 {
if eno := m.doCleanupDetachedNode(Background, cloneDstIno); eno != 0 {
t.Fatalf("remove tree error rootInode: %v", cloneDstIno)
}
time.Sleep(1 * time.Second)
removedItem = append(removedItem, m.detachedKey(cloneDstIno))
m.txn(func(tx *kvTxn) error {
for _, key := range removedItem {
if buf := tx.get(key.([]byte)); buf != nil {
t.Fatalf("has keys not removed: %v", removedItem)
}
}
tx.set(m.detachedKey(dNode1), m.packInt64(time.Now().Add(-1*time.Minute).Unix()))
tx.set(m.detachedKey(dNode2), m.packInt64(time.Now().Add(-5*time.Minute).Unix()))
tx.set(m.detachedKey(dNode3), m.packInt64(time.Now().Add(-48*time.Hour).Unix()))
tx.set(m.detachedKey(dNode4), m.packInt64(time.Now().Add(-48*time.Hour).Unix()))
return nil
})

}
time.Sleep(1 * time.Second)
if !sli1del || !sli2del {
t.Fatalf("slice should be deleted")
}
nodes := m.(engine).doFindDetachedNodes(time.Now())
if len(nodes) != 4 {
t.Fatalf("find detached nodes error: %v", nodes)
}
nodes = m.(engine).doFindDetachedNodes(time.Now().Add(-24 * time.Hour))
if len(nodes) != 2 {
t.Fatalf("find detached nodes error: %v", nodes)
}

}

Expand Down
2 changes: 2 additions & 0 deletions pkg/meta/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ type Meta interface {
CleanStaleSessions()
// CleanupTrashBefore deletes all files in trash before the given time.
CleanupTrashBefore(ctx Context, edge time.Time, increProgress func())
// CleanupDetachedNodesBefore deletes all detached nodes before the given time.
CleanupDetachedNodesBefore(ctx Context, edge time.Time, increProgress func())

// StatFS returns summary statistics of a volume.
StatFS(ctx Context, totalspace, availspace, iused, iavail *uint64, subdir bool) syscall.Errno
Expand Down
70 changes: 48 additions & 22 deletions pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import (
locked: locked$sid -> { lockf$inode or lockp$inode }
Removed files: delfiles -> [$inode:$length -> seconds]
detached nodes: detachedNodes -> [$inode -> seconds]
Slices refs: k$sliceId_$size -> refcount
Dir data length: dirDataLength -> { $inode -> length }
Expand Down Expand Up @@ -578,6 +579,10 @@ func (m *redisMeta) delfiles() string {
return m.prefix + "delfiles"
}

func (m *redisMeta) detachedNodes() string {
return m.prefix + "detachedNodes"
}

func (r *redisMeta) delSlices() string {
return r.prefix + "delSlices"
}
Expand Down Expand Up @@ -4045,7 +4050,10 @@ func (m *redisMeta) Clone(ctx Context, srcIno, dstParentIno Ino, dstName string,
return eno
}
dstParentAttr.Nlink++
return m.rdb.Set(ctx, m.inodeKey(dstParentIno), m.marshal(dstParentAttr), 0).Err()
if err = tx.Set(ctx, m.inodeKey(dstParentIno), m.marshal(dstParentAttr), 0).Err(); err != nil {
return err
}
return tx.ZRem(ctx, m.detachedNodes(), dstIno.String()).Err()
}, m.entryKey(dstParentIno))
if err != nil {
cloneEno = errno(err)
Expand All @@ -4055,28 +4063,10 @@ func (m *redisMeta) Clone(ctx Context, srcIno, dstParentIno Ino, dstName string,
}
}
// delete the dst tree if clone failed
if eno != 0 || err != nil {
// todo: store dstIno and delete it in the background
attr := &Attr{}
eno := m.doGetAttr(ctx, dstIno, attr)
if eno == syscall.ENOENT {
return cloneEno
}
if eno != 0 {
logger.Errorf("clone: remove tree error rootInode %v", dstIno)
return eno
}
rmConcurrent := make(chan int, 10)
if eno := m.emptyDir(ctx, dstIno, true, nil, rmConcurrent); eno != 0 {
logger.Errorf("clone: remove tree error rootInode %v", dstIno)
return eno
if cloneEno != 0 {
if eno := m.doCleanupDetachedNode(ctx, dstIno); eno != 0 {
logger.Errorf("doCleanupDetachedNode: remove detached tree (%d) error: %s", dstIno, eno)
}

return errno(m.txn(ctx, func(tx *redis.Tx) error {
tx.Del(ctx, m.inodeKey(dstIno))
tx.Del(ctx, m.xattrKey(dstIno))
return nil
}, m.inodeKey(dstIno), m.xattrKey(dstIno)))
}
} else {
cloneEno = m.cloneEntry(ctx, srcIno, srcAttr.Typ, dstParentIno, dstName, &dstIno, cmode, cumask, count, total, true, concurrent)
Expand Down Expand Up @@ -4306,8 +4296,44 @@ func (m *redisMeta) mkNodeWithAttr(ctx Context, tx *redis.Tx, srcIno Ino, srcAtt
tx.HSet(ctx, m.entryKey(dstParentIno), dstName, m.packEntry(srcAttr.Typ, *dstIno))
tx.IncrBy(ctx, m.usedSpaceKey(), align4K(0))
tx.Incr(ctx, m.totalInodesKey())
} else {
pipe.ZAdd(ctx, m.detachedNodes(), redis.Z{Member: dstIno.String(), Score: float64(time.Now().Unix())})
}
return nil
})
return err
}

func (m *redisMeta) doCleanupDetachedNode(ctx Context, detachedNode Ino) syscall.Errno {
exists, err := m.rdb.Exists(ctx, m.inodeKey(detachedNode)).Result()
if err != nil {
return errno(err)
}
if exists == 1 {
rmConcurrent := make(chan int, 10)
if eno := m.emptyDir(ctx, detachedNode, true, nil, rmConcurrent); eno != 0 {
return eno
}
if err := m.txn(ctx, func(tx *redis.Tx) error {
tx.Del(ctx, m.inodeKey(detachedNode))
tx.Del(ctx, m.xattrKey(detachedNode))
return nil
}, m.inodeKey(detachedNode), m.xattrKey(detachedNode)); err != nil {
return errno(err)
}
}
return errno(m.rdb.ZRem(ctx, m.detachedNodes(), detachedNode.String()).Err())
}

func (m *redisMeta) doFindDetachedNodes(t time.Time) []Ino {
var detachedInos []Ino
detachedNodes, err := m.rdb.ZRangeByScore(Background, m.detachedNodes(), &redis.ZRangeBy{Max: strconv.FormatInt(t.Unix(), 10)}).Result()
if err != nil {
logger.Errorf("Scan detached nodes error: %s", err)
}
for _, node := range detachedNodes {
inode, _ := strconv.ParseUint(node, 10, 64)
detachedInos = append(detachedInos, Ino(inode))
}
return detachedInos
}
Loading

0 comments on commit c159d9e

Please sign in to comment.