diff --git a/.github/workflows/pjdfstest.yml b/.github/workflows/pjdfstest.yml index 4cdd467c61b9..6b48713f7aaa 100644 --- a/.github/workflows/pjdfstest.yml +++ b/.github/workflows/pjdfstest.yml @@ -149,7 +149,7 @@ jobs: needs: [pjdfstest] if: always() steps: - - uses: technote-space/workflow-conclusion-action@v2 + - uses: technote-space/workflow-conclusion-action@v3 - uses: actions/checkout@v3 - name: Check Failure diff --git a/cmd/gc.go b/cmd/gc.go index 64cdf2af099a..5d7bebb8cd6f 100644 --- a/cmd/gc.go +++ b/cmd/gc.go @@ -155,7 +155,7 @@ func gc(ctx *cli.Context) error { err = m.ScanDeletedObject( c, - nil, + nil, nil, nil, func(_ meta.Ino, size uint64, ts int64) (bool, error) { delayedFileSpin.IncrInt64(int64(size)) if delete { @@ -225,7 +225,7 @@ func gc(ctx *cli.Context) error { } return false, nil }, - nil, + nil, nil, nil, ) if err != nil { logger.Fatalf("statistic: %s", err) diff --git a/cmd/status.go b/cmd/status.go index b82ed00d22e2..4d978d0ebfd4 100644 --- a/cmd/status.go +++ b/cmd/status.go @@ -20,6 +20,7 @@ import ( "encoding/json" "fmt" "syscall" + "time" "github.com/juicedata/juicefs/pkg/meta" "github.com/juicedata/juicefs/pkg/utils" @@ -64,14 +65,18 @@ type sections struct { } type statistic struct { - UsedSpace uint64 - AvailableSpace uint64 - UsedInodes uint64 - AvailableInodes uint64 - TrashSliceCount int64 `json:",omitempty"` - TrashSliceSize int64 `json:",omitempty"` - PendingDeletedFileCount int64 `json:",omitempty"` - PendingDeletedFileSize int64 `json:",omitempty"` + UsedSpace uint64 + AvailableSpace uint64 + UsedInodes uint64 + AvailableInodes uint64 + TrashFileCount int64 `json:",omitempty"` + TrashFileSize int64 `json:",omitempty"` + PendingDeletedFileCount int64 `json:",omitempty"` + PendingDeletedFileSize int64 `json:",omitempty"` + TrashSliceCount int64 `json:",omitempty"` + TrashSliceSize int64 `json:",omitempty"` + PendingDeletedSliceCount int64 `json:",omitempty"` + PendingDeletedSliceSize int64 `json:",omitempty"` } func printJson(v interface{}) { @@ -115,30 +120,43 @@ func status(ctx *cli.Context) error { if ctx.Bool("more") { progress := utils.NewProgress(false, false) - slicesSpinner := progress.AddDoubleSpinner("Trash Slices") - defer slicesSpinner.Done() - fileSpinner := progress.AddDoubleSpinner("Pending Deleted Files") - defer fileSpinner.Done() - + trashFileSpinner := progress.AddDoubleSpinner("Trash Files") + pendingDeletedFileSpinner := progress.AddDoubleSpinner("Pending Deleted Files") + trashSlicesSpinner := progress.AddDoubleSpinner("Trash Slices") + pendingDeletedSlicesSpinner := progress.AddDoubleSpinner("Pending Deleted Slices") err = m.ScanDeletedObject( meta.WrapContext(ctx.Context), func(ss []meta.Slice, _ int64) (bool, error) { for _, s := range ss { - slicesSpinner.IncrInt64(int64(s.Size)) + trashSlicesSpinner.IncrInt64(int64(s.Size)) } return false, nil }, + func(_ uint64, size uint32) (bool, error) { + pendingDeletedSlicesSpinner.IncrInt64(int64(size)) + return false, nil + }, + func(_ meta.Ino, size uint64, _ time.Time) (bool, error) { + trashFileSpinner.IncrInt64(int64(size)) + return false, nil + }, func(_ meta.Ino, size uint64, _ int64) (bool, error) { - fileSpinner.IncrInt64(int64(size)) + pendingDeletedFileSpinner.IncrInt64(int64(size)) return false, nil }, ) if err != nil { logger.Fatalf("statistic: %s", err) - return err } - stat.TrashSliceCount, stat.TrashSliceSize = slicesSpinner.Current() - stat.PendingDeletedFileCount, stat.PendingDeletedFileSize = fileSpinner.Current() + trashSlicesSpinner.Done() + pendingDeletedSlicesSpinner.Done() + trashFileSpinner.Done() + pendingDeletedFileSpinner.Done() + progress.Done() + stat.TrashSliceCount, stat.TrashSliceSize = trashSlicesSpinner.Current() + stat.PendingDeletedSliceCount, stat.PendingDeletedSliceSize = pendingDeletedSlicesSpinner.Current() + stat.TrashFileCount, stat.TrashFileSize = trashFileSpinner.Current() + stat.PendingDeletedFileCount, stat.PendingDeletedFileSize = pendingDeletedFileSpinner.Current() } printJson(§ions{format, sessions, stat}) diff --git a/pkg/meta/base.go b/pkg/meta/base.go index 6328354fc1a2..43b53d374300 100644 --- a/pkg/meta/base.go +++ b/pkg/meta/base.go @@ -32,6 +32,7 @@ import ( "github.com/juicedata/juicefs/pkg/utils" "github.com/juicedata/juicefs/pkg/version" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "golang.org/x/sync/errgroup" ) @@ -81,14 +82,17 @@ type engine interface { doGetParents(ctx Context, inode Ino) map[Ino]int doRepair(ctx Context, inode Ino, attr *Attr) syscall.Errno - scanDeletedSlices(Context, deletedSliceScan) error - scanDeletedFiles(Context, deletedFileScan) error + scanTrashSlices(Context, trashSliceScan) error + scanPendingSlices(Context, pendingSliceScan) error + scanPendingFiles(Context, pendingFileScan) error GetSession(sid uint64, detail bool) (*Session, error) } -type deletedSliceScan func(ss []Slice, ts int64) (clean bool, err error) -type deletedFileScan func(ino Ino, size uint64, ts int64) (clean bool, err error) +type trashSliceScan func(ss []Slice, ts int64) (clean bool, err error) +type pendingSliceScan func(id uint64, size uint32) (clean bool, err error) +type trashFileScan func(inode Ino, size uint64, ts time.Time) (clean bool, err error) +type pendingFileScan func(ino Ino, size uint64, ts int64) (clean bool, err error) // fsStat aligned for atomic operations // nolint:structcheck @@ -1438,6 +1442,42 @@ func (m *baseMeta) CleanupTrashBefore(ctx Context, edge time.Time, increProgress } } +func (m *baseMeta) scanTrashFiles(ctx Context, scan trashFileScan) error { + var st syscall.Errno + var entries []*Entry + if st = m.en.doReaddir(ctx, TrashInode, 1, &entries, -1); st != 0 { + return errors.Wrap(st, "read trash") + } + + var subEntries []*Entry + for _, entry := range entries { + ts, err := time.Parse("2006-01-02-15", string(entry.Name)) + if err != nil { + logger.Warnf("bad entry as a subTrash: %s", entry.Name) + continue + } + subEntries = subEntries[:0] + if st = m.en.doReaddir(ctx, entry.Inode, 1, &subEntries, -1); st != 0 { + logger.Warnf("readdir subEntry %d: %s", entry.Inode, st) + continue + } + for _, se := range subEntries { + if se.Attr.Typ == TypeFile { + clean, err := scan(se.Inode, se.Attr.Length, ts) + if err != nil { + return errors.Wrap(err, "scan trash files") + } + if clean { + // TODO: m.en.doUnlink(ctx, entry.Attr.Parent, string(entry.Name)) + // avoid lint warning + _ = clean + } + } + } + } + return nil +} + func (m *baseMeta) doCleanupTrash(force bool) { edge := time.Now().Add(-time.Duration(24*m.fmt.TrashDays+1) * time.Hour) if force { @@ -1457,16 +1497,26 @@ func (m *baseMeta) cleanupDelayedSlices() { } } -func (m *baseMeta) ScanDeletedObject(ctx Context, sliceScan deletedSliceScan, fileScan deletedFileScan) error { +func (m *baseMeta) ScanDeletedObject(ctx Context, tss trashSliceScan, pss pendingSliceScan, tfs trashFileScan, pfs pendingFileScan) error { eg := errgroup.Group{} - if sliceScan != nil { + if tss != nil { + eg.Go(func() error { + return m.en.scanTrashSlices(ctx, tss) + }) + } + if pss != nil { + eg.Go(func() error { + return m.en.scanPendingSlices(ctx, pss) + }) + } + if tfs != nil { eg.Go(func() error { - return m.en.scanDeletedSlices(ctx, sliceScan) + return m.scanTrashFiles(ctx, tfs) }) } - if fileScan != nil { + if pfs != nil { eg.Go(func() error { - return m.en.scanDeletedFiles(ctx, fileScan) + return m.en.scanPendingFiles(ctx, pfs) }) } return eg.Wait() diff --git a/pkg/meta/interface.go b/pkg/meta/interface.go index 19cc143ce4b1..7de7219cecc9 100644 --- a/pkg/meta/interface.go +++ b/pkg/meta/interface.go @@ -270,7 +270,7 @@ type Meta interface { // ListSessions returns all client sessions. ListSessions() ([]*Session, error) // ScanDeletedObject scan deleted objects by customized scanner. - ScanDeletedObject(Context, deletedSliceScan, deletedFileScan) error + ScanDeletedObject(Context, trashSliceScan, pendingSliceScan, trashFileScan, pendingFileScan) error // ListLocks returns all locks of a inode. ListLocks(ctx context.Context, inode Ino) ([]PLockItem, []FLockItem, error) // CleanStaleSessions cleans up sessions not active for more than 5 minutes diff --git a/pkg/meta/redis.go b/pkg/meta/redis.go index 70ad5b54344c..1e2b9ff1e31e 100644 --- a/pkg/meta/redis.go +++ b/pkg/meta/redis.go @@ -2810,7 +2810,7 @@ func (m *redisMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool, return errno(err) } - return errno(m.scanDeletedSlices(ctx, func(ss []Slice, _ int64) (bool, error) { + return errno(m.scanTrashSlices(ctx, func(ss []Slice, _ int64) (bool, error) { slices[1] = append(slices[1], ss...) if showProgress != nil { for range ss { @@ -2821,7 +2821,7 @@ func (m *redisMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool, })) } -func (m *redisMeta) scanDeletedSlices(ctx Context, scan deletedSliceScan) error { +func (m *redisMeta) scanTrashSlices(ctx Context, scan trashSliceScan) error { if scan == nil { return nil } @@ -2893,7 +2893,60 @@ func (m *redisMeta) scanDeletedSlices(ctx Context, scan deletedSliceScan) error return nil } -func (m *redisMeta) scanDeletedFiles(ctx Context, scan deletedFileScan) error { +func (m *redisMeta) scanPendingSlices(ctx Context, scan pendingSliceScan) error { + if scan == nil { + return nil + } + + pendingKeys := make(chan string, 1000) + c, cancel := context.WithCancel(ctx) + defer cancel() + go func() { + _ = m.hscan(c, m.sliceRefs(), func(keys []string) error { + for i := 0; i < len(keys); i += 2 { + val := keys[i+1] + refs, err := strconv.ParseInt(val, 10, 64) + if err != nil { + // ignored + logger.Warn(errors.Wrapf(err, "parse slice ref: %s", val)) + return nil + } + if refs < 0 { + pendingKeys <- keys[i] + } + } + return nil + }) + close(pendingKeys) + }() + + for key := range pendingKeys { + ps := strings.Split(key[1:], "_") + if len(ps) != 2 { + return fmt.Errorf("invalid key %s", key) + } + id, err := strconv.ParseUint(ps[0], 10, 64) + if err != nil { + return errors.Wrapf(err, "invalid key %s, fail to parse id", key) + } + size, err := strconv.ParseUint(ps[1], 10, 64) + if err != nil { + return errors.Wrapf(err, "invalid key %s, fail to parse size", key) + } + clean, err := scan(id, uint32(size)) + if err != nil { + return errors.Wrap(err, "scan pending slices") + } + if clean { + // TODO: m.deleteSlice(id, uint32(size)) + // avoid lint warning + _ = clean + } + } + return nil +} + +func (m *redisMeta) scanPendingFiles(ctx Context, scan pendingFileScan) error { if scan == nil { return nil } diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go index ff5060ac5818..e7b1cd1de8ed 100644 --- a/pkg/meta/sql.go +++ b/pkg/meta/sql.go @@ -2598,7 +2598,7 @@ func (m *dbMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool, sh return 0 } - return errno(m.scanDeletedSlices(ctx, func(ss []Slice, _ int64) (bool, error) { + return errno(m.scanTrashSlices(ctx, func(ss []Slice, _ int64) (bool, error) { slices[1] = append(slices[1], ss...) if showProgress != nil { for range ss { @@ -2609,7 +2609,7 @@ func (m *dbMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool, sh })) } -func (m *dbMeta) scanDeletedSlices(ctx Context, scan deletedSliceScan) error { +func (m *dbMeta) scanTrashSlices(ctx Context, scan trashSliceScan) error { if scan == nil { return nil } @@ -2676,7 +2676,37 @@ func (m *dbMeta) scanDeletedSlices(ctx Context, scan deletedSliceScan) error { return nil } -func (m *dbMeta) scanDeletedFiles(ctx Context, scan deletedFileScan) error { +func (m *dbMeta) scanPendingSlices(ctx Context, scan pendingSliceScan) error { + if scan == nil { + return nil + } + var refs []sliceRef + err := m.roTxn(func(tx *xorm.Session) error { + if ok, err := tx.IsTableExist(&sliceRef{}); err != nil { + return err + } else if !ok { + return nil + } + return tx.Where("refs <= 0").Find(&refs) + }) + if err != nil { + return errors.Wrap(err, "scan slice refs") + } + for _, ref := range refs { + clean, err := scan(ref.Id, ref.Size) + if err != nil { + return errors.Wrap(err, "scan slice") + } + if clean { + // TODO: m.deleteSlice(ref.Id, ref.Size) + // avoid lint warning + _ = clean + } + } + return nil +} + +func (m *dbMeta) scanPendingFiles(ctx Context, scan pendingFileScan) error { if scan == nil { return nil } diff --git a/pkg/meta/tkv.go b/pkg/meta/tkv.go index 2f8d95a546b0..19f6ecda327f 100644 --- a/pkg/meta/tkv.go +++ b/pkg/meta/tkv.go @@ -2237,7 +2237,7 @@ func (m *kvMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool, sh return 0 } - return errno(m.scanDeletedSlices(ctx, func(ss []Slice, _ int64) (bool, error) { + return errno(m.scanTrashSlices(ctx, func(ss []Slice, _ int64) (bool, error) { slices[1] = append(slices[1], ss...) if showProgress != nil { for range ss { @@ -2248,7 +2248,7 @@ func (m *kvMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool, sh })) } -func (m *kvMeta) scanDeletedSlices(ctx Context, scan deletedSliceScan) error { +func (m *kvMeta) scanTrashSlices(ctx Context, scan trashSliceScan) error { if scan == nil { return nil } @@ -2304,7 +2304,39 @@ func (m *kvMeta) scanDeletedSlices(ctx Context, scan deletedSliceScan) error { return nil } -func (m *kvMeta) scanDeletedFiles(ctx Context, scan deletedFileScan) error { +func (m *kvMeta) scanPendingSlices(ctx Context, scan pendingSliceScan) error { + if scan == nil { + return nil + } + + // slice refs: Kiiiiiiiissss + klen := 1 + 8 + 4 + pairs, err := m.scanValues(m.fmtKey("K"), -1, func(k, v []byte) bool { + refs := parseCounter(v) + return len(k) == klen && refs < 0 + }) + if err != nil { + return err + } + + for key := range pairs { + b := utils.ReadBuffer([]byte(key)[1:]) + id := b.Get64() + size := b.Get32() + clean, err := scan(id, size) + if err != nil { + return errors.Wrap(err, "scan pending deleted slices") + } + if clean { + // TODO: m.deleteSlice(id, size) + // avoid lint warning + _ = clean + } + } + return nil +} + +func (m *kvMeta) scanPendingFiles(ctx Context, scan pendingFileScan) error { if scan == nil { return nil }