Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scan TrashFile and PendingDeletedSlice in status subcommand #3145

Merged
merged 18 commits into from
Jan 10, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pjdfstest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ jobs:
needs: [pjdfstest]
if: always()
steps:
- uses: technote-space/workflow-conclusion-action@v2
- uses: technote-space/workflow-conclusion-action@v3
- uses: actions/checkout@v3

- name: Check Failure
Expand Down
4 changes: 2 additions & 2 deletions cmd/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func gc(ctx *cli.Context) error {

err = m.ScanDeletedObject(
c,
nil,
nil, nil, nil,
func(_ meta.Ino, size uint64, ts int64) (bool, error) {
delayedFileSpin.IncrInt64(int64(size))
if delete {
Expand Down Expand Up @@ -225,7 +225,7 @@ func gc(ctx *cli.Context) error {
}
return false, nil
},
nil,
nil, nil, nil,
)
if err != nil {
logger.Fatalf("statistic: %s", err)
Expand Down
54 changes: 36 additions & 18 deletions cmd/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"syscall"
"time"

"github.com/juicedata/juicefs/pkg/meta"
"github.com/juicedata/juicefs/pkg/utils"
Expand Down Expand Up @@ -64,14 +65,18 @@ type sections struct {
}

type statistic struct {
UsedSpace uint64
AvailableSpace uint64
UsedInodes uint64
AvailableInodes uint64
TrashSliceCount int64 `json:",omitempty"`
TrashSliceSize int64 `json:",omitempty"`
PendingDeletedFileCount int64 `json:",omitempty"`
PendingDeletedFileSize int64 `json:",omitempty"`
UsedSpace uint64
AvailableSpace uint64
UsedInodes uint64
AvailableInodes uint64
TrashSliceCount int64 `json:",omitempty"`
TrashSliceSize int64 `json:",omitempty"`
PendingDeletedSliceCount int64 `json:",omitempty"`
PendingDeletedSliceSize int64 `json:",omitempty"`
TrashFileCount int64 `json:",omitempty"`
SandyXSD marked this conversation as resolved.
Show resolved Hide resolved
TrashFileSize int64 `json:",omitempty"`
PendingDeletedFileCount int64 `json:",omitempty"`
PendingDeletedFileSize int64 `json:",omitempty"`
}

func printJson(v interface{}) {
Expand Down Expand Up @@ -115,30 +120,43 @@ func status(ctx *cli.Context) error {

if ctx.Bool("more") {
progress := utils.NewProgress(false, false)
slicesSpinner := progress.AddDoubleSpinner("Trash Slices")
defer slicesSpinner.Done()
fileSpinner := progress.AddDoubleSpinner("Pending Deleted Files")
defer fileSpinner.Done()

trashSlicesSpinner := progress.AddDoubleSpinner("Trash Slices")
pendingDeletedSlicesSpinner := progress.AddDoubleSpinner("Pending Deleted Slices")
trashFileSpinner := progress.AddDoubleSpinner("Trash Files")
SandyXSD marked this conversation as resolved.
Show resolved Hide resolved
pendingDeletedFileSpinner := progress.AddDoubleSpinner("Pending Deleted Files")
err = m.ScanDeletedObject(
meta.WrapContext(ctx.Context),
func(ss []meta.Slice, _ int64) (bool, error) {
for _, s := range ss {
slicesSpinner.IncrInt64(int64(s.Size))
trashSlicesSpinner.IncrInt64(int64(s.Size))
}
return false, nil
},
func(_ uint64, size uint32) (bool, error) {
pendingDeletedSlicesSpinner.IncrInt64(int64(size))
return false, nil
},
func(_ meta.Ino, size uint64, _ time.Time) (bool, error) {
trashFileSpinner.IncrInt64(int64(size))
return false, nil
},
func(_ meta.Ino, size uint64, _ int64) (bool, error) {
fileSpinner.IncrInt64(int64(size))
pendingDeletedFileSpinner.IncrInt64(int64(size))
return false, nil
},
)
if err != nil {
logger.Fatalf("statistic: %s", err)
return err
}
stat.TrashSliceCount, stat.TrashSliceSize = slicesSpinner.Current()
stat.PendingDeletedFileCount, stat.PendingDeletedFileSize = fileSpinner.Current()
trashSlicesSpinner.Done()
pendingDeletedSlicesSpinner.Done()
trashFileSpinner.Done()
pendingDeletedFileSpinner.Done()
progress.Done()
stat.TrashSliceCount, stat.TrashSliceSize = trashSlicesSpinner.Current()
stat.PendingDeletedSliceCount, stat.PendingDeletedSliceSize = pendingDeletedSlicesSpinner.Current()
stat.TrashFileCount, stat.TrashFileSize = trashFileSpinner.Current()
stat.PendingDeletedFileCount, stat.PendingDeletedFileSize = pendingDeletedFileSpinner.Current()
}

printJson(&sections{format, sessions, stat})
Expand Down
92 changes: 83 additions & 9 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"github.com/juicedata/juicefs/pkg/utils"
"github.com/juicedata/juicefs/pkg/version"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -81,14 +82,17 @@ type engine interface {
doGetParents(ctx Context, inode Ino) map[Ino]int
doRepair(ctx Context, inode Ino, attr *Attr) syscall.Errno

scanDeletedSlices(Context, deletedSliceScan) error
scanDeletedFiles(Context, deletedFileScan) error
scanTrashSlices(Context, trashSliceScan) error
scanPendingSlices(Context, pendingSliceScan) error
scanPendingFiles(Context, pendingFileScan) error

GetSession(sid uint64, detail bool) (*Session, error)
}

type deletedSliceScan func(ss []Slice, ts int64) (clean bool, err error)
type deletedFileScan func(ino Ino, size uint64, ts int64) (clean bool, err error)
type trashSliceScan func(ss []Slice, ts int64) (clean bool, err error)
type pendingSliceScan func(id uint64, size uint32) (clean bool, err error)
type trashFileScan func(inode Ino, size uint64, ts time.Time) (clean bool, err error)
SandyXSD marked this conversation as resolved.
Show resolved Hide resolved
type pendingFileScan func(ino Ino, size uint64, ts int64) (clean bool, err error)

// fsStat aligned for atomic operations
// nolint:structcheck
Expand Down Expand Up @@ -1438,6 +1442,66 @@ func (m *baseMeta) CleanupTrashBefore(ctx Context, edge time.Time, increProgress
}
}

func (m *baseMeta) scanTrashFiles(ctx Context, scan trashFileScan) error {
var st syscall.Errno
var entries []*Entry
if st = m.en.doReaddir(ctx, TrashInode, 1, &entries, -1); st != 0 {
return errors.Wrap(st, "read trash")
}

type scannedEntry struct {
deletedAt time.Time
*Entry
}

scannedEntries := make([]scannedEntry, 0, len(entries))
for _, e := range entries {
ts, err := time.Parse("2006-01-02-15", string(e.Name))
if err != nil {
logger.Warnf("bad entry as a subTrash: %s", e.Name)
continue
}
scannedEntries = append(scannedEntries, scannedEntry{
deletedAt: ts,
Entry: e,
})
}

for len(scannedEntries) != 0 {
poped := scannedEntries
scannedEntries = nil
for _, entry := range poped {
if entry.Attr.Typ == TypeFile {
clean, err := scan(entry.Inode, entry.Attr.Length, entry.deletedAt)
if err != nil {
return errors.Wrap(err, "scan trash files")
}
if clean {
SandyXSD marked this conversation as resolved.
Show resolved Hide resolved
st = m.en.doUnlink(ctx, entry.Attr.Parent, string(entry.Name))
if st != 0 {
return errors.Wrap(st, "unlink trash file")
}
}
continue
}
if entry.Attr.Typ == TypeDirectory {
var subEntries []*Entry
if st = m.en.doReaddir(ctx, entry.Inode, 1, &subEntries, -1); st != 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The directory should be empty

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why should the directory be empty? Here we won't remove the directory, just read it and push subentries into the queue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The trash only has three levels, trash/directory/file or empty directory.
We should avoid the readdir for empty directory.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, the new implementation ignores empty directories.

logger.Warnf("readdir subTrash %d: %s", entry.Inode, st)
continue
}
for _, se := range subEntries {
scannedEntries = append(scannedEntries, scannedEntry{
SandyXSD marked this conversation as resolved.
Show resolved Hide resolved
deletedAt: entry.deletedAt,
Entry: se,
})
}
}
}
}
return nil
}

func (m *baseMeta) doCleanupTrash(force bool) {
edge := time.Now().Add(-time.Duration(24*m.fmt.TrashDays+1) * time.Hour)
if force {
Expand All @@ -1457,16 +1521,26 @@ func (m *baseMeta) cleanupDelayedSlices() {
}
}

func (m *baseMeta) ScanDeletedObject(ctx Context, sliceScan deletedSliceScan, fileScan deletedFileScan) error {
func (m *baseMeta) ScanDeletedObject(ctx Context, tss trashSliceScan, pss pendingSliceScan, tfs trashFileScan, pfs pendingFileScan) error {
eg := errgroup.Group{}
if sliceScan != nil {
if tss != nil {
eg.Go(func() error {
return m.en.scanTrashSlices(ctx, tss)
})
}
if pss != nil {
eg.Go(func() error {
return m.en.scanPendingSlices(ctx, pss)
})
}
if tfs != nil {
eg.Go(func() error {
return m.en.scanDeletedSlices(ctx, sliceScan)
return m.scanTrashFiles(ctx, tfs)
})
}
if fileScan != nil {
if pfs != nil {
eg.Go(func() error {
return m.en.scanDeletedFiles(ctx, fileScan)
return m.en.scanPendingFiles(ctx, pfs)
})
}
return eg.Wait()
Expand Down
2 changes: 1 addition & 1 deletion pkg/meta/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ type Meta interface {
// ListSessions returns all client sessions.
ListSessions() ([]*Session, error)
// ScanDeletedObject scan deleted objects by customized scanner.
ScanDeletedObject(Context, deletedSliceScan, deletedFileScan) error
ScanDeletedObject(Context, trashSliceScan, pendingSliceScan, trashFileScan, pendingFileScan) error
// ListLocks returns all locks of a inode.
ListLocks(ctx context.Context, inode Ino) ([]PLockItem, []FLockItem, error)
// CleanStaleSessions cleans up sessions not active for more than 5 minutes
Expand Down
64 changes: 61 additions & 3 deletions pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2810,7 +2810,7 @@ func (m *redisMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool,
return errno(err)
}

return errno(m.scanDeletedSlices(ctx, func(ss []Slice, _ int64) (bool, error) {
return errno(m.scanTrashSlices(ctx, func(ss []Slice, _ int64) (bool, error) {
slices[1] = append(slices[1], ss...)
if showProgress != nil {
for range ss {
Expand All @@ -2821,7 +2821,7 @@ func (m *redisMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool,
}))
}

func (m *redisMeta) scanDeletedSlices(ctx Context, scan deletedSliceScan) error {
func (m *redisMeta) scanTrashSlices(ctx Context, scan trashSliceScan) error {
if scan == nil {
return nil
}
Expand Down Expand Up @@ -2893,7 +2893,65 @@ func (m *redisMeta) scanDeletedSlices(ctx Context, scan deletedSliceScan) error
return nil
}

func (m *redisMeta) scanDeletedFiles(ctx Context, scan deletedFileScan) error {
func (m *redisMeta) scanPendingSlices(ctx Context, scan pendingSliceScan) error {
if scan == nil {
return nil
}

pendingKeys := make(chan string, 1000)
c, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
_ = m.hscan(c, m.sliceRefs(), func(keys []string) error {
for i := 0; i < len(keys); i += 2 {
pendingKeys <- keys[i]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keys[i+1] is the value, we should check the value here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
return nil
})
close(pendingKeys)
}()

for key := range pendingKeys {
ps := strings.Split(key[1:], "_")
if len(ps) != 2 {
return fmt.Errorf("invalid key %s", key)
}
id, err := strconv.ParseUint(ps[0], 10, 64)
if err != nil {
return errors.Wrapf(err, "invalid key %s, fail to parse id", key)
}
size, err := strconv.ParseUint(ps[1], 10, 64)
if err != nil {
return errors.Wrapf(err, "invalid key %s, fail to parse size", key)
}
var clean bool
task := func(tx *redis.Tx) error {
refs, err := tx.HIncrBy(ctx, m.sliceRefs(), key, 0).Result()
if err == redis.Nil {
return nil
} else if err != nil {
return errors.Wrap(err, "increment slice refs")
}
if refs < 0 {
clean, err = scan(id, uint32(size))
if err != nil {
return errors.Wrap(err, "scan pending slices")
}
}
return nil
}
err = m.txn(ctx, task, m.sliceRefs())
if err != nil {
return err
}
if clean {
m.deleteSlice(id, uint32(size))
}
}
return nil
}

func (m *redisMeta) scanPendingFiles(ctx Context, scan pendingFileScan) error {
if scan == nil {
return nil
}
Expand Down
34 changes: 31 additions & 3 deletions pkg/meta/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -2598,7 +2598,7 @@ func (m *dbMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool, sh
return 0
}

return errno(m.scanDeletedSlices(ctx, func(ss []Slice, _ int64) (bool, error) {
return errno(m.scanTrashSlices(ctx, func(ss []Slice, _ int64) (bool, error) {
slices[1] = append(slices[1], ss...)
if showProgress != nil {
for range ss {
Expand All @@ -2609,7 +2609,7 @@ func (m *dbMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool, sh
}))
}

func (m *dbMeta) scanDeletedSlices(ctx Context, scan deletedSliceScan) error {
func (m *dbMeta) scanTrashSlices(ctx Context, scan trashSliceScan) error {
if scan == nil {
return nil
}
Expand Down Expand Up @@ -2676,7 +2676,35 @@ func (m *dbMeta) scanDeletedSlices(ctx Context, scan deletedSliceScan) error {
return nil
}

func (m *dbMeta) scanDeletedFiles(ctx Context, scan deletedFileScan) error {
func (m *dbMeta) scanPendingSlices(ctx Context, scan pendingSliceScan) error {
if scan == nil {
return nil
}
var refs []sliceRef
err := m.roTxn(func(tx *xorm.Session) error {
if ok, err := tx.IsTableExist(&sliceRef{}); err != nil {
return err
} else if !ok {
return nil
}
return tx.Where("refs <= 0").Find(&refs)
})
if err != nil {
return errors.Wrap(err, "scan slice refs")
}
for _, ref := range refs {
clean, err := scan(ref.Id, ref.Size)
if err != nil {
return errors.Wrap(err, "scan slice")
}
if clean {
m.deleteSlice(ref.Id, ref.Size)
}
}
return nil
}

func (m *dbMeta) scanPendingFiles(ctx Context, scan pendingFileScan) error {
if scan == nil {
return nil
}
Expand Down
Loading