From c66e61f5f50daf71f6d056d77ad6fd501a72e238 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 31 May 2024 12:12:28 +0800 Subject: [PATCH 1/4] cmd/sync: filter files based on size and age --- cmd/sync.go | 16 ++++++++++++++++ pkg/sync/config.go | 13 +++++++++++++ pkg/sync/sync.go | 26 ++++++++++++++++++++------ 3 files changed, 49 insertions(+), 6 deletions(-) diff --git a/cmd/sync.go b/cmd/sync.go index 35fc55082cae..8312e8ac161b 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -123,6 +123,22 @@ func selectionFlags() []cli.Flag { Name: "match-full-path", Usage: "match filters again the full path", }, + &cli.StringFlag{ + Name: "max-size", + Usage: "skip files larger than `SIZE`", + }, + &cli.StringFlag{ + Name: "min-size", + Usage: "skip files smaller than `SIZE`", + }, + &cli.StringFlag{ + Name: "max-age", + Usage: "skip files older than `DURATION`", + }, + &cli.StringFlag{ + Name: "max-age", + Usage: "skip files newer than `DURATION`", + }, &cli.Int64Flag{ Name: "limit", Usage: "limit the number of objects that will be processed (-1 is unlimited, 0 is to process nothing)", diff --git a/pkg/sync/config.go b/pkg/sync/config.go index 7463d3012f9a..514a2be16aa8 100644 --- a/pkg/sync/config.go +++ b/pkg/sync/config.go @@ -17,8 +17,10 @@ package sync import ( + "math" "os" "strings" + "time" "github.com/juicedata/juicefs/pkg/utils" "github.com/prometheus/client_golang/prometheus" @@ -56,6 +58,10 @@ type Config struct { Quiet bool CheckAll bool CheckNew bool + MaxSize int64 + MinSize int64 + MaxAge time.Duration + MinAge time.Duration Env map[string]string rules []rule @@ -164,8 +170,15 @@ func NewConfigFromCli(c *cli.Context) *Config { Quiet: c.Bool("quiet"), CheckAll: c.Bool("check-all"), CheckNew: c.Bool("check-new"), + MaxSize: int64(utils.ParseBytes(c, "max-size", 'B')), + MinSize: int64(utils.ParseBytes(c, "min-size", 'B')), + MaxAge: utils.Duration(c.String("max-age")), + MinAge: utils.Duration(c.String("min-age")), Env: make(map[string]string), } + if !c.IsSet("max-size") { + cfg.MaxSize = math.MaxInt64 + } if cfg.Threads <= 0 { logger.Warnf("threads should be larger than 0, reset it to 1") cfg.Threads = 1 diff --git a/pkg/sync/sync.go b/pkg/sync/sync.go index 92c0a267724d..02dd4bc380f6 100644 --- a/pkg/sync/sync.go +++ b/pkg/sync/sync.go @@ -915,6 +915,9 @@ func parseIncludeRules(args []string) (rules []rule) { func filter(keys <-chan object.Object, rules []rule, config *Config) <-chan object.Object { r := make(chan object.Object) + now := time.Now() + maxMtime := now.Add(-config.MaxAge) + minMtime := now.Add(-config.MinAge) go func() { for o := range keys { if o == nil { @@ -922,16 +925,27 @@ func filter(keys <-chan object.Object, rules []rule, config *Config) <-chan obje r <- nil break } - var ok bool - if config.MatchFullPath { - ok = matchFullPath(rules, o.Key()) - } else { - ok = matchLeveledPath(rules, o.Key()) + var ok bool = true + if !o.IsDir() && !o.IsSymlink() { + ok = o.Size() >= int64(config.MinSize) && o.Size() <= int64(config.MaxSize) + if ok && config.MaxAge > 0 { + ok = o.Mtime().After(maxMtime) + } + if ok && config.MinAge > 0 { + ok = o.Mtime().Before(minMtime) + } + } + if ok { + if config.MatchFullPath { + ok = matchFullPath(rules, o.Key()) + } else { + ok = matchLeveledPath(rules, o.Key()) + } } if ok { r <- o } else { - logger.Debugf("exclude %s", o.Key()) + logger.Debugf("exclude %s size: %d, mtime: %s", o.Key(), o.Size(), o.Mtime()) } } close(r) From 6f27fa561645b3c59c4ae9848202db1bc0b24811 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 31 May 2024 13:29:02 +0800 Subject: [PATCH 2/4] fix tests --- pkg/sync/sync.go | 6 ++---- pkg/sync/sync_test.go | 8 ++++++++ 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/pkg/sync/sync.go b/pkg/sync/sync.go index 02dd4bc380f6..416dd4bddf04 100644 --- a/pkg/sync/sync.go +++ b/pkg/sync/sync.go @@ -774,10 +774,8 @@ func startSingleProducer(tasks chan<- object.Object, src, dst object.ObjectStora } func produce(tasks chan<- object.Object, srckeys, dstkeys <-chan object.Object, config *Config) error { - if len(config.rules) > 0 { - srckeys = filter(srckeys, config.rules, config) - dstkeys = filter(dstkeys, config.rules, config) - } + srckeys = filter(srckeys, config.rules, config) + dstkeys = filter(dstkeys, config.rules, config) var dstobj object.Object for obj := range srckeys { if obj == nil { diff --git a/pkg/sync/sync_test.go b/pkg/sync/sync_test.go index 789b851e847f..64aca70c8963 100644 --- a/pkg/sync/sync_test.go +++ b/pkg/sync/sync_test.go @@ -105,6 +105,7 @@ func TestSync(t *testing.T) { DeleteDst: false, Exclude: []string{"c*"}, Include: []string{"a[1-9]", "a*"}, + MaxSize: math.MaxInt64, Verbose: false, Quiet: true, } @@ -192,6 +193,7 @@ func TestSyncIncludeAndExclude(t *testing.T) { Verbose: false, Limit: -1, Quiet: true, + MaxSize: math.MaxInt64, Exclude: []string{"1"}, } a, _ := object.CreateStorage("file", "/tmp/a/", "", "", "") @@ -330,6 +332,7 @@ func TestSyncLink(t *testing.T) { Quiet: true, Limit: -1, ForceUpdate: true, + MaxSize: math.MaxInt64, }); err != nil { t.Fatalf("sync: %s", err) } @@ -385,6 +388,7 @@ func TestSyncLinkWithOutFollow(t *testing.T) { Quiet: true, ForceUpdate: true, Limit: -1, + MaxSize: math.MaxInt64, }); err != nil { t.Fatalf("sync: %s", err) } @@ -419,6 +423,7 @@ func TestSingleLink(t *testing.T) { Links: true, Quiet: true, Limit: -1, + MaxSize: math.MaxInt64, ForceUpdate: true, }); err != nil { t.Fatalf("sync: %s", err) @@ -455,6 +460,7 @@ func TestSyncCheckAllLink(t *testing.T) { Links: true, Quiet: true, Limit: -1, + MaxSize: math.MaxInt64, CheckAll: true, }); err != nil { t.Fatalf("sync: %s", err) @@ -493,6 +499,7 @@ func TestSyncCheckNewLink(t *testing.T) { Links: true, Quiet: true, Limit: -1, + MaxSize: math.MaxInt64, CheckNew: true, }); err != nil { t.Fatalf("sync: %s", err) @@ -546,6 +553,7 @@ func TestLimits(t *testing.T) { Threads: 50, Update: true, Perms: true, + MaxSize: math.MaxInt64, } setConfig := func(config *Config, subC subConfig) { config.Limit = subC.limit From 8b18997d23afb4813025217148197058d3564b01 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 31 May 2024 13:51:46 +0800 Subject: [PATCH 3/4] add unit test --- pkg/sync/config.go | 6 ++++++ pkg/sync/sync.go | 42 ++++++++++++++++++++++-------------------- pkg/sync/sync_test.go | 29 +++++++++++++++++++++++++++++ 3 files changed, 57 insertions(+), 20 deletions(-) diff --git a/pkg/sync/config.go b/pkg/sync/config.go index 514a2be16aa8..1cc82e37c565 100644 --- a/pkg/sync/config.go +++ b/pkg/sync/config.go @@ -179,6 +179,12 @@ func NewConfigFromCli(c *cli.Context) *Config { if !c.IsSet("max-size") { cfg.MaxSize = math.MaxInt64 } + if cfg.MinSize > cfg.MaxSize { + logger.Fatal("min-size should not be larger than max-size") + } + if cfg.MinAge > cfg.MaxAge { + logger.Fatal("min-age should not be larger than max-age") + } if cfg.Threads <= 0 { logger.Warnf("threads should be larger than 0, reset it to 1") cfg.Threads = 1 diff --git a/pkg/sync/sync.go b/pkg/sync/sync.go index 416dd4bddf04..2e992361903f 100644 --- a/pkg/sync/sync.go +++ b/pkg/sync/sync.go @@ -911,11 +911,30 @@ func parseIncludeRules(args []string) (rules []rule) { return } +func filterKey(o object.Object, now time.Time, rules []rule, config *Config) bool { + var ok bool = true + if !o.IsDir() && !o.IsSymlink() { + ok = o.Size() >= int64(config.MinSize) && o.Size() <= int64(config.MaxSize) + if ok && config.MaxAge > 0 { + ok = o.Mtime().After(now.Add(-config.MaxAge)) + } + if ok && config.MinAge > 0 { + ok = o.Mtime().Before(now.Add(-config.MinAge)) + } + } + if ok { + if config.MatchFullPath { + ok = matchFullPath(rules, o.Key()) + } else { + ok = matchLeveledPath(rules, o.Key()) + } + } + return ok +} + func filter(keys <-chan object.Object, rules []rule, config *Config) <-chan object.Object { r := make(chan object.Object) now := time.Now() - maxMtime := now.Add(-config.MaxAge) - minMtime := now.Add(-config.MinAge) go func() { for o := range keys { if o == nil { @@ -923,24 +942,7 @@ func filter(keys <-chan object.Object, rules []rule, config *Config) <-chan obje r <- nil break } - var ok bool = true - if !o.IsDir() && !o.IsSymlink() { - ok = o.Size() >= int64(config.MinSize) && o.Size() <= int64(config.MaxSize) - if ok && config.MaxAge > 0 { - ok = o.Mtime().After(maxMtime) - } - if ok && config.MinAge > 0 { - ok = o.Mtime().Before(minMtime) - } - } - if ok { - if config.MatchFullPath { - ok = matchFullPath(rules, o.Key()) - } else { - ok = matchLeveledPath(rules, o.Key()) - } - } - if ok { + if filterKey(o, now, rules, config) { r <- o } else { logger.Debugf("exclude %s size: %d, mtime: %s", o.Key(), o.Size(), o.Mtime()) diff --git a/pkg/sync/sync_test.go b/pkg/sync/sync_test.go index 64aca70c8963..d5ea90329d8c 100644 --- a/pkg/sync/sync_test.go +++ b/pkg/sync/sync_test.go @@ -24,6 +24,7 @@ import ( "reflect" "strings" "testing" + "time" "github.com/juicedata/juicefs/pkg/object" ) @@ -746,3 +747,31 @@ func TestParseFilterRule(t *testing.T) { } } } + +type mockObject struct { + size int64 + mtime time.Time +} + +func (o *mockObject) Key() string { return "" } +func (o *mockObject) IsDir() bool { return false } +func (o *mockObject) IsSymlink() bool { return false } +func (o *mockObject) Size() int64 { return o.size } +func (o *mockObject) Mtime() time.Time { return o.mtime } +func (o *mockObject) StorageClass() string { return "" } + +func TestFilterSizeAndAge(t *testing.T) { + config := &Config{ + MaxSize: 100, + MinSize: 10, + MaxAge: time.Second * 100, + MinAge: time.Second * 10, + } + now := time.Now() + if !filterKey(&mockObject{10, now.Add(-time.Second * 15)}, now, nil, config) { + t.Fatalf("filterKey failed") + } + if filterKey(&mockObject{200, now.Add(-time.Second * 200)}, now, nil, config) { + t.Fatalf("filterKey should fail") + } +} From d4d399ed3f3bef5b48313fabb817ce5014291a1d Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 31 May 2024 13:55:18 +0800 Subject: [PATCH 4/4] fix flag --- cmd/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/sync.go b/cmd/sync.go index 8312e8ac161b..dbfb2dc3c6a1 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -136,7 +136,7 @@ func selectionFlags() []cli.Flag { Usage: "skip files older than `DURATION`", }, &cli.StringFlag{ - Name: "max-age", + Name: "min-age", Usage: "skip files newer than `DURATION`", }, &cli.Int64Flag{