From 1f947a47297c6bc452aabb4680229d829fd72ba4 Mon Sep 17 00:00:00 2001 From: xixi Date: Wed, 23 Nov 2022 13:25:02 +0800 Subject: [PATCH 01/12] add delete api for statistic Signed-off-by: xixi --- cmd/status.go | 14 +++--- pkg/meta/base.go | 18 +++---- pkg/meta/context.go | 55 ++++++++++------------ pkg/meta/interface.go | 3 +- pkg/meta/redis.go | 106 ++++++++++++++++++++++++++++++++---------- pkg/meta/sql.go | 96 ++++++++++++++++++++++++++------------ pkg/meta/tkv.go | 84 ++++++++++++++++++++++++--------- 7 files changed, 253 insertions(+), 123 deletions(-) diff --git a/cmd/status.go b/cmd/status.go index 46a2559c3709..a45a764fc21f 100644 --- a/cmd/status.go +++ b/cmd/status.go @@ -121,14 +121,16 @@ func status(ctx *cli.Context) error { defer fileSpinner.Done() err = m.Statistic( - ctx.Context, - func(s meta.Slice) error { - slicesSpinner.IncrInt64(int64(s.Size)) - return nil + meta.WrapContext(ctx.Context), + func(ss []meta.Slice, _ int64) (bool, error) { + for _, s := range ss { + slicesSpinner.IncrInt64(int64(s.Size)) + } + return false, nil }, - func(_ meta.Ino, size uint64) error { + func(_ meta.Ino, size uint64, _ int64) (bool, error) { fileSpinner.IncrInt64(int64(size)) - return nil + return false, nil }, ) if err != nil { diff --git a/pkg/meta/base.go b/pkg/meta/base.go index 0d403653a267..5887cd83a35f 100644 --- a/pkg/meta/base.go +++ b/pkg/meta/base.go @@ -17,7 +17,6 @@ package meta import ( - "context" "encoding/json" "fmt" "path" @@ -82,12 +81,15 @@ type engine interface { doGetParents(ctx Context, inode Ino) map[Ino]int doRepair(ctx Context, inode Ino, attr *Attr) syscall.Errno - scanDeletedSlices(ctx context.Context, visitor func(s Slice) error) error - scanDeletedFiles(ctx context.Context, visitor func(ino Ino, size uint64) error) error + scanDeletedSlices(Context, deletedSliceScan) error + scanDeletedFiles(Context, deletedFileScan) error GetSession(sid uint64, detail bool) (*Session, error) } +type deletedSliceScan func(ss []Slice, ts int64) (clean bool, err error) +type deletedFileScan func(ino Ino, size uint64, ts int64) (clean bool, err error) + // fsStat aligned for atomic operations // nolint:structcheck type fsStat struct { @@ -1445,16 +1447,16 @@ func (m *baseMeta) cleanupDelayedSlices() { } } -func (m *baseMeta) Statistic(ctx context.Context, slicesDeletedScan func(Slice) error, fileDeletedScan func(ino Ino, size uint64) error) error { +func (m *baseMeta) Statistic(ctx Context, sliceScan deletedSliceScan, fileScan deletedFileScan) error { eg := errgroup.Group{} - if slicesDeletedScan != nil { + if sliceScan != nil { eg.Go(func() error { - return m.en.scanDeletedSlices(ctx, slicesDeletedScan) + return m.en.scanDeletedSlices(ctx, sliceScan) }) } - if fileDeletedScan != nil { + if fileScan != nil { eg.Go(func() error { - return m.en.scanDeletedFiles(ctx, fileDeletedScan) + return m.en.scanDeletedFiles(ctx, fileScan) }) } return eg.Wait() diff --git a/pkg/meta/context.go b/pkg/meta/context.go index f6ead9265716..a60b4a1f8c9c 100644 --- a/pkg/meta/context.go +++ b/pkg/meta/context.go @@ -40,58 +40,53 @@ type Context interface { Canceled() bool } -type emptyContext struct { - context.Context -} - -func (ctx *emptyContext) Gid() uint32 { return 0 } -func (ctx *emptyContext) Gids() []uint32 { return []uint32{0} } -func (ctx *emptyContext) Uid() uint32 { return 0 } -func (ctx *emptyContext) Pid() uint32 { return 1 } -func (ctx *emptyContext) Cancel() {} -func (ctx *emptyContext) Canceled() bool { return false } -func (ctx *emptyContext) WithValue(k, v interface{}) { - ctx.Context = context.WithValue(ctx.Context, k, v) -} - -var Background Context = &emptyContext{context.Background()} +var Background Context = WrapContext(context.Background()) -type myContext struct { +type wrapContext struct { context.Context - pid uint32 - uid uint32 - gids []uint32 - canceled bool + cancel func() + pid uint32 + uid uint32 + gids []uint32 } -func (c *myContext) Uid() uint32 { +func (c *wrapContext) Uid() uint32 { return c.uid } -func (c *myContext) Gid() uint32 { +func (c *wrapContext) Gid() uint32 { return c.gids[0] } -func (c *myContext) Gids() []uint32 { +func (c *wrapContext) Gids() []uint32 { return c.gids } -func (c *myContext) Pid() uint32 { +func (c *wrapContext) Pid() uint32 { return c.pid } -func (c *myContext) Cancel() { - c.canceled = true +func (c *wrapContext) Cancel() { + c.cancel() } -func (c *myContext) Canceled() bool { - return c.canceled +func (c *wrapContext) Canceled() bool { + return c.Err() != nil } -func (c *myContext) WithValue(k, v interface{}) { +func (c *wrapContext) WithValue(k, v interface{}) { c.Context = context.WithValue(c.Context, k, v) } func NewContext(pid, uid uint32, gids []uint32) Context { - return &myContext{context.Background(), pid, uid, gids, false} + return wrap(context.Background(), pid, uid, gids) +} + +func WrapContext(ctx context.Context) Context { + return wrap(ctx, 0, 0, []uint32{0}) +} + +func wrap(ctx context.Context, pid, uid uint32, gids []uint32) Context { + c, cancel := context.WithCancel(ctx) + return &wrapContext{c, cancel, pid, uid, gids} } diff --git a/pkg/meta/interface.go b/pkg/meta/interface.go index 7a700d994f47..cfa74fffee45 100644 --- a/pkg/meta/interface.go +++ b/pkg/meta/interface.go @@ -17,7 +17,6 @@ package meta import ( - "context" "fmt" "io" "net/url" @@ -268,7 +267,7 @@ type Meta interface { // ListSessions returns all client sessions. ListSessions() ([]*Session, error) // Statistic scan metadata by visitors. - Statistic(ctx context.Context, slicesDelayedScan func(Slice) error, fileDelayedScan func(ino Ino, size uint64) error) error + Statistic(Context, deletedSliceScan, deletedFileScan) error // CleanStaleSessions cleans up sessions not active for more than 5 minutes CleanStaleSessions() diff --git a/pkg/meta/redis.go b/pkg/meta/redis.go index 6cdd24f8c5f4..96ab0b069ad4 100644 --- a/pkg/meta/redis.go +++ b/pkg/meta/redis.go @@ -2809,40 +2809,91 @@ func (m *redisMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool, return errno(err) } - return errno(m.scanDeletedSlices(ctx, func(s Slice) error { - slices[1] = append(slices[1], s) + return errno(m.scanDeletedSlices(ctx, func(ss []Slice, _ int64) (bool, error) { + slices[1] = append(slices[1], ss...) if showProgress != nil { - showProgress() + for range ss { + showProgress() + } } - return nil + return false, nil })) } -func (m *redisMeta) scanDeletedSlices(ctx context.Context, visitor func(s Slice) error) error { - if visitor == nil { +func (m *redisMeta) scanDeletedSlices(ctx Context, scan deletedSliceScan) error { + if scan == nil { return nil } - visited := make(map[uint64]bool) - return m.hscan(ctx, m.delSlices(), func(keys []string) error { - var ss []Slice - for i := 0; i < len(keys); i += 2 { - ss = ss[:0] - m.decodeDelayedSlices([]byte(keys[i+1]), &ss) - for _, s := range ss { - if s.Id > 0 && !visited[s.Id] { - visited[s.Id] = true - if err := visitor(s); err != nil { - return err + + delKeys := make(chan string, 1000) + c, cancel := context.WithCancel(ctx) + defer cancel() + go func() { + _ = m.hscan(c, m.delSlices(), func(keys []string) error { + for i := 0; i < len(keys); i += 2 { + delKeys <- keys[i] + } + return nil + }) + close(delKeys) + }() + + var ss []Slice + var rs []*redis.IntCmd + for key := range delKeys { + ss = ss[:0] + rs = rs[:0] + var clean bool + task := func(tx *redis.Tx) error { + val, err := tx.HGet(ctx, m.delSlices(), key).Result() + if err == redis.Nil { + return nil + } else if err != nil { + return err + } + ps := strings.Split(key, "_") + if len(ps) != 2 { + return fmt.Errorf("invalid key %s", key) + } + ts, err := strconv.ParseInt(ps[1], 10, 64) + if err != nil { + return fmt.Errorf("invalid key %s, fail to parse timestamp", key) + } + + m.decodeDelayedSlices([]byte(val), &ss) + clean, err = scan(ss, ts) + if err != nil { + return err + } + if clean { + _, err = tx.Pipelined(ctx, func(pipe redis.Pipeliner) error { + for _, s := range ss { + rs = append(rs, pipe.HIncrBy(ctx, m.sliceRefs(), m.sliceKey(s.Id, s.Size), -1)) } + pipe.HDel(ctx, m.delSlices(), key) + return nil + }) + } + return err + } + err := m.txn(ctx, task, m.delSlices()) + if err != nil { + return err + } + if clean && len(rs) == len(ss) { + for i, s := range ss { + if rs[i].Err() == nil && rs[i].Val() < 0 { + m.deleteSlice(s.Id, s.Size) } } } - return nil - }) + } + + return nil } -func (m *redisMeta) scanDeletedFiles(ctx context.Context, visitor func(ino Ino, size uint64) error) error { - if visitor == nil { +func (m *redisMeta) scanDeletedFiles(ctx Context, scan deletedFileScan) error { + if scan == nil { return nil } @@ -2850,12 +2901,13 @@ func (m *redisMeta) scanDeletedFiles(ctx context.Context, visitor func(ino Ino, start := int64(0) const batchSize = 1000 for { - vals, err := m.rdb.ZRange(Background, m.delfiles(), start, start+batchSize).Result() + pairs, err := m.rdb.ZRangeWithScores(Background, m.delfiles(), start, start+batchSize).Result() if err != nil { return err } start += batchSize - for _, v := range vals { + for _, p := range pairs { + v := p.Member.(string) ps := strings.Split(v, ":") if len(ps) != 2 { // will be cleaned up as legacy continue @@ -2866,11 +2918,15 @@ func (m *redisMeta) scanDeletedFiles(ctx context.Context, visitor func(ino Ino, } visited[Ino(inode)] = true size, _ := strconv.ParseUint(ps[1], 10, 64) - if err := visitor(Ino(inode), size); err != nil { + clean, err := scan(Ino(inode), size, int64(p.Score)) + if err != nil { return err } + if clean { + m.doDeleteFileData_(Ino(inode), size, v) + } } - if len(vals) < batchSize { + if len(pairs) < batchSize { break } } diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go index d57e1e3bbe6d..60cbf66ba024 100644 --- a/pkg/meta/sql.go +++ b/pkg/meta/sql.go @@ -22,7 +22,6 @@ package meta import ( "bufio" "bytes" - "context" "database/sql" "encoding/json" "errors" @@ -2588,68 +2587,103 @@ func (m *dbMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool, sh return 0 } - return errno(m.scanDeletedSlices(ctx, func(s Slice) error { - slices[1] = append(slices[1], s) + return errno(m.scanDeletedSlices(ctx, func(ss []Slice, _ int64) (bool, error) { + slices[1] = append(slices[1], ss...) if showProgress != nil { - showProgress() + for range ss { + showProgress() + } } - return nil + return false, nil })) } -func (m *dbMeta) scanDeletedSlices(ctx context.Context, visitor func(s Slice) error) error { - if visitor == nil { +func (m *dbMeta) scanDeletedSlices(ctx Context, scan deletedSliceScan) error { + if scan == nil { return nil } - return m.roTxn(func(s *xorm.Session) error { - if ok, err := s.IsTableExist(&delslices{}); err != nil { + var dss []delslices + + err := m.roTxn(func(tx *xorm.Session) error { + if ok, err := tx.IsTableExist(&delslices{}); err != nil { return err } else if !ok { return nil } - var dss []delslices - err := s.Find(&dss) + return tx.Find(&dss) + }) + if err != nil { + return err + } + var ss []Slice + for _, ds := range dss { + ss = ss[:0] + var clean bool + m.decodeDelayedSlices(ds.Slices, &ss) + err = m.txn(func(tx *xorm.Session) error { + clean, err = scan(ss, ds.Deleted) + if err != nil { + return err + } + if clean { + for _, s := range ss { + if _, e := tx.Exec("update jfs_chunk_ref set refs=refs-1 where chunkid=? and size=?", s.Id, s.Size); e != nil { + return e + } + } + _, err = tx.Delete(&delslices{Id: ds.Id}) + } + return err + }) if err != nil { return err } - var ss []Slice - for _, ds := range dss { - ss = ss[:0] - m.decodeDelayedSlices(ds.Slices, &ss) + if clean { for _, s := range ss { - if s.Id > 0 { - if err := visitor(s); err != nil { - return err + var ref = sliceRef{Id: s.Id} + err := m.roTxn(func(tx *xorm.Session) error { + ok, err := tx.Get(&ref) + if err == nil && !ok { + err = errors.New("not found") } + return err + }) + if err == nil && ref.Refs <= 0 { + m.deleteSlice(s.Id, s.Size) } } } - return nil - }) + } + return nil } -func (m *dbMeta) scanDeletedFiles(ctx context.Context, visitor func(ino Ino, size uint64) error) error { - if visitor == nil { +func (m *dbMeta) scanDeletedFiles(ctx Context, scan deletedFileScan) error { + if scan == nil { return nil } - return m.roTxn(func(s *xorm.Session) error { + + var dfs []delfile + err := m.roTxn(func(s *xorm.Session) error { if ok, err := s.IsTableExist(&delfile{}); err != nil { return err } else if !ok { return nil } - var dfs []delfile - err := s.Find(&dfs) + return s.Find(&dfs) + }) + if err != nil { + return err + } + for _, ds := range dfs { + clean, err := scan(ds.Inode, ds.Length, ds.Expire) if err != nil { return err } - for _, ds := range dfs { - if err := visitor(ds.Inode, ds.Length); err != nil { - return err - } + if clean { + m.doDeleteFileData(ds.Inode, ds.Length) } - return nil - }) + } + return nil } func (m *dbMeta) doRepair(ctx Context, inode Ino, attr *Attr) syscall.Errno { diff --git a/pkg/meta/tkv.go b/pkg/meta/tkv.go index 6a9186ce1c9f..d3b7ce46e0e7 100644 --- a/pkg/meta/tkv.go +++ b/pkg/meta/tkv.go @@ -19,7 +19,6 @@ package meta import ( "bufio" "bytes" - "context" "encoding/binary" "encoding/json" "fmt" @@ -2200,57 +2199,100 @@ func (m *kvMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool, sh return 0 } - return errno(m.scanDeletedSlices(ctx, func(s Slice) error { - slices[1] = append(slices[1], s) + return errno(m.scanDeletedSlices(ctx, func(ss []Slice, _ int64) (bool, error) { + slices[1] = append(slices[1], ss...) if showProgress != nil { - showProgress() + for range ss { + showProgress() + } } - return nil + return false, nil })) } -func (m *kvMeta) scanDeletedSlices(ctx context.Context, visitor func(s Slice) error) error { +func (m *kvMeta) scanDeletedSlices(ctx Context, scan deletedSliceScan) error { + if scan == nil { + return nil + } + // delayed slices: Lttttttttcccccccc klen := 1 + 8 + 8 - result, err := m.scanValues(m.fmtKey("L"), -1, func(k, v []byte) bool { - return len(k) == klen - }) + keys, err := m.scanKeys(m.fmtKey("L")) if err != nil { return err } var ss []Slice - for _, value := range result { - ss = ss[:0] - m.decodeDelayedSlices(value, &ss) - for _, s := range ss { - if s.Id > 0 { - if err := visitor(s); err != nil { - return err + var rs []int64 + for _, key := range keys { + if len(keys) != klen { + continue + } + ss := ss[:0] + rs := rs[:0] + var clean bool + err = m.txn(func(tx kvTxn) error { + v := tx.get(key) + if len(v) == 0 { + return nil + } + b := utils.ReadBuffer([]byte(key)[1:]) + ts := b.Get64() + m.decodeDelayedSlices(v, &ss) + clean, err = scan(ss, int64(ts)) + if err != nil { + return err + } + if clean { + for _, s := range ss { + rs = append(rs, tx.incrBy(m.sliceKey(s.Id, s.Size), -1)) + } + tx.dels(key) + } + return nil + }) + if err != nil { + return err + } + if clean && len(rs) == len(ss) { + for i, s := range ss { + if rs[i] < 0 { + m.deleteSlice(s.Id, s.Size) } } } + } return nil } -func (m *kvMeta) scanDeletedFiles(ctx context.Context, visitor func(ino Ino, size uint64) error) error { - // deleted files: Dttttttttcccccccc +func (m *kvMeta) scanDeletedFiles(ctx Context, scan deletedFileScan) error { + if scan == nil { + return nil + } + // deleted files: Diiiiiiiissssssss klen := 1 + 8 + 8 - keys, err := m.scanKeys(m.fmtKey("D")) + pairs, err := m.scanValues(m.fmtKey("D"), -1, func(k, v []byte) bool { + return len(k) == klen + }) if err != nil { return err } - for _, key := range keys { + for key, value := range pairs { if len(key) != klen { return fmt.Errorf("invalid key %x", key) } ino := m.decodeInode([]byte(key)[1:9]) size := binary.BigEndian.Uint64([]byte(key)[9:]) - if err := visitor(ino, size); err != nil { + ts := m.parseInt64(value) + clean, err := scan(ino, size, ts) + if err != nil { return err } + if clean { + m.doDeleteFileData(ino, size) + } } return nil } From 0462b2f6457de3d61ad756bd2db39be659401fae Mon Sep 17 00:00:00 2001 From: xixi Date: Wed, 23 Nov 2022 17:17:41 +0800 Subject: [PATCH 02/12] scan and delete delfiles and delslices in gc subcommand Signed-off-by: xixi --- cmd/gc.go | 39 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/cmd/gc.go b/cmd/gc.go index 78c418fe2ecb..22e2735692c2 100644 --- a/cmd/gc.go +++ b/cmd/gc.go @@ -174,12 +174,49 @@ func gc(ctx *cli.Context) error { } // List all slices in metadata engine - var c = meta.NewContext(0, 0, []uint32{0}) + var c = meta.WrapContext(ctx.Context) slices := make(map[meta.Ino][]meta.Slice) r := m.ListSlices(c, slices, delete, sliceCSpin.Increment) if r != 0 { logger.Fatalf("list all slices: %s", r) } + + delayedSliceSpin := progress.AddDoubleSpinner("Delslices") + cleanedSliceSpin := progress.AddDoubleSpinner("Cleaned delslices") + delayedFileSpin := progress.AddDoubleSpinner("Delfiles") + cleanedFileSpin := progress.AddDoubleSpinner("Cleaned delfiles") + deadline := time.Now().Unix() - int64(format.TrashDays)*24*3600 + err = m.Statistic( + c, + func(ss []meta.Slice, ts int64) (bool, error) { + for _, s := range ss { + delayedSliceSpin.IncrInt64(int64(s.Size)) + if delete && ts < deadline { + cleanedSliceSpin.IncrInt64(int64(s.Size)) + } + } + if delete && ts < deadline { + return true, nil + } + return false, nil + }, + func(_ meta.Ino, size uint64, ts int64) (bool, error) { + delayedFileSpin.IncrInt64(int64(size)) + if delete { + cleanedFileSpin.IncrInt64(int64(size)) + return true, nil + } + return false, nil + }, + ) + if err != nil { + logger.Fatalf("statistic: %s", err) + } + delayedSliceSpin.Done() + cleanedSliceSpin.Done() + delayedFileSpin.Done() + cleanedFileSpin.Done() + if delete { close(sliceChan) wg.Wait() From 1500dc4cb87f0e78524d90f8cf24f8fa9b271635 Mon Sep 17 00:00:00 2001 From: xixi Date: Fri, 25 Nov 2022 13:19:20 +0800 Subject: [PATCH 03/12] fix slice leak Signed-off-by: xixi --- cmd/gc.go | 115 ++++++++++++++++++++++++------------------ pkg/meta/base.go | 15 ++++-- pkg/meta/interface.go | 2 + 3 files changed, 78 insertions(+), 54 deletions(-) diff --git a/cmd/gc.go b/cmd/gc.go index 22e2735692c2..c56f43d9de5b 100644 --- a/cmd/gc.go +++ b/cmd/gc.go @@ -110,12 +110,69 @@ func gc(ctx *cli.Context) error { // Scan all chunks first and do compaction if necessary progress := utils.NewProgress(false, false) - if ctx.Bool("compact") { - bar := progress.AddCountBar("Compacted chunks", 0) - spin := progress.AddDoubleSpinner("Compacted slices") + // Delete pending slices while listing all slices + delete := ctx.Bool("delete") + threads := ctx.Int("threads") + compact := ctx.Bool("compact") + if (delete || compact) && threads <= 0 { + logger.Fatal("threads should be greater than 0 to delete or compact objects") + } + + var wg sync.WaitGroup + var delSpin *utils.Bar + var sliceChan chan *dSlice // pending delete slices + + if delete || compact { + delSpin = progress.AddCountSpinner("Deleted pending") + sliceChan = make(chan *dSlice, 10240) m.OnMsg(meta.DeleteSlice, func(args ...interface{}) error { - return store.Remove(args[0].(uint64), int(args[1].(uint32))) + delSpin.Increment() + sliceChan <- &dSlice{args[0].(uint64), args[1].(uint32)} + return nil }) + for i := 0; i < threads; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for s := range sliceChan { + if err := store.Remove(s.id, int(s.length)); err != nil { + logger.Warnf("remove %d_%d: %s", s.id, s.length, err) + } + } + }() + } + } + + c := meta.WrapContext(ctx.Context) + delayedFileSpin := progress.AddDoubleSpinner("Delfiles") + cleanedFileSpin := progress.AddDoubleSpinner("Cleaned delfiles") + edge := time.Now().Add(-time.Duration(format.TrashDays) * 24 * time.Hour) + if delete { + cleanTrashSpin := progress.AddCountSpinner("Cleaned trash") + m.CleanupTrashBefore(c, edge, cleanTrashSpin.Increment) + cleanTrashSpin.Done() + } + err = m.Statistic( + c, + nil, + func(_ meta.Ino, size uint64, ts int64) (bool, error) { + delayedFileSpin.IncrInt64(int64(size)) + if delete { + cleanedFileSpin.IncrInt64(int64(size)) + return true, nil + } + return false, nil + }, + ) + if err != nil { + logger.Fatalf("statistic: %s", err) + } + delayedFileSpin.Done() + cleanedFileSpin.Done() + + if compact { + bar := progress.AddCountBar("Compacted chunks", 0) + spin := progress.AddDoubleSpinner("Compacted slices") m.OnMsg(meta.CompactChunk, func(args ...interface{}) error { slices := args[0].([]meta.Slice) err := vfs.Compact(chunkConf, store, slices, args[1].(uint64)) @@ -143,38 +200,7 @@ func gc(ctx *cli.Context) error { // put it above delete count spinner sliceCSpin := progress.AddCountSpinner("Listed slices") - // Delete pending slices while listing all slices - delete := ctx.Bool("delete") - threads := ctx.Int("threads") - if delete && threads <= 0 { - logger.Fatal("threads should be greater than 0 to delete objects") - } - var delSpin *utils.Bar - var sliceChan chan *dSlice // pending delete slices - var wg sync.WaitGroup - if delete { - delSpin = progress.AddCountSpinner("Deleted pending") - sliceChan = make(chan *dSlice, 10240) - m.OnMsg(meta.DeleteSlice, func(args ...interface{}) error { - delSpin.Increment() - sliceChan <- &dSlice{args[0].(uint64), args[1].(uint32)} - return nil - }) - for i := 0; i < threads; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for s := range sliceChan { - if err := store.Remove(s.id, int(s.length)); err != nil { - logger.Warnf("remove %d_%d: %s", s.id, s.length, err) - } - } - }() - } - } - // List all slices in metadata engine - var c = meta.WrapContext(ctx.Context) slices := make(map[meta.Ino][]meta.Slice) r := m.ListSlices(c, slices, delete, sliceCSpin.Increment) if r != 0 { @@ -183,39 +209,28 @@ func gc(ctx *cli.Context) error { delayedSliceSpin := progress.AddDoubleSpinner("Delslices") cleanedSliceSpin := progress.AddDoubleSpinner("Cleaned delslices") - delayedFileSpin := progress.AddDoubleSpinner("Delfiles") - cleanedFileSpin := progress.AddDoubleSpinner("Cleaned delfiles") - deadline := time.Now().Unix() - int64(format.TrashDays)*24*3600 + err = m.Statistic( c, func(ss []meta.Slice, ts int64) (bool, error) { for _, s := range ss { delayedSliceSpin.IncrInt64(int64(s.Size)) - if delete && ts < deadline { + if delete && ts < edge.Unix() { cleanedSliceSpin.IncrInt64(int64(s.Size)) } } - if delete && ts < deadline { - return true, nil - } - return false, nil - }, - func(_ meta.Ino, size uint64, ts int64) (bool, error) { - delayedFileSpin.IncrInt64(int64(size)) - if delete { - cleanedFileSpin.IncrInt64(int64(size)) + if delete && ts < edge.Unix() { return true, nil } return false, nil }, + nil, ) if err != nil { logger.Fatalf("statistic: %s", err) } delayedSliceSpin.Done() cleanedSliceSpin.Done() - delayedFileSpin.Done() - cleanedFileSpin.Done() if delete { close(sliceChan) diff --git a/pkg/meta/base.go b/pkg/meta/base.go index 5887cd83a35f..3d757c1465cf 100644 --- a/pkg/meta/base.go +++ b/pkg/meta/base.go @@ -1365,9 +1365,8 @@ func (m *baseMeta) cleanupTrash() { } } -func (m *baseMeta) doCleanupTrash(force bool) { +func (m *baseMeta) CleanupTrashBefore(ctx Context, edge time.Time, increProgress func()) { logger.Debugf("cleanup trash: started") - ctx := Background now := time.Now() var st syscall.Errno var entries []*Entry @@ -1385,7 +1384,6 @@ func (m *baseMeta) doCleanupTrash(force bool) { } }() batch := 1000000 - edge := now.Add(-time.Duration(24*m.fmt.TrashDays+1) * time.Hour) for len(entries) > 0 { e := entries[0] ts, err := time.Parse("2006-01-02-15", string(e.Name)) @@ -1393,7 +1391,7 @@ func (m *baseMeta) doCleanupTrash(force bool) { logger.Warnf("bad entry as a subTrash: %s", e.Name) continue } - if ts.Before(edge) || force { + if ts.Before(edge) { var subEntries []*Entry if st = m.en.doReaddir(ctx, e.Inode, 0, &subEntries, batch); st != 0 { logger.Warnf("readdir subTrash %d: %s", e.Inode, st) @@ -1411,6 +1409,7 @@ func (m *baseMeta) doCleanupTrash(force bool) { } if st == 0 { count++ + increProgress() } else { logger.Warnf("delete from trash %s/%s: %s", e.Name, se.Name, st) rmdir = false @@ -1431,6 +1430,14 @@ func (m *baseMeta) doCleanupTrash(force bool) { } } +func (m *baseMeta) doCleanupTrash(force bool) { + edge := time.Now().Add(-time.Duration(24*m.fmt.TrashDays+1) * time.Hour) + if force { + edge = time.Now() + } + m.CleanupTrashBefore(Background, edge, nil) +} + func (m *baseMeta) cleanupDelayedSlices() { now := time.Now() edge := now.Unix() - int64(m.fmt.TrashDays)*24*3600 diff --git a/pkg/meta/interface.go b/pkg/meta/interface.go index cfa74fffee45..6d76b005a682 100644 --- a/pkg/meta/interface.go +++ b/pkg/meta/interface.go @@ -270,6 +270,8 @@ type Meta interface { Statistic(Context, deletedSliceScan, deletedFileScan) error // CleanStaleSessions cleans up sessions not active for more than 5 minutes CleanStaleSessions() + // CleanupTrashBefore deletes all files in trash before the given time. + CleanupTrashBefore(ctx Context, edge time.Time, increProgress func()) // StatFS returns summary statistics of a volume. StatFS(ctx Context, totalspace, availspace, iused, iavail *uint64) syscall.Errno From b4c9cc190c75dfe129af4cb58c94ed6f2f1a48b3 Mon Sep 17 00:00:00 2001 From: xixi Date: Fri, 25 Nov 2022 17:33:15 +0800 Subject: [PATCH 04/12] fix increProgress nil bug Signed-off-by: xixi --- pkg/meta/base.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/meta/base.go b/pkg/meta/base.go index 3d757c1465cf..088ffd6396f6 100644 --- a/pkg/meta/base.go +++ b/pkg/meta/base.go @@ -1409,7 +1409,9 @@ func (m *baseMeta) CleanupTrashBefore(ctx Context, edge time.Time, increProgress } if st == 0 { count++ - increProgress() + if increProgress != nil { + increProgress() + } } else { logger.Warnf("delete from trash %s/%s: %s", e.Name, se.Name, st) rmdir = false From 204555dc181879626a06496d9426bc716df2c089 Mon Sep 17 00:00:00 2001 From: xixi Date: Mon, 28 Nov 2022 14:29:49 +0800 Subject: [PATCH 05/12] fix bugs of tkv meta engine Signed-off-by: xixi --- pkg/meta/tkv.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/meta/tkv.go b/pkg/meta/tkv.go index d3b7ce46e0e7..4026d3d4b12f 100644 --- a/pkg/meta/tkv.go +++ b/pkg/meta/tkv.go @@ -2222,10 +2222,13 @@ func (m *kvMeta) scanDeletedSlices(ctx Context, scan deletedSliceScan) error { return err } + fmt.Println("find keys", len(keys)) + var ss []Slice var rs []int64 for _, key := range keys { - if len(keys) != klen { + if len(key) != klen { + fmt.Println("invalid key length", len(key)) continue } ss := ss[:0] From b1ed7a9f3cdca63a72c8fdaff1aff814e560e097 Mon Sep 17 00:00:00 2001 From: xixi Date: Mon, 28 Nov 2022 14:31:51 +0800 Subject: [PATCH 06/12] remove unnecessary info Signed-off-by: xixi --- pkg/meta/tkv.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/meta/tkv.go b/pkg/meta/tkv.go index 4026d3d4b12f..150a3e9cbb62 100644 --- a/pkg/meta/tkv.go +++ b/pkg/meta/tkv.go @@ -2222,13 +2222,10 @@ func (m *kvMeta) scanDeletedSlices(ctx Context, scan deletedSliceScan) error { return err } - fmt.Println("find keys", len(keys)) - var ss []Slice var rs []int64 for _, key := range keys { if len(key) != klen { - fmt.Println("invalid key length", len(key)) continue } ss := ss[:0] From 37afae51bccdd574bf8826ab40bde6696e73ce92 Mon Sep 17 00:00:00 2001 From: xixi Date: Thu, 1 Dec 2022 14:46:22 +0800 Subject: [PATCH 07/12] rename Statistic to ScanDeletedObject Signed-off-by: xixi --- cmd/gc.go | 4 ++-- cmd/status.go | 2 +- pkg/meta/base.go | 2 +- pkg/meta/interface.go | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cmd/gc.go b/cmd/gc.go index c56f43d9de5b..fe199d517de3 100644 --- a/cmd/gc.go +++ b/cmd/gc.go @@ -152,7 +152,7 @@ func gc(ctx *cli.Context) error { m.CleanupTrashBefore(c, edge, cleanTrashSpin.Increment) cleanTrashSpin.Done() } - err = m.Statistic( + err = m.ScanDeletedObject( c, nil, func(_ meta.Ino, size uint64, ts int64) (bool, error) { @@ -210,7 +210,7 @@ func gc(ctx *cli.Context) error { delayedSliceSpin := progress.AddDoubleSpinner("Delslices") cleanedSliceSpin := progress.AddDoubleSpinner("Cleaned delslices") - err = m.Statistic( + err = m.ScanDeletedObject( c, func(ss []meta.Slice, ts int64) (bool, error) { for _, s := range ss { diff --git a/cmd/status.go b/cmd/status.go index a45a764fc21f..515f293fd29a 100644 --- a/cmd/status.go +++ b/cmd/status.go @@ -120,7 +120,7 @@ func status(ctx *cli.Context) error { fileSpinner := progress.AddDoubleSpinner("Delayed Files") defer fileSpinner.Done() - err = m.Statistic( + err = m.ScanDeletedObject( meta.WrapContext(ctx.Context), func(ss []meta.Slice, _ int64) (bool, error) { for _, s := range ss { diff --git a/pkg/meta/base.go b/pkg/meta/base.go index 088ffd6396f6..5871def8763e 100644 --- a/pkg/meta/base.go +++ b/pkg/meta/base.go @@ -1456,7 +1456,7 @@ func (m *baseMeta) cleanupDelayedSlices() { } } -func (m *baseMeta) Statistic(ctx Context, sliceScan deletedSliceScan, fileScan deletedFileScan) error { +func (m *baseMeta) ScanDeletedObject(ctx Context, sliceScan deletedSliceScan, fileScan deletedFileScan) error { eg := errgroup.Group{} if sliceScan != nil { eg.Go(func() error { diff --git a/pkg/meta/interface.go b/pkg/meta/interface.go index 6d76b005a682..674b58ff1562 100644 --- a/pkg/meta/interface.go +++ b/pkg/meta/interface.go @@ -266,8 +266,8 @@ type Meta interface { GetSession(sid uint64, detail bool) (*Session, error) // ListSessions returns all client sessions. ListSessions() ([]*Session, error) - // Statistic scan metadata by visitors. - Statistic(Context, deletedSliceScan, deletedFileScan) error + // ScanDeletedObject scan deleted objects by customized scanner. + ScanDeletedObject(Context, deletedSliceScan, deletedFileScan) error // CleanStaleSessions cleans up sessions not active for more than 5 minutes CleanStaleSessions() // CleanupTrashBefore deletes all files in trash before the given time. From 85b11167f6d71eb95a3a5549b7fb8d9600e92d68 Mon Sep 17 00:00:00 2001 From: xixi Date: Thu, 1 Dec 2022 14:56:53 +0800 Subject: [PATCH 08/12] update description and information of gc subcommand Signed-off-by: xixi --- cmd/gc.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/cmd/gc.go b/cmd/gc.go index fe199d517de3..202d42d2728e 100644 --- a/cmd/gc.go +++ b/cmd/gc.go @@ -42,7 +42,7 @@ func cmdGC() *cli.Command { ArgsUsage: "META-URL", Description: ` It scans all objects in data storage and slices in metadata, comparing them to see if there is any -leaked object. It can also actively trigger compaction of slices. +leaked object. It can also actively trigger compaction of slices and the cleanup of delayed deleted slices or files. Use this command if you find that data storage takes more than expected. Examples: @@ -52,7 +52,7 @@ $ juicefs gc redis://localhost # Trigger compaction of all slices $ juicefs gc redis://localhost --compact -# Delete leaked objects +# Delete leaked objects and delayed deleted slices or files $ juicefs gc redis://localhost --delete`, Flags: []cli.Flag{ &cli.BoolFlag{ @@ -61,7 +61,7 @@ $ juicefs gc redis://localhost --delete`, }, &cli.BoolFlag{ Name: "delete", - Usage: "delete leaked objects", + Usage: "delete leaked objects and delayed deleted slices or files", }, &cli.IntFlag{ Name: "threads", @@ -373,8 +373,10 @@ func gc(ctx *cli.Context) error { cc, cb := compacted.Current() lc, lb := leaked.Current() sc, sb := skipped.Current() - logger.Infof("scanned %d objects, %d valid, %d compacted (%d bytes), %d leaked (%d bytes), %d skipped (%d bytes)", - bar.Current(), vc, cc, cb, lc, lb, sc, sb) + dsc, dsb := cleanedSliceSpin.Current() + fc, fb := cleanedFileSpin.Current() + logger.Infof("scanned %d objects, %d valid, %d compacted (%d bytes), %d leaked (%d bytes), %d delslices (%d bytes), %d delfiles (%d bytes), %d skipped (%d bytes)", + bar.Current(), vc, cc, cb, lc, lb, dsc, dsb, fc, fb, sc, sb) if lc > 0 && !delete { logger.Infof("Please add `--delete` to clean leaked objects") } From 203f33dd83497fc8ac5dc1c65d220596ce3e94c9 Mon Sep 17 00:00:00 2001 From: xixi Date: Mon, 5 Dec 2022 11:51:22 +0800 Subject: [PATCH 09/12] change log Signed-off-by: xixi --- cmd/gc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/gc.go b/cmd/gc.go index 202d42d2728e..8461207d38b3 100644 --- a/cmd/gc.go +++ b/cmd/gc.go @@ -165,7 +165,7 @@ func gc(ctx *cli.Context) error { }, ) if err != nil { - logger.Fatalf("statistic: %s", err) + logger.Fatalf("scan deleted object: %s", err) } delayedFileSpin.Done() cleanedFileSpin.Done() From d2566a86151ced4ec38fceee2ac454b6c6969bfa Mon Sep 17 00:00:00 2001 From: xixi Date: Mon, 5 Dec 2022 12:20:59 +0800 Subject: [PATCH 10/12] fix bugs on conflicts Signed-off-by: xixi --- pkg/meta/redis.go | 4 ++-- pkg/meta/sql.go | 18 +++++++++++++----- pkg/meta/tkv.go | 4 ++-- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/pkg/meta/redis.go b/pkg/meta/redis.go index 96ab0b069ad4..765421d88ef8 100644 --- a/pkg/meta/redis.go +++ b/pkg/meta/redis.go @@ -2841,10 +2841,10 @@ func (m *redisMeta) scanDeletedSlices(ctx Context, scan deletedSliceScan) error var ss []Slice var rs []*redis.IntCmd for key := range delKeys { - ss = ss[:0] - rs = rs[:0] var clean bool task := func(tx *redis.Tx) error { + ss = ss[:0] + rs = rs[:0] val, err := tx.HGet(ctx, m.delSlices(), key).Result() if err == redis.Nil { return nil diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go index 9aeded63ccc2..cccc075f9551 100644 --- a/pkg/meta/sql.go +++ b/pkg/meta/sql.go @@ -24,7 +24,6 @@ import ( "bytes" "database/sql" "encoding/json" - "errors" "fmt" "io" "net/url" @@ -37,6 +36,7 @@ import ( "time" "github.com/juicedata/juicefs/pkg/utils" + "github.com/pkg/errors" "github.com/sirupsen/logrus" "xorm.io/xorm" "xorm.io/xorm/log" @@ -2619,11 +2619,19 @@ func (m *dbMeta) scanDeletedSlices(ctx Context, scan deletedSliceScan) error { } var ss []Slice for _, ds := range dss { - ss = ss[:0] var clean bool - m.decodeDelayedSlices(ds.Slices, &ss) err = m.txn(func(tx *xorm.Session) error { - clean, err = scan(ss, ds.Deleted) + ss = ss[:0] + del := delslices{Id: ds.Id} + found, err := tx.Get(&del) + if err != nil { + return errors.Wrapf(err, "get delslices %d", ds.Id) + } + if !found { + return nil + } + m.decodeDelayedSlices(del.Slices, &ss) + clean, err = scan(ss, del.Deleted) if err != nil { return err } @@ -2633,7 +2641,7 @@ func (m *dbMeta) scanDeletedSlices(ctx Context, scan deletedSliceScan) error { return e } } - _, err = tx.Delete(&delslices{Id: ds.Id}) + _, err = tx.Delete(del) } return err }) diff --git a/pkg/meta/tkv.go b/pkg/meta/tkv.go index 150a3e9cbb62..fbfd04164fde 100644 --- a/pkg/meta/tkv.go +++ b/pkg/meta/tkv.go @@ -2228,10 +2228,10 @@ func (m *kvMeta) scanDeletedSlices(ctx Context, scan deletedSliceScan) error { if len(key) != klen { continue } - ss := ss[:0] - rs := rs[:0] var clean bool err = m.txn(func(tx kvTxn) error { + ss := ss[:0] + rs := rs[:0] v := tx.get(key) if len(v) == 0 { return nil From 067689a786d66c0c2b3ff98a5a3527c6b8028184 Mon Sep 17 00:00:00 2001 From: xixi Date: Mon, 5 Dec 2022 17:35:29 +0800 Subject: [PATCH 11/12] fix unit tests Signed-off-by: xixi --- cmd/gc.go | 105 ++++++++++++++++++++++++++++++------------------------ 1 file changed, 58 insertions(+), 47 deletions(-) diff --git a/cmd/gc.go b/cmd/gc.go index 8461207d38b3..2591ccdbd6d2 100644 --- a/cmd/gc.go +++ b/cmd/gc.go @@ -152,23 +152,27 @@ func gc(ctx *cli.Context) error { m.CleanupTrashBefore(c, edge, cleanTrashSpin.Increment) cleanTrashSpin.Done() } - err = m.ScanDeletedObject( - c, - nil, - func(_ meta.Ino, size uint64, ts int64) (bool, error) { - delayedFileSpin.IncrInt64(int64(size)) - if delete { - cleanedFileSpin.IncrInt64(int64(size)) - return true, nil - } - return false, nil - }, - ) - if err != nil { - logger.Fatalf("scan deleted object: %s", err) - } - delayedFileSpin.Done() - cleanedFileSpin.Done() + wg.Add(1) + go func() { + defer delayedFileSpin.Done() + defer cleanedFileSpin.Done() + defer wg.Done() + err := m.ScanDeletedObject( + c, + nil, + func(_ meta.Ino, size uint64, ts int64) (bool, error) { + delayedFileSpin.IncrInt64(int64(size)) + if delete { + cleanedFileSpin.IncrInt64(int64(size)) + return true, nil + } + return false, nil + }, + ) + if err != nil { + logger.Fatalf("scan deleted object: %s", err) + } + }() if compact { bar := progress.AddCountBar("Compacted chunks", 0) @@ -182,8 +186,6 @@ func gc(ctx *cli.Context) error { return err }) if st := m.CompactAll(meta.Background, ctx.Int("threads"), bar); st == 0 { - bar.Done() - spin.Done() if progress.Quiet { c, b := spin.Current() logger.Infof("Compacted %d chunks (%d slices, %d bytes).", bar.Current(), c, b) @@ -191,6 +193,8 @@ func gc(ctx *cli.Context) error { } else { logger.Errorf("compact all chunks: %s", st) } + bar.Done() + spin.Done() } else { m.OnMsg(meta.CompactChunk, func(args ...interface{}) error { return nil // ignore compaction @@ -210,37 +214,31 @@ func gc(ctx *cli.Context) error { delayedSliceSpin := progress.AddDoubleSpinner("Delslices") cleanedSliceSpin := progress.AddDoubleSpinner("Cleaned delslices") - err = m.ScanDeletedObject( - c, - func(ss []meta.Slice, ts int64) (bool, error) { - for _, s := range ss { - delayedSliceSpin.IncrInt64(int64(s.Size)) + wg.Add(1) + go func() { + defer delayedSliceSpin.Done() + defer cleanedSliceSpin.Done() + defer wg.Done() + err := m.ScanDeletedObject( + c, + func(ss []meta.Slice, ts int64) (bool, error) { + for _, s := range ss { + delayedSliceSpin.IncrInt64(int64(s.Size)) + if delete && ts < edge.Unix() { + cleanedSliceSpin.IncrInt64(int64(s.Size)) + } + } if delete && ts < edge.Unix() { - cleanedSliceSpin.IncrInt64(int64(s.Size)) + return true, nil } - } - if delete && ts < edge.Unix() { - return true, nil - } - return false, nil - }, - nil, - ) - if err != nil { - logger.Fatalf("statistic: %s", err) - } - delayedSliceSpin.Done() - cleanedSliceSpin.Done() - - if delete { - close(sliceChan) - wg.Wait() - delSpin.Done() - if progress.Quiet { - logger.Infof("Deleted %d pending slices", delSpin.Current()) + return false, nil + }, + nil, + ) + if err != nil { + logger.Fatalf("statistic: %s", err) } - } - sliceCSpin.Done() + }() // Scan all objects to find leaked ones blob = object.WithPrefix(blob, "chunks/") @@ -365,8 +363,21 @@ func gc(ctx *cli.Context) error { } } } + m.OnMsg(meta.DeleteSlice, func(args ...interface{}) error { + return nil + }) + if sliceChan != nil { + close(sliceChan) + } close(leakedObj) wg.Wait() + if delete || compact { + delSpin.Done() + if progress.Quiet { + logger.Infof("Deleted %d pending slices", delSpin.Current()) + } + } + sliceCSpin.Done() progress.Done() vc, _ := valid.Current() From 4c5c274c992513a461de68e5214e3546d7764ec8 Mon Sep 17 00:00:00 2001 From: xixi Date: Mon, 5 Dec 2022 17:54:32 +0800 Subject: [PATCH 12/12] make scan sync Signed-off-by: xixi --- cmd/gc.go | 81 +++++++++++++++++++++++++------------------------------ 1 file changed, 37 insertions(+), 44 deletions(-) diff --git a/cmd/gc.go b/cmd/gc.go index 2591ccdbd6d2..64cdf2af099a 100644 --- a/cmd/gc.go +++ b/cmd/gc.go @@ -152,27 +152,24 @@ func gc(ctx *cli.Context) error { m.CleanupTrashBefore(c, edge, cleanTrashSpin.Increment) cleanTrashSpin.Done() } - wg.Add(1) - go func() { - defer delayedFileSpin.Done() - defer cleanedFileSpin.Done() - defer wg.Done() - err := m.ScanDeletedObject( - c, - nil, - func(_ meta.Ino, size uint64, ts int64) (bool, error) { - delayedFileSpin.IncrInt64(int64(size)) - if delete { - cleanedFileSpin.IncrInt64(int64(size)) - return true, nil - } - return false, nil - }, - ) - if err != nil { - logger.Fatalf("scan deleted object: %s", err) - } - }() + + err = m.ScanDeletedObject( + c, + nil, + func(_ meta.Ino, size uint64, ts int64) (bool, error) { + delayedFileSpin.IncrInt64(int64(size)) + if delete { + cleanedFileSpin.IncrInt64(int64(size)) + return true, nil + } + return false, nil + }, + ) + if err != nil { + logger.Fatalf("scan deleted object: %s", err) + } + delayedFileSpin.Done() + cleanedFileSpin.Done() if compact { bar := progress.AddCountBar("Compacted chunks", 0) @@ -214,31 +211,27 @@ func gc(ctx *cli.Context) error { delayedSliceSpin := progress.AddDoubleSpinner("Delslices") cleanedSliceSpin := progress.AddDoubleSpinner("Cleaned delslices") - wg.Add(1) - go func() { - defer delayedSliceSpin.Done() - defer cleanedSliceSpin.Done() - defer wg.Done() - err := m.ScanDeletedObject( - c, - func(ss []meta.Slice, ts int64) (bool, error) { - for _, s := range ss { - delayedSliceSpin.IncrInt64(int64(s.Size)) - if delete && ts < edge.Unix() { - cleanedSliceSpin.IncrInt64(int64(s.Size)) - } - } + err = m.ScanDeletedObject( + c, + func(ss []meta.Slice, ts int64) (bool, error) { + for _, s := range ss { + delayedSliceSpin.IncrInt64(int64(s.Size)) if delete && ts < edge.Unix() { - return true, nil + cleanedSliceSpin.IncrInt64(int64(s.Size)) } - return false, nil - }, - nil, - ) - if err != nil { - logger.Fatalf("statistic: %s", err) - } - }() + } + if delete && ts < edge.Unix() { + return true, nil + } + return false, nil + }, + nil, + ) + if err != nil { + logger.Fatalf("statistic: %s", err) + } + delayedSliceSpin.Done() + cleanedSliceSpin.Done() // Scan all objects to find leaked ones blob = object.WithPrefix(blob, "chunks/")