diff --git a/cmd/sync.go b/cmd/sync.go index 35fc55082cae..dbfb2dc3c6a1 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -123,6 +123,22 @@ func selectionFlags() []cli.Flag { Name: "match-full-path", Usage: "match filters again the full path", }, + &cli.StringFlag{ + Name: "max-size", + Usage: "skip files larger than `SIZE`", + }, + &cli.StringFlag{ + Name: "min-size", + Usage: "skip files smaller than `SIZE`", + }, + &cli.StringFlag{ + Name: "max-age", + Usage: "skip files older than `DURATION`", + }, + &cli.StringFlag{ + Name: "min-age", + Usage: "skip files newer than `DURATION`", + }, &cli.Int64Flag{ Name: "limit", Usage: "limit the number of objects that will be processed (-1 is unlimited, 0 is to process nothing)", diff --git a/pkg/sync/config.go b/pkg/sync/config.go index 7463d3012f9a..1cc82e37c565 100644 --- a/pkg/sync/config.go +++ b/pkg/sync/config.go @@ -17,8 +17,10 @@ package sync import ( + "math" "os" "strings" + "time" "github.com/juicedata/juicefs/pkg/utils" "github.com/prometheus/client_golang/prometheus" @@ -56,6 +58,10 @@ type Config struct { Quiet bool CheckAll bool CheckNew bool + MaxSize int64 + MinSize int64 + MaxAge time.Duration + MinAge time.Duration Env map[string]string rules []rule @@ -164,8 +170,21 @@ func NewConfigFromCli(c *cli.Context) *Config { Quiet: c.Bool("quiet"), CheckAll: c.Bool("check-all"), CheckNew: c.Bool("check-new"), + MaxSize: int64(utils.ParseBytes(c, "max-size", 'B')), + MinSize: int64(utils.ParseBytes(c, "min-size", 'B')), + MaxAge: utils.Duration(c.String("max-age")), + MinAge: utils.Duration(c.String("min-age")), Env: make(map[string]string), } + if !c.IsSet("max-size") { + cfg.MaxSize = math.MaxInt64 + } + if cfg.MinSize > cfg.MaxSize { + logger.Fatal("min-size should not be larger than max-size") + } + if cfg.MinAge > cfg.MaxAge { + logger.Fatal("min-age should not be larger than max-age") + } if cfg.Threads <= 0 { logger.Warnf("threads should be larger than 0, reset it to 1") cfg.Threads = 1 diff --git a/pkg/sync/sync.go b/pkg/sync/sync.go index 92c0a267724d..2e992361903f 100644 --- a/pkg/sync/sync.go +++ b/pkg/sync/sync.go @@ -774,10 +774,8 @@ func startSingleProducer(tasks chan<- object.Object, src, dst object.ObjectStora } func produce(tasks chan<- object.Object, srckeys, dstkeys <-chan object.Object, config *Config) error { - if len(config.rules) > 0 { - srckeys = filter(srckeys, config.rules, config) - dstkeys = filter(dstkeys, config.rules, config) - } + srckeys = filter(srckeys, config.rules, config) + dstkeys = filter(dstkeys, config.rules, config) var dstobj object.Object for obj := range srckeys { if obj == nil { @@ -913,8 +911,30 @@ func parseIncludeRules(args []string) (rules []rule) { return } +func filterKey(o object.Object, now time.Time, rules []rule, config *Config) bool { + var ok bool = true + if !o.IsDir() && !o.IsSymlink() { + ok = o.Size() >= int64(config.MinSize) && o.Size() <= int64(config.MaxSize) + if ok && config.MaxAge > 0 { + ok = o.Mtime().After(now.Add(-config.MaxAge)) + } + if ok && config.MinAge > 0 { + ok = o.Mtime().Before(now.Add(-config.MinAge)) + } + } + if ok { + if config.MatchFullPath { + ok = matchFullPath(rules, o.Key()) + } else { + ok = matchLeveledPath(rules, o.Key()) + } + } + return ok +} + func filter(keys <-chan object.Object, rules []rule, config *Config) <-chan object.Object { r := make(chan object.Object) + now := time.Now() go func() { for o := range keys { if o == nil { @@ -922,16 +942,10 @@ func filter(keys <-chan object.Object, rules []rule, config *Config) <-chan obje r <- nil break } - var ok bool - if config.MatchFullPath { - ok = matchFullPath(rules, o.Key()) - } else { - ok = matchLeveledPath(rules, o.Key()) - } - if ok { + if filterKey(o, now, rules, config) { r <- o } else { - logger.Debugf("exclude %s", o.Key()) + logger.Debugf("exclude %s size: %d, mtime: %s", o.Key(), o.Size(), o.Mtime()) } } close(r) diff --git a/pkg/sync/sync_test.go b/pkg/sync/sync_test.go index 789b851e847f..d5ea90329d8c 100644 --- a/pkg/sync/sync_test.go +++ b/pkg/sync/sync_test.go @@ -24,6 +24,7 @@ import ( "reflect" "strings" "testing" + "time" "github.com/juicedata/juicefs/pkg/object" ) @@ -105,6 +106,7 @@ func TestSync(t *testing.T) { DeleteDst: false, Exclude: []string{"c*"}, Include: []string{"a[1-9]", "a*"}, + MaxSize: math.MaxInt64, Verbose: false, Quiet: true, } @@ -192,6 +194,7 @@ func TestSyncIncludeAndExclude(t *testing.T) { Verbose: false, Limit: -1, Quiet: true, + MaxSize: math.MaxInt64, Exclude: []string{"1"}, } a, _ := object.CreateStorage("file", "/tmp/a/", "", "", "") @@ -330,6 +333,7 @@ func TestSyncLink(t *testing.T) { Quiet: true, Limit: -1, ForceUpdate: true, + MaxSize: math.MaxInt64, }); err != nil { t.Fatalf("sync: %s", err) } @@ -385,6 +389,7 @@ func TestSyncLinkWithOutFollow(t *testing.T) { Quiet: true, ForceUpdate: true, Limit: -1, + MaxSize: math.MaxInt64, }); err != nil { t.Fatalf("sync: %s", err) } @@ -419,6 +424,7 @@ func TestSingleLink(t *testing.T) { Links: true, Quiet: true, Limit: -1, + MaxSize: math.MaxInt64, ForceUpdate: true, }); err != nil { t.Fatalf("sync: %s", err) @@ -455,6 +461,7 @@ func TestSyncCheckAllLink(t *testing.T) { Links: true, Quiet: true, Limit: -1, + MaxSize: math.MaxInt64, CheckAll: true, }); err != nil { t.Fatalf("sync: %s", err) @@ -493,6 +500,7 @@ func TestSyncCheckNewLink(t *testing.T) { Links: true, Quiet: true, Limit: -1, + MaxSize: math.MaxInt64, CheckNew: true, }); err != nil { t.Fatalf("sync: %s", err) @@ -546,6 +554,7 @@ func TestLimits(t *testing.T) { Threads: 50, Update: true, Perms: true, + MaxSize: math.MaxInt64, } setConfig := func(config *Config, subC subConfig) { config.Limit = subC.limit @@ -738,3 +747,31 @@ func TestParseFilterRule(t *testing.T) { } } } + +type mockObject struct { + size int64 + mtime time.Time +} + +func (o *mockObject) Key() string { return "" } +func (o *mockObject) IsDir() bool { return false } +func (o *mockObject) IsSymlink() bool { return false } +func (o *mockObject) Size() int64 { return o.size } +func (o *mockObject) Mtime() time.Time { return o.mtime } +func (o *mockObject) StorageClass() string { return "" } + +func TestFilterSizeAndAge(t *testing.T) { + config := &Config{ + MaxSize: 100, + MinSize: 10, + MaxAge: time.Second * 100, + MinAge: time.Second * 10, + } + now := time.Now() + if !filterKey(&mockObject{10, now.Add(-time.Second * 15)}, now, nil, config) { + t.Fatalf("filterKey failed") + } + if filterKey(&mockObject{200, now.Add(-time.Second * 200)}, now, nil, config) { + t.Fatalf("filterKey should fail") + } +}