Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd/clone: clean the detached nodes in the gc command #3366

Merged
merged 11 commits into from
Mar 28, 2023
8 changes: 6 additions & 2 deletions cmd/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ $ juicefs gc redis://localhost
# Trigger compaction of all slices
$ juicefs gc redis://localhost --compact
# Delete leaked objects and delayed deleted slices or files
# Delete leaked objects or metadata and delayed deleted slices or files
$ juicefs gc redis://localhost --delete`,
Flags: []cli.Flag{
&cli.BoolFlag{
Expand All @@ -61,7 +61,7 @@ $ juicefs gc redis://localhost --delete`,
},
&cli.BoolFlag{
Name: "delete",
Usage: "delete leaked objects and delayed deleted slices or files",
Usage: "delete leaked objects or metadata and delayed deleted slices or files",
},
&cli.IntFlag{
Name: "threads",
Expand Down Expand Up @@ -144,6 +144,10 @@ func gc(ctx *cli.Context) error {
cleanTrashSpin := progress.AddCountSpinner("Cleaned trash")
m.CleanupTrashBefore(c, edge, cleanTrashSpin.Increment)
cleanTrashSpin.Done()

cleanDetachedNodeSpin := progress.AddCountSpinner("Cleaned detached nodes")
m.CleanupDetachedNodesBefore(c, time.Now().Add(-time.Hour*24), cleanDetachedNodeSpin.Increment)
cleanDetachedNodeSpin.Done()
}

err = m.ScanDeletedObject(
Expand Down
14 changes: 14 additions & 0 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ type engine interface {
doCleanupSlices()
doCleanupDelayedSlices(edge int64) (int, error)
doDeleteSlice(id uint64, size uint32) error
doFindDetachedNodes(t time.Time) []Ino
zhijian-pro marked this conversation as resolved.
Show resolved Hide resolved
doCleanupDetachedNode(ctx Context, detachedNode Ino) syscall.Errno

doGetQuota(ctx Context, inode Ino) (*Quota, error)
doSetQuota(ctx Context, inode Ino, quota *Quota, create bool) error
Expand Down Expand Up @@ -1920,6 +1922,18 @@ func (m *baseMeta) cleanupTrash() {
}
}

func (m *baseMeta) CleanupDetachedNodesBefore(ctx Context, edge time.Time, increProgress func()) {
for _, inode := range m.en.doFindDetachedNodes(edge) {
if eno := m.en.doCleanupDetachedNode(Background, inode); eno != 0 {
logger.Errorf("cleanupDetachedNode: remove detached tree (%d) error: %s", inode, eno)
} else {
if increProgress != nil {
increProgress()
}
}
}
}

func (m *baseMeta) CleanupTrashBefore(ctx Context, edge time.Time, increProgress func()) {
logger.Debugf("cleanup trash: started")
now := time.Now()
Expand Down
38 changes: 33 additions & 5 deletions pkg/meta/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2135,59 +2135,87 @@ func testClone(t *testing.T, m Meta) {
return nil
})
// check remove tree
var dNode1, dNode2, dNode3, dNode4 Ino = 101, 102, 103, 104
switch m := m.(type) {
case *redisMeta:
// del edge first
if err := m.rdb.HDel(Background, m.entryKey(cloneDstAttr.Parent), cloneDstName).Err(); err != nil {
t.Fatalf("del edge error: %v", err)
}
// check remove tree
if eno := m.emptyDir(Background, cloneDstIno, true, nil, make(chan int, 10)); eno != 0 {
if eno := m.doCleanupDetachedNode(Background, cloneDstIno); eno != 0 {
t.Fatalf("remove tree error rootInode: %v", cloneDstIno)
}
time.Sleep(1 * time.Second)
removedKeysStr := make([]string, len(removedItem))
for i, key := range removedItem {
removedKeysStr[i] = key.(string)
}
removedKeysStr = append(removedKeysStr, m.detachedNodes())
if exists := m.rdb.Exists(Background, removedKeysStr...).Val(); exists != 0 {
t.Fatalf("has keys not removed: %v", removedItem)
}
// check detached node
m.rdb.ZAdd(Background, m.detachedNodes(), redis.Z{Member: dNode1.String(), Score: float64(time.Now().Add(-1 * time.Minute).Unix())}).Err()
m.rdb.ZAdd(Background, m.detachedNodes(), redis.Z{Member: dNode2.String(), Score: float64(time.Now().Add(-5 * time.Minute).Unix())}).Err()
m.rdb.ZAdd(Background, m.detachedNodes(), redis.Z{Member: dNode3.String(), Score: float64(time.Now().Add(-48 * time.Hour).Unix())}).Err()
m.rdb.ZAdd(Background, m.detachedNodes(), redis.Z{Member: dNode4.String(), Score: float64(time.Now().Add(-48 * time.Hour).Unix())}).Err()
case *dbMeta:
if n, err := m.db.Delete(&edge{Parent: cloneDstAttr.Parent, Name: []byte(cloneDstName)}); err != nil || n != 1 {
t.Fatalf("del edge error: %v", err)
}
// check remove tree
if eno := m.emptyDir(Background, cloneDstIno, true, nil, make(chan int, 10)); eno != 0 {
if eno := m.doCleanupDetachedNode(Background, cloneDstIno); eno != 0 {
t.Fatalf("remove tree error rootInode: %v", cloneDstIno)
}
removedItem = append(removedItem, &detachedNode{Inode: cloneDstIno})
time.Sleep(1 * time.Second)
if exists, err := m.db.Exist(removedItem...); err != nil || exists {
t.Fatalf("has keys not removed: %v", removedItem)
}
m.txn(func(s *xorm.Session) error {
return mustInsert(s,
&detachedNode{Inode: dNode1, Added: time.Now().Add(-1 * time.Minute).Unix()},
&detachedNode{Inode: dNode2, Added: time.Now().Add(-5 * time.Minute).Unix()},
&detachedNode{Inode: dNode3, Added: time.Now().Add(-48 * time.Hour).Unix()},
&detachedNode{Inode: dNode4, Added: time.Now().Add(-48 * time.Hour).Unix()},
)
})
case *kvMeta:
// del edge first
if err := m.deleteKeys(m.entryKey(cloneDstAttr.Parent, cloneDstName)); err != nil {
t.Fatalf("del edge error: %v", err)
}
// check remove tree
if eno := m.emptyDir(Background, cloneDstIno, true, nil, make(chan int, 10)); eno != 0 {
if eno := m.doCleanupDetachedNode(Background, cloneDstIno); eno != 0 {
t.Fatalf("remove tree error rootInode: %v", cloneDstIno)
}
time.Sleep(1 * time.Second)
removedItem = append(removedItem, m.detachedKey(cloneDstIno))
m.txn(func(tx *kvTxn) error {
for _, key := range removedItem {
if buf := tx.get(key.([]byte)); buf != nil {
t.Fatalf("has keys not removed: %v", removedItem)
}
}
tx.set(m.detachedKey(dNode1), m.packInt64(time.Now().Add(-1*time.Minute).Unix()))
tx.set(m.detachedKey(dNode2), m.packInt64(time.Now().Add(-5*time.Minute).Unix()))
tx.set(m.detachedKey(dNode3), m.packInt64(time.Now().Add(-48*time.Hour).Unix()))
tx.set(m.detachedKey(dNode4), m.packInt64(time.Now().Add(-48*time.Hour).Unix()))
return nil
})

}
time.Sleep(1 * time.Second)
if !sli1del || !sli2del {
t.Fatalf("slice should be deleted")
}
nodes := m.(engine).doFindDetachedNodes(time.Now())
if len(nodes) != 4 {
t.Fatalf("find detached nodes error: %v", nodes)
}
nodes = m.(engine).doFindDetachedNodes(time.Now().Add(-24 * time.Hour))
if len(nodes) != 2 {
t.Fatalf("find detached nodes error: %v", nodes)
}

}

Expand Down
2 changes: 2 additions & 0 deletions pkg/meta/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ type Meta interface {
CleanStaleSessions()
// CleanupTrashBefore deletes all files in trash before the given time.
CleanupTrashBefore(ctx Context, edge time.Time, increProgress func())
// CleanupDetachedNodesBefore deletes all detached nodes before the given time.
CleanupDetachedNodesBefore(ctx Context, edge time.Time, increProgress func())

// StatFS returns summary statistics of a volume.
StatFS(ctx Context, totalspace, availspace, iused, iavail *uint64) syscall.Errno
Expand Down
70 changes: 48 additions & 22 deletions pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import (
locked: locked$sid -> { lockf$inode or lockp$inode }

Removed files: delfiles -> [$inode:$length -> seconds]
detached nodes: detachedNodes -> [$inode -> seconds]
Slices refs: k$sliceId_$size -> refcount

Dir data length: dirDataLength -> { $inode -> length }
Expand Down Expand Up @@ -578,6 +579,10 @@ func (m *redisMeta) delfiles() string {
return m.prefix + "delfiles"
}

func (m *redisMeta) detachedNodes() string {
return m.prefix + "detachedNodes"
}

func (r *redisMeta) delSlices() string {
return r.prefix + "delSlices"
}
Expand Down Expand Up @@ -4048,35 +4053,20 @@ func (m *redisMeta) Clone(ctx Context, srcIno, dstParentIno Ino, dstName string,
return eno
}
dstParentAttr.Nlink++
return m.rdb.Set(ctx, m.inodeKey(dstParentIno), m.marshal(dstParentAttr), 0).Err()
if err = tx.Set(ctx, m.inodeKey(dstParentIno), m.marshal(dstParentAttr), 0).Err(); err != nil {
return err
}
return tx.ZRem(ctx, m.detachedNodes(), dstIno.String()).Err()
}, m.entryKey(dstParentIno))
if err != nil {
cloneEno = errno(err)
}
}
// delete the dst tree if clone failed
if eno != 0 || err != nil {
// todo: store dstIno and delete it in the background
attr := &Attr{}
eno := m.doGetAttr(ctx, dstIno, attr)
if eno == syscall.ENOENT {
return cloneEno
}
if eno != 0 {
logger.Errorf("clone: remove tree error rootInode %v", dstIno)
return eno
}
rmConcurrent := make(chan int, 10)
if eno := m.emptyDir(ctx, dstIno, true, nil, rmConcurrent); eno != 0 {
logger.Errorf("clone: remove tree error rootInode %v", dstIno)
return eno
if cloneEno != 0 {
if eno := m.doCleanupDetachedNode(ctx, dstIno); eno != 0 {
logger.Errorf("doCleanupDetachedNode: remove detached tree (%d) error: %s", dstIno, eno)
}

return errno(m.txn(ctx, func(tx *redis.Tx) error {
tx.Del(ctx, m.inodeKey(dstIno))
tx.Del(ctx, m.xattrKey(dstIno))
return nil
}, m.inodeKey(dstIno), m.xattrKey(dstIno)))
}
} else {
cloneEno = m.cloneEntry(ctx, srcIno, srcAttr.Typ, dstParentIno, dstName, &dstIno, cmode, cumask, count, total, true, concurrent)
Expand Down Expand Up @@ -4306,8 +4296,44 @@ func (m *redisMeta) mkNodeWithAttr(ctx Context, tx *redis.Tx, srcIno Ino, srcAtt
tx.HSet(ctx, m.entryKey(dstParentIno), dstName, m.packEntry(srcAttr.Typ, *dstIno))
tx.IncrBy(ctx, m.usedSpaceKey(), align4K(0))
tx.Incr(ctx, m.totalInodesKey())
} else {
pipe.ZAdd(ctx, m.detachedNodes(), redis.Z{Member: dstIno.String(), Score: float64(time.Now().Unix())})
davies marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
})
return err
}

func (m *redisMeta) doCleanupDetachedNode(ctx Context, detachedNode Ino) syscall.Errno {
exists, err := m.rdb.Exists(ctx, m.inodeKey(detachedNode)).Result()
if err != nil {
return errno(err)
}
if exists == 1 {
rmConcurrent := make(chan int, 10)
if eno := m.emptyDir(ctx, detachedNode, true, nil, rmConcurrent); eno != 0 {
return eno
}
if err := m.txn(ctx, func(tx *redis.Tx) error {
davies marked this conversation as resolved.
Show resolved Hide resolved
tx.Del(ctx, m.inodeKey(detachedNode))
tx.Del(ctx, m.xattrKey(detachedNode))
return nil
}, m.inodeKey(detachedNode), m.xattrKey(detachedNode)); err != nil {
return errno(err)
}
}
return errno(m.rdb.ZRem(ctx, m.detachedNodes(), detachedNode.String()).Err())
}

func (m *redisMeta) doFindDetachedNodes(t time.Time) []Ino {
var detachedInos []Ino
detachedNodes, err := m.rdb.ZRangeByScore(Background, m.detachedNodes(), &redis.ZRangeBy{Max: strconv.FormatInt(t.Unix(), 10)}).Result()
if err != nil {
logger.Errorf("Scan detached nodes error: %s", err)
}
for _, node := range detachedNodes {
inode, _ := strconv.ParseUint(node, 10, 64)
detachedInos = append(detachedInos, Ino(inode))
}
return detachedInos
}
Loading