gc: clean expired trash files, delfiles and delslices (#3022)
* add delete api for statistic

Signed-off-by: xixi <[email protected]>

* scan and delete delfiles and delslices in gc subcommand

Signed-off-by: xixi <[email protected]>

* fix slice leak

Signed-off-by: xixi <[email protected]>

* fix increProgress nil bug

Signed-off-by: xixi <[email protected]>

* fix bugs of tkv meta engine

Signed-off-by: xixi <[email protected]>

* remove unnecessary info

Signed-off-by: xixi <[email protected]>

* rename Statistic to ScanDeletedObject

Signed-off-by: xixi <[email protected]>

* update description and information of gc subcommand

Signed-off-by: xixi <[email protected]>

* change log

Signed-off-by: xixi <[email protected]>

* fix bugs on conflicts

Signed-off-by: xixi <[email protected]>

* fix unit tests

Signed-off-by: xixi <[email protected]>

* make scan sync

Signed-off-by: xixi <[email protected]>

Signed-off-by: xixi <[email protected]>
Hexilee authored Dec 6, 2022
1 parent fd42b7c commit 1dfc957
Showing 8 changed files with 386 additions and 178 deletions.
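
At its core, this commit replaces the old `Statistic` walk with `ScanDeletedObject`, whose visitors return a `clean` flag telling the metadata engine whether to purge the visited record. Below is a minimal self-contained sketch of that contract; `demoScan`, `Ino`, and `Slice` are illustrative stand-ins rather than the juicefs types, and only the two callback signatures mirror the diff:

    package main

    import "fmt"

    type Ino uint64

    type Slice struct {
        ID   uint64
        Size uint32
    }

    // Callback types as introduced in pkg/meta/base.go: returning clean=true
    // asks the metadata engine to purge the visited entry.
    type deletedSliceScan func(ss []Slice, ts int64) (clean bool, err error)
    type deletedFileScan func(ino Ino, size uint64, ts int64) (clean bool, err error)

    // demoScan is a hypothetical engine walking its delfiles records.
    func demoScan(files map[Ino]uint64, ts int64, visit deletedFileScan) error {
        for ino, size := range files {
            clean, err := visit(ino, size, ts)
            if err != nil {
                return err
            }
            if clean {
                delete(files, ino) // a real engine would delete the record
            }
        }
        return nil
    }

    func main() {
        files := map[Ino]uint64{2: 4096, 3: 8192}
        var total uint64
        // status-style visitor: count only, never clean.
        _ = demoScan(files, 0, func(_ Ino, size uint64, _ int64) (bool, error) {
            total += size
            return false, nil
        })
        fmt.Println("delayed bytes:", total, "records left:", len(files))
    }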
158 changes: 108 additions & 50 deletions cmd/gc.go
@@ -42,7 +42,7 @@ func cmdGC() *cli.Command {
         ArgsUsage: "META-URL",
         Description: `
 It scans all objects in data storage and slices in metadata, comparing them to see if there is any
-leaked object. It can also actively trigger compaction of slices.
+leaked object. It can also actively trigger compaction of slices and the cleanup of delayed deleted slices or files.
 Use this command if you find that data storage takes more than expected.
 Examples:
@@ -52,7 +52,7 @@ $ juicefs gc redis://localhost
 # Trigger compaction of all slices
 $ juicefs gc redis://localhost --compact
-# Delete leaked objects
+# Delete leaked objects and delayed deleted slices or files
 $ juicefs gc redis://localhost --delete`,
         Flags: []cli.Flag{
             &cli.BoolFlag{
@@ -61,7 +61,7 @@ $ juicefs gc redis://localhost --delete`,
             },
             &cli.BoolFlag{
                 Name: "delete",
-                Usage: "delete leaked objects",
+                Usage: "delete leaked objects and delayed deleted slices or files",
             },
             &cli.IntFlag{
                 Name: "threads",
@@ -110,12 +110,70 @@ func gc(ctx *cli.Context) error {
 
     // Scan all chunks first and do compaction if necessary
     progress := utils.NewProgress(false, false)
-    if ctx.Bool("compact") {
-        bar := progress.AddCountBar("Compacted chunks", 0)
-        spin := progress.AddDoubleSpinner("Compacted slices")
+    // Delete pending slices while listing all slices
+    delete := ctx.Bool("delete")
+    threads := ctx.Int("threads")
+    compact := ctx.Bool("compact")
+    if (delete || compact) && threads <= 0 {
+        logger.Fatal("threads should be greater than 0 to delete or compact objects")
+    }
+
+    var wg sync.WaitGroup
+    var delSpin *utils.Bar
+    var sliceChan chan *dSlice // pending delete slices
+
+    if delete || compact {
+        delSpin = progress.AddCountSpinner("Deleted pending")
+        sliceChan = make(chan *dSlice, 10240)
         m.OnMsg(meta.DeleteSlice, func(args ...interface{}) error {
-            return store.Remove(args[0].(uint64), int(args[1].(uint32)))
+            delSpin.Increment()
+            sliceChan <- &dSlice{args[0].(uint64), args[1].(uint32)}
+            return nil
         })
+        for i := 0; i < threads; i++ {
+            wg.Add(1)
+            go func() {
+                defer wg.Done()
+                for s := range sliceChan {
+                    if err := store.Remove(s.id, int(s.length)); err != nil {
+                        logger.Warnf("remove %d_%d: %s", s.id, s.length, err)
+                    }
+                }
+            }()
+        }
+    }
+
+    c := meta.WrapContext(ctx.Context)
+    delayedFileSpin := progress.AddDoubleSpinner("Delfiles")
+    cleanedFileSpin := progress.AddDoubleSpinner("Cleaned delfiles")
+    edge := time.Now().Add(-time.Duration(format.TrashDays) * 24 * time.Hour)
+    if delete {
+        cleanTrashSpin := progress.AddCountSpinner("Cleaned trash")
+        m.CleanupTrashBefore(c, edge, cleanTrashSpin.Increment)
+        cleanTrashSpin.Done()
+    }
+
+    err = m.ScanDeletedObject(
+        c,
+        nil,
+        func(_ meta.Ino, size uint64, ts int64) (bool, error) {
+            delayedFileSpin.IncrInt64(int64(size))
+            if delete {
+                cleanedFileSpin.IncrInt64(int64(size))
+                return true, nil
+            }
+            return false, nil
+        },
+    )
+    if err != nil {
+        logger.Fatalf("scan deleted object: %s", err)
+    }
+    delayedFileSpin.Done()
+    cleanedFileSpin.Done()
+
+    if compact {
+        bar := progress.AddCountBar("Compacted chunks", 0)
+        spin := progress.AddDoubleSpinner("Compacted slices")
         m.OnMsg(meta.CompactChunk, func(args ...interface{}) error {
             slices := args[0].([]meta.Slice)
             err := vfs.Compact(chunkConf, store, slices, args[1].(uint64))
@@ -125,15 +183,15 @@ func gc(ctx *cli.Context) error {
             return err
         })
         if st := m.CompactAll(meta.Background, ctx.Int("threads"), bar); st == 0 {
+            bar.Done()
+            spin.Done()
             if progress.Quiet {
                 c, b := spin.Current()
                 logger.Infof("Compacted %d chunks (%d slices, %d bytes).", bar.Current(), c, b)
             }
         } else {
             logger.Errorf("compact all chunks: %s", st)
         }
-        bar.Done()
-        spin.Done()
     } else {
         m.OnMsg(meta.CompactChunk, func(args ...interface{}) error {
             return nil // ignore compaction
@@ -143,52 +201,37 @@ func gc(ctx *cli.Context) error {
     // put it above delete count spinner
     sliceCSpin := progress.AddCountSpinner("Listed slices")
 
-    // Delete pending slices while listing all slices
-    delete := ctx.Bool("delete")
-    threads := ctx.Int("threads")
-    if delete && threads <= 0 {
-        logger.Fatal("threads should be greater than 0 to delete objects")
-    }
-    var delSpin *utils.Bar
-    var sliceChan chan *dSlice // pending delete slices
-    var wg sync.WaitGroup
-    if delete {
-        delSpin = progress.AddCountSpinner("Deleted pending")
-        sliceChan = make(chan *dSlice, 10240)
-        m.OnMsg(meta.DeleteSlice, func(args ...interface{}) error {
-            delSpin.Increment()
-            sliceChan <- &dSlice{args[0].(uint64), args[1].(uint32)}
-            return nil
-        })
-        for i := 0; i < threads; i++ {
-            wg.Add(1)
-            go func() {
-                defer wg.Done()
-                for s := range sliceChan {
-                    if err := store.Remove(s.id, int(s.length)); err != nil {
-                        logger.Warnf("remove %d_%d: %s", s.id, s.length, err)
-                    }
-                }
-            }()
-        }
-    }
 
     // List all slices in metadata engine
-    var c = meta.NewContext(0, 0, []uint32{0})
     slices := make(map[meta.Ino][]meta.Slice)
     r := m.ListSlices(c, slices, delete, sliceCSpin.Increment)
     if r != 0 {
         logger.Fatalf("list all slices: %s", r)
     }
-    if delete {
-        close(sliceChan)
-        wg.Wait()
-        delSpin.Done()
-        if progress.Quiet {
-            logger.Infof("Deleted %d pending slices", delSpin.Current())
-        }
-    }
-    sliceCSpin.Done()
+
+    delayedSliceSpin := progress.AddDoubleSpinner("Delslices")
+    cleanedSliceSpin := progress.AddDoubleSpinner("Cleaned delslices")
+
+    err = m.ScanDeletedObject(
+        c,
+        func(ss []meta.Slice, ts int64) (bool, error) {
+            for _, s := range ss {
+                delayedSliceSpin.IncrInt64(int64(s.Size))
+                if delete && ts < edge.Unix() {
+                    cleanedSliceSpin.IncrInt64(int64(s.Size))
+                }
+            }
+            if delete && ts < edge.Unix() {
+                return true, nil
+            }
+            return false, nil
+        },
+        nil,
+    )
+    if err != nil {
+        logger.Fatalf("statistic: %s", err)
+    }
+    delayedSliceSpin.Done()
+    cleanedSliceSpin.Done()
 
     // Scan all objects to find leaked ones
     blob = object.WithPrefix(blob, "chunks/")
@@ -313,16 +356,31 @@ func gc(ctx *cli.Context) error {
             }
         }
     }
+    m.OnMsg(meta.DeleteSlice, func(args ...interface{}) error {
+        return nil
+    })
+    if sliceChan != nil {
+        close(sliceChan)
+    }
     close(leakedObj)
     wg.Wait()
+    if delete || compact {
+        delSpin.Done()
+        if progress.Quiet {
+            logger.Infof("Deleted %d pending slices", delSpin.Current())
+        }
+    }
+    sliceCSpin.Done()
     progress.Done()
 
     vc, _ := valid.Current()
     cc, cb := compacted.Current()
     lc, lb := leaked.Current()
     sc, sb := skipped.Current()
-    logger.Infof("scanned %d objects, %d valid, %d compacted (%d bytes), %d leaked (%d bytes), %d skipped (%d bytes)",
-        bar.Current(), vc, cc, cb, lc, lb, sc, sb)
+    dsc, dsb := cleanedSliceSpin.Current()
+    fc, fb := cleanedFileSpin.Current()
+    logger.Infof("scanned %d objects, %d valid, %d compacted (%d bytes), %d leaked (%d bytes), %d delslices (%d bytes), %d delfiles (%d bytes), %d skipped (%d bytes)",
+        bar.Current(), vc, cc, cb, lc, lb, dsc, dsb, fc, fb, sc, sb)
     if lc > 0 && !delete {
         logger.Infof("Please add `--delete` to clean leaked objects")
     }
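
The delete path in gc.go above follows a producer/worker-pool shape: the `DeleteSlice` message handler enqueues slice IDs into a bounded channel, a fixed number of workers drain it, and the handler is reset to a no-op before the channel is closed. A reduced, runnable model of that shape (the names are illustrative, not the juicefs API):

    package main

    import (
        "fmt"
        "sync"
    )

    type dSlice struct {
        id     uint64
        length uint32
    }

    func main() {
        const threads = 4
        sliceChan := make(chan *dSlice, 10240) // same bound as in the diff
        var wg sync.WaitGroup

        for i := 0; i < threads; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for s := range sliceChan {
                    // The real worker calls store.Remove(s.id, int(s.length)).
                    fmt.Printf("remove %d_%d\n", s.id, s.length)
                }
            }()
        }

        // Stands in for the OnMsg(meta.DeleteSlice, ...) callback.
        handler := func(id uint64, length uint32) {
            sliceChan <- &dSlice{id, length}
        }
        handler(42, 4096)

        // Reset to a no-op before close, as gc.go does at the end, so a late
        // DeleteSlice message cannot panic on a closed channel.
        handler = func(uint64, uint32) {}
        close(sliceChan)
        wg.Wait()
    }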
16 changes: 9 additions & 7 deletions cmd/status.go
@@ -120,15 +120,17 @@ func status(ctx *cli.Context) error {
     fileSpinner := progress.AddDoubleSpinner("Delayed Files")
     defer fileSpinner.Done()
 
-    err = m.Statistic(
-        ctx.Context,
-        func(s meta.Slice) error {
-            slicesSpinner.IncrInt64(int64(s.Size))
-            return nil
+    err = m.ScanDeletedObject(
+        meta.WrapContext(ctx.Context),
+        func(ss []meta.Slice, _ int64) (bool, error) {
+            for _, s := range ss {
+                slicesSpinner.IncrInt64(int64(s.Size))
+            }
+            return false, nil
         },
-        func(_ meta.Ino, size uint64) error {
+        func(_ meta.Ino, size uint64, _ int64) (bool, error) {
             fileSpinner.IncrInt64(int64(size))
-            return nil
+            return false, nil
         },
     )
     if err != nil {
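
The same scan now serves two policies: `status` only observes (its visitors always return `false`), while `gc --delete` returns `true` when an entry is past the trash window, asking the engine to purge it. A self-contained sketch of the contrast, with `Slice` as a local stand-in for `meta.Slice` and made-up timestamps:

    package main

    import (
        "fmt"
        "time"
    )

    type Slice struct{ Size uint32 }

    func makeSliceScan(del bool, edge time.Time) func([]Slice, int64) (bool, error) {
        return func(ss []Slice, ts int64) (bool, error) {
            var bytes uint64
            for _, s := range ss {
                bytes += uint64(s.Size)
            }
            clean := del && ts < edge.Unix() // gc's gating rule from the diff
            fmt.Printf("visited %d bytes, clean=%v\n", bytes, clean)
            return clean, nil
        }
    }

    func main() {
        edge := time.Now().Add(-2 * 24 * time.Hour) // e.g. TrashDays = 2
        old := time.Now().Add(-72 * time.Hour).Unix()

        statusScan := makeSliceScan(false, edge) // `juicefs status`
        gcScan := makeSliceScan(true, edge)      // `juicefs gc --delete`

        ss := []Slice{{Size: 4096}}
        statusScan(ss, old) // clean=false: observe only
        gcScan(ss, old)     // clean=true: engine purges the record
    }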
35 changes: 23 additions & 12 deletions pkg/meta/base.go
@@ -17,7 +17,6 @@
 package meta
 
 import (
-    "context"
     "encoding/json"
     "fmt"
     "path"
@@ -82,12 +81,15 @@ type engine interface {
     doGetParents(ctx Context, inode Ino) map[Ino]int
     doRepair(ctx Context, inode Ino, attr *Attr) syscall.Errno
 
-    scanDeletedSlices(ctx context.Context, visitor func(s Slice) error) error
-    scanDeletedFiles(ctx context.Context, visitor func(ino Ino, size uint64) error) error
+    scanDeletedSlices(Context, deletedSliceScan) error
+    scanDeletedFiles(Context, deletedFileScan) error
 
     GetSession(sid uint64, detail bool) (*Session, error)
 }
 
+type deletedSliceScan func(ss []Slice, ts int64) (clean bool, err error)
+type deletedFileScan func(ino Ino, size uint64, ts int64) (clean bool, err error)
+
 // fsStat aligned for atomic operations
 // nolint:structcheck
 type fsStat struct {
@@ -1363,9 +1365,8 @@ func (m *baseMeta) cleanupTrash() {
     }
 }
 
-func (m *baseMeta) doCleanupTrash(force bool) {
+func (m *baseMeta) CleanupTrashBefore(ctx Context, edge time.Time, increProgress func()) {
     logger.Debugf("cleanup trash: started")
-    ctx := Background
     now := time.Now()
     var st syscall.Errno
     var entries []*Entry
@@ -1383,7 +1384,6 @@ func (m *baseMeta) doCleanupTrash(force bool) {
         }
     }()
     batch := 1000000
-    edge := now.Add(-time.Duration(24*m.fmt.TrashDays+1) * time.Hour)
     for len(entries) > 0 {
         e := entries[0]
         ts, err := time.Parse("2006-01-02-15", string(e.Name))
@@ -1392,7 +1392,7 @@ func (m *baseMeta) doCleanupTrash(force bool) {
             entries = entries[1:]
             continue
         }
-        if ts.Before(edge) || force {
+        if ts.Before(edge) {
             var subEntries []*Entry
             if st = m.en.doReaddir(ctx, e.Inode, 0, &subEntries, batch); st != 0 {
                 logger.Warnf("readdir subTrash %d: %s", e.Inode, st)
@@ -1411,6 +1411,9 @@ func (m *baseMeta) doCleanupTrash(force bool) {
                 }
                 if st == 0 {
                     count++
+                    if increProgress != nil {
+                        increProgress()
+                    }
                 } else {
                     logger.Warnf("delete from trash %s/%s: %s", e.Name, se.Name, st)
                     rmdir = false
@@ -1431,6 +1434,14 @@ func (m *baseMeta) doCleanupTrash(force bool) {
     }
 }
 
+func (m *baseMeta) doCleanupTrash(force bool) {
+    edge := time.Now().Add(-time.Duration(24*m.fmt.TrashDays+1) * time.Hour)
+    if force {
+        edge = time.Now()
+    }
+    m.CleanupTrashBefore(Background, edge, nil)
+}
+
 func (m *baseMeta) cleanupDelayedSlices() {
     now := time.Now()
     edge := now.Unix() - int64(m.fmt.TrashDays)*24*3600
@@ -1447,16 +1458,16 @@ func (m *baseMeta) cleanupDelayedSlices() {
     }
 }
 
-func (m *baseMeta) Statistic(ctx context.Context, slicesDeletedScan func(Slice) error, fileDeletedScan func(ino Ino, size uint64) error) error {
+func (m *baseMeta) ScanDeletedObject(ctx Context, sliceScan deletedSliceScan, fileScan deletedFileScan) error {
     eg := errgroup.Group{}
-    if slicesDeletedScan != nil {
+    if sliceScan != nil {
         eg.Go(func() error {
-            return m.en.scanDeletedSlices(ctx, slicesDeletedScan)
+            return m.en.scanDeletedSlices(ctx, sliceScan)
         })
     }
-    if fileDeletedScan != nil {
+    if fileScan != nil {
         eg.Go(func() error {
-            return m.en.scanDeletedFiles(ctx, fileDeletedScan)
+            return m.en.scanDeletedFiles(ctx, fileScan)
         })
     }
     return eg.Wait()
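
`ScanDeletedObject` itself is only a dispatcher: each requested scan runs in its own goroutine via `errgroup`, a `nil` visitor skips that scan, and the first error wins. A simplified, runnable sketch of that dispatch, with the visitor plumbing dropped and hypothetical closures standing in for the engine methods:

    package main

    import (
        "fmt"

        "golang.org/x/sync/errgroup"
    )

    func scanDeletedObject(sliceScan, fileScan func() error) error {
        eg := errgroup.Group{}
        if sliceScan != nil {
            eg.Go(sliceScan)
        }
        if fileScan != nil {
            eg.Go(fileScan)
        }
        return eg.Wait() // first non-nil error, after both goroutines finish
    }

    func main() {
        err := scanDeletedObject(
            func() error { fmt.Println("scanning delslices"); return nil },
            nil, // callers may opt out of either scan, as status and gc do
        )
        fmt.Println("err:", err)
    }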