Skip to content

Commit

Permalink
scan TrashFile and PendingDeletedSlice in status subcommand (#3145)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hexilee authored Jan 10, 2023
1 parent 4f5dcec commit 90c71cb
Show file tree
Hide file tree
Showing 8 changed files with 223 additions and 40 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pjdfstest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ jobs:
needs: [pjdfstest]
if: always()
steps:
- uses: technote-space/workflow-conclusion-action@v2
- uses: technote-space/workflow-conclusion-action@v3
- uses: actions/checkout@v3

- name: Check Failure
Expand Down
4 changes: 2 additions & 2 deletions cmd/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func gc(ctx *cli.Context) error {

err = m.ScanDeletedObject(
c,
nil,
nil, nil, nil,
func(_ meta.Ino, size uint64, ts int64) (bool, error) {
delayedFileSpin.IncrInt64(int64(size))
if delete {
Expand Down Expand Up @@ -225,7 +225,7 @@ func gc(ctx *cli.Context) error {
}
return false, nil
},
nil,
nil, nil, nil,
)
if err != nil {
logger.Fatalf("statistic: %s", err)
Expand Down
54 changes: 36 additions & 18 deletions cmd/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"syscall"
"time"

"github.com/juicedata/juicefs/pkg/meta"
"github.com/juicedata/juicefs/pkg/utils"
Expand Down Expand Up @@ -64,14 +65,18 @@ type sections struct {
}

type statistic struct {
UsedSpace uint64
AvailableSpace uint64
UsedInodes uint64
AvailableInodes uint64
TrashSliceCount int64 `json:",omitempty"`
TrashSliceSize int64 `json:",omitempty"`
PendingDeletedFileCount int64 `json:",omitempty"`
PendingDeletedFileSize int64 `json:",omitempty"`
UsedSpace uint64
AvailableSpace uint64
UsedInodes uint64
AvailableInodes uint64
TrashFileCount int64 `json:",omitempty"`
TrashFileSize int64 `json:",omitempty"`
PendingDeletedFileCount int64 `json:",omitempty"`
PendingDeletedFileSize int64 `json:",omitempty"`
TrashSliceCount int64 `json:",omitempty"`
TrashSliceSize int64 `json:",omitempty"`
PendingDeletedSliceCount int64 `json:",omitempty"`
PendingDeletedSliceSize int64 `json:",omitempty"`
}

func printJson(v interface{}) {
Expand Down Expand Up @@ -115,30 +120,43 @@ func status(ctx *cli.Context) error {

if ctx.Bool("more") {
progress := utils.NewProgress(false, false)
slicesSpinner := progress.AddDoubleSpinner("Trash Slices")
defer slicesSpinner.Done()
fileSpinner := progress.AddDoubleSpinner("Pending Deleted Files")
defer fileSpinner.Done()

trashFileSpinner := progress.AddDoubleSpinner("Trash Files")
pendingDeletedFileSpinner := progress.AddDoubleSpinner("Pending Deleted Files")
trashSlicesSpinner := progress.AddDoubleSpinner("Trash Slices")
pendingDeletedSlicesSpinner := progress.AddDoubleSpinner("Pending Deleted Slices")
err = m.ScanDeletedObject(
meta.WrapContext(ctx.Context),
func(ss []meta.Slice, _ int64) (bool, error) {
for _, s := range ss {
slicesSpinner.IncrInt64(int64(s.Size))
trashSlicesSpinner.IncrInt64(int64(s.Size))
}
return false, nil
},
func(_ uint64, size uint32) (bool, error) {
pendingDeletedSlicesSpinner.IncrInt64(int64(size))
return false, nil
},
func(_ meta.Ino, size uint64, _ time.Time) (bool, error) {
trashFileSpinner.IncrInt64(int64(size))
return false, nil
},
func(_ meta.Ino, size uint64, _ int64) (bool, error) {
fileSpinner.IncrInt64(int64(size))
pendingDeletedFileSpinner.IncrInt64(int64(size))
return false, nil
},
)
if err != nil {
logger.Fatalf("statistic: %s", err)
return err
}
stat.TrashSliceCount, stat.TrashSliceSize = slicesSpinner.Current()
stat.PendingDeletedFileCount, stat.PendingDeletedFileSize = fileSpinner.Current()
trashSlicesSpinner.Done()
pendingDeletedSlicesSpinner.Done()
trashFileSpinner.Done()
pendingDeletedFileSpinner.Done()
progress.Done()
stat.TrashSliceCount, stat.TrashSliceSize = trashSlicesSpinner.Current()
stat.PendingDeletedSliceCount, stat.PendingDeletedSliceSize = pendingDeletedSlicesSpinner.Current()
stat.TrashFileCount, stat.TrashFileSize = trashFileSpinner.Current()
stat.PendingDeletedFileCount, stat.PendingDeletedFileSize = pendingDeletedFileSpinner.Current()
}

printJson(&sections{format, sessions, stat})
Expand Down
68 changes: 59 additions & 9 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"github.com/juicedata/juicefs/pkg/utils"
"github.com/juicedata/juicefs/pkg/version"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -81,14 +82,17 @@ type engine interface {
doGetParents(ctx Context, inode Ino) map[Ino]int
doRepair(ctx Context, inode Ino, attr *Attr) syscall.Errno

scanDeletedSlices(Context, deletedSliceScan) error
scanDeletedFiles(Context, deletedFileScan) error
scanTrashSlices(Context, trashSliceScan) error
scanPendingSlices(Context, pendingSliceScan) error
scanPendingFiles(Context, pendingFileScan) error

GetSession(sid uint64, detail bool) (*Session, error)
}

type deletedSliceScan func(ss []Slice, ts int64) (clean bool, err error)
type deletedFileScan func(ino Ino, size uint64, ts int64) (clean bool, err error)
type trashSliceScan func(ss []Slice, ts int64) (clean bool, err error)
type pendingSliceScan func(id uint64, size uint32) (clean bool, err error)
type trashFileScan func(inode Ino, size uint64, ts time.Time) (clean bool, err error)
type pendingFileScan func(ino Ino, size uint64, ts int64) (clean bool, err error)

// fsStat aligned for atomic operations
// nolint:structcheck
Expand Down Expand Up @@ -1438,6 +1442,42 @@ func (m *baseMeta) CleanupTrashBefore(ctx Context, edge time.Time, increProgress
}
}

func (m *baseMeta) scanTrashFiles(ctx Context, scan trashFileScan) error {
var st syscall.Errno
var entries []*Entry
if st = m.en.doReaddir(ctx, TrashInode, 1, &entries, -1); st != 0 {
return errors.Wrap(st, "read trash")
}

var subEntries []*Entry
for _, entry := range entries {
ts, err := time.Parse("2006-01-02-15", string(entry.Name))
if err != nil {
logger.Warnf("bad entry as a subTrash: %s", entry.Name)
continue
}
subEntries = subEntries[:0]
if st = m.en.doReaddir(ctx, entry.Inode, 1, &subEntries, -1); st != 0 {
logger.Warnf("readdir subEntry %d: %s", entry.Inode, st)
continue
}
for _, se := range subEntries {
if se.Attr.Typ == TypeFile {
clean, err := scan(se.Inode, se.Attr.Length, ts)
if err != nil {
return errors.Wrap(err, "scan trash files")
}
if clean {
// TODO: m.en.doUnlink(ctx, entry.Attr.Parent, string(entry.Name))
// avoid lint warning
_ = clean
}
}
}
}
return nil
}

func (m *baseMeta) doCleanupTrash(force bool) {
edge := time.Now().Add(-time.Duration(24*m.fmt.TrashDays+1) * time.Hour)
if force {
Expand All @@ -1457,16 +1497,26 @@ func (m *baseMeta) cleanupDelayedSlices() {
}
}

func (m *baseMeta) ScanDeletedObject(ctx Context, sliceScan deletedSliceScan, fileScan deletedFileScan) error {
func (m *baseMeta) ScanDeletedObject(ctx Context, tss trashSliceScan, pss pendingSliceScan, tfs trashFileScan, pfs pendingFileScan) error {
eg := errgroup.Group{}
if sliceScan != nil {
if tss != nil {
eg.Go(func() error {
return m.en.scanTrashSlices(ctx, tss)
})
}
if pss != nil {
eg.Go(func() error {
return m.en.scanPendingSlices(ctx, pss)
})
}
if tfs != nil {
eg.Go(func() error {
return m.en.scanDeletedSlices(ctx, sliceScan)
return m.scanTrashFiles(ctx, tfs)
})
}
if fileScan != nil {
if pfs != nil {
eg.Go(func() error {
return m.en.scanDeletedFiles(ctx, fileScan)
return m.en.scanPendingFiles(ctx, pfs)
})
}
return eg.Wait()
Expand Down
2 changes: 1 addition & 1 deletion pkg/meta/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ type Meta interface {
// ListSessions returns all client sessions.
ListSessions() ([]*Session, error)
// ScanDeletedObject scan deleted objects by customized scanner.
ScanDeletedObject(Context, deletedSliceScan, deletedFileScan) error
ScanDeletedObject(Context, trashSliceScan, pendingSliceScan, trashFileScan, pendingFileScan) error
// ListLocks returns all locks of a inode.
ListLocks(ctx context.Context, inode Ino) ([]PLockItem, []FLockItem, error)
// CleanStaleSessions cleans up sessions not active for more than 5 minutes
Expand Down
59 changes: 56 additions & 3 deletions pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2810,7 +2810,7 @@ func (m *redisMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool,
return errno(err)
}

return errno(m.scanDeletedSlices(ctx, func(ss []Slice, _ int64) (bool, error) {
return errno(m.scanTrashSlices(ctx, func(ss []Slice, _ int64) (bool, error) {
slices[1] = append(slices[1], ss...)
if showProgress != nil {
for range ss {
Expand All @@ -2821,7 +2821,7 @@ func (m *redisMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool,
}))
}

func (m *redisMeta) scanDeletedSlices(ctx Context, scan deletedSliceScan) error {
func (m *redisMeta) scanTrashSlices(ctx Context, scan trashSliceScan) error {
if scan == nil {
return nil
}
Expand Down Expand Up @@ -2893,7 +2893,60 @@ func (m *redisMeta) scanDeletedSlices(ctx Context, scan deletedSliceScan) error
return nil
}

func (m *redisMeta) scanDeletedFiles(ctx Context, scan deletedFileScan) error {
func (m *redisMeta) scanPendingSlices(ctx Context, scan pendingSliceScan) error {
if scan == nil {
return nil
}

pendingKeys := make(chan string, 1000)
c, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
_ = m.hscan(c, m.sliceRefs(), func(keys []string) error {
for i := 0; i < len(keys); i += 2 {
val := keys[i+1]
refs, err := strconv.ParseInt(val, 10, 64)
if err != nil {
// ignored
logger.Warn(errors.Wrapf(err, "parse slice ref: %s", val))
return nil
}
if refs < 0 {
pendingKeys <- keys[i]
}
}
return nil
})
close(pendingKeys)
}()

for key := range pendingKeys {
ps := strings.Split(key[1:], "_")
if len(ps) != 2 {
return fmt.Errorf("invalid key %s", key)
}
id, err := strconv.ParseUint(ps[0], 10, 64)
if err != nil {
return errors.Wrapf(err, "invalid key %s, fail to parse id", key)
}
size, err := strconv.ParseUint(ps[1], 10, 64)
if err != nil {
return errors.Wrapf(err, "invalid key %s, fail to parse size", key)
}
clean, err := scan(id, uint32(size))
if err != nil {
return errors.Wrap(err, "scan pending slices")
}
if clean {
// TODO: m.deleteSlice(id, uint32(size))
// avoid lint warning
_ = clean
}
}
return nil
}

func (m *redisMeta) scanPendingFiles(ctx Context, scan pendingFileScan) error {
if scan == nil {
return nil
}
Expand Down
36 changes: 33 additions & 3 deletions pkg/meta/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -2598,7 +2598,7 @@ func (m *dbMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool, sh
return 0
}

return errno(m.scanDeletedSlices(ctx, func(ss []Slice, _ int64) (bool, error) {
return errno(m.scanTrashSlices(ctx, func(ss []Slice, _ int64) (bool, error) {
slices[1] = append(slices[1], ss...)
if showProgress != nil {
for range ss {
Expand All @@ -2609,7 +2609,7 @@ func (m *dbMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool, sh
}))
}

func (m *dbMeta) scanDeletedSlices(ctx Context, scan deletedSliceScan) error {
func (m *dbMeta) scanTrashSlices(ctx Context, scan trashSliceScan) error {
if scan == nil {
return nil
}
Expand Down Expand Up @@ -2676,7 +2676,37 @@ func (m *dbMeta) scanDeletedSlices(ctx Context, scan deletedSliceScan) error {
return nil
}

func (m *dbMeta) scanDeletedFiles(ctx Context, scan deletedFileScan) error {
func (m *dbMeta) scanPendingSlices(ctx Context, scan pendingSliceScan) error {
if scan == nil {
return nil
}
var refs []sliceRef
err := m.roTxn(func(tx *xorm.Session) error {
if ok, err := tx.IsTableExist(&sliceRef{}); err != nil {
return err
} else if !ok {
return nil
}
return tx.Where("refs <= 0").Find(&refs)
})
if err != nil {
return errors.Wrap(err, "scan slice refs")
}
for _, ref := range refs {
clean, err := scan(ref.Id, ref.Size)
if err != nil {
return errors.Wrap(err, "scan slice")
}
if clean {
// TODO: m.deleteSlice(ref.Id, ref.Size)
// avoid lint warning
_ = clean
}
}
return nil
}

func (m *dbMeta) scanPendingFiles(ctx Context, scan pendingFileScan) error {
if scan == nil {
return nil
}
Expand Down
Loading

0 comments on commit 90c71cb

Please sign in to comment.