Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd/sync: filter files based on size and age #4912

Merged
merged 4 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions cmd/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,22 @@ func selectionFlags() []cli.Flag {
Name: "match-full-path",
Usage: "match filters again the full path",
},
&cli.StringFlag{
Name: "max-size",
Usage: "skip files larger than `SIZE`",
},
&cli.StringFlag{
Name: "min-size",
Usage: "skip files smaller than `SIZE`",
},
&cli.StringFlag{
Name: "max-age",
Usage: "skip files older than `DURATION`",
},
&cli.StringFlag{
Name: "min-age",
Usage: "skip files newer than `DURATION`",
},
&cli.Int64Flag{
Name: "limit",
Usage: "limit the number of objects that will be processed (-1 is unlimited, 0 is to process nothing)",
Expand Down
19 changes: 19 additions & 0 deletions pkg/sync/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package sync

import (
"math"
"os"
"strings"
"time"

"github.com/juicedata/juicefs/pkg/utils"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -56,6 +58,10 @@ type Config struct {
Quiet bool
CheckAll bool
CheckNew bool
MaxSize int64
MinSize int64
MaxAge time.Duration
MinAge time.Duration
Env map[string]string

rules []rule
Expand Down Expand Up @@ -164,8 +170,21 @@ func NewConfigFromCli(c *cli.Context) *Config {
Quiet: c.Bool("quiet"),
CheckAll: c.Bool("check-all"),
CheckNew: c.Bool("check-new"),
MaxSize: int64(utils.ParseBytes(c, "max-size", 'B')),
MinSize: int64(utils.ParseBytes(c, "min-size", 'B')),
MaxAge: utils.Duration(c.String("max-age")),
MinAge: utils.Duration(c.String("min-age")),
Env: make(map[string]string),
}
if !c.IsSet("max-size") {
cfg.MaxSize = math.MaxInt64
}
if cfg.MinSize > cfg.MaxSize {
logger.Fatal("min-size should not be larger than max-size")
}
if cfg.MinAge > cfg.MaxAge {
logger.Fatal("min-age should not be larger than max-age")
}
if cfg.Threads <= 0 {
logger.Warnf("threads should be larger than 0, reset it to 1")
cfg.Threads = 1
Expand Down
38 changes: 26 additions & 12 deletions pkg/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,10 +774,8 @@ func startSingleProducer(tasks chan<- object.Object, src, dst object.ObjectStora
}

func produce(tasks chan<- object.Object, srckeys, dstkeys <-chan object.Object, config *Config) error {
if len(config.rules) > 0 {
srckeys = filter(srckeys, config.rules, config)
dstkeys = filter(dstkeys, config.rules, config)
}
srckeys = filter(srckeys, config.rules, config)
dstkeys = filter(dstkeys, config.rules, config)
var dstobj object.Object
for obj := range srckeys {
if obj == nil {
Expand Down Expand Up @@ -913,25 +911,41 @@ func parseIncludeRules(args []string) (rules []rule) {
return
}

// filterKey reports whether object o passes the configured filters.
//
// Objects that are neither directories nor symlinks must have a size within
// [config.MinSize, config.MaxSize]. When config.MaxAge is positive, their
// mtime must be after now-MaxAge; when config.MinAge is positive, their mtime
// must be before now-MinAge. Directories and symlinks skip the size and age
// checks. Anything that survives is then matched against rules with
// matchFullPath or matchLeveledPath, depending on config.MatchFullPath.
func filterKey(o object.Object, now time.Time, rules []rule, config *Config) bool {
	if !o.IsDir() && !o.IsSymlink() {
		// Size window is always enforced for regular objects.
		if o.Size() < config.MinSize || o.Size() > config.MaxSize {
			return false
		}
		// A non-positive age limit means that limit is disabled.
		if config.MaxAge > 0 && !o.Mtime().After(now.Add(-config.MaxAge)) {
			return false
		}
		if config.MinAge > 0 && !o.Mtime().Before(now.Add(-config.MinAge)) {
			return false
		}
	}
	if config.MatchFullPath {
		return matchFullPath(rules, o.Key())
	}
	return matchLeveledPath(rules, o.Key())
}

func filter(keys <-chan object.Object, rules []rule, config *Config) <-chan object.Object {
r := make(chan object.Object)
now := time.Now()
go func() {
for o := range keys {
if o == nil {
// Telling that the listing has failed
r <- nil
break
}
var ok bool
if config.MatchFullPath {
ok = matchFullPath(rules, o.Key())
} else {
ok = matchLeveledPath(rules, o.Key())
}
if ok {
if filterKey(o, now, rules, config) {
r <- o
} else {
logger.Debugf("exclude %s", o.Key())
logger.Debugf("exclude %s size: %d, mtime: %s", o.Key(), o.Size(), o.Mtime())
}
}
close(r)
Expand Down
37 changes: 37 additions & 0 deletions pkg/sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"reflect"
"strings"
"testing"
"time"

"github.com/juicedata/juicefs/pkg/object"
)
Expand Down Expand Up @@ -105,6 +106,7 @@ func TestSync(t *testing.T) {
DeleteDst: false,
Exclude: []string{"c*"},
Include: []string{"a[1-9]", "a*"},
MaxSize: math.MaxInt64,
Verbose: false,
Quiet: true,
}
Expand Down Expand Up @@ -192,6 +194,7 @@ func TestSyncIncludeAndExclude(t *testing.T) {
Verbose: false,
Limit: -1,
Quiet: true,
MaxSize: math.MaxInt64,
Exclude: []string{"1"},
}
a, _ := object.CreateStorage("file", "/tmp/a/", "", "", "")
Expand Down Expand Up @@ -330,6 +333,7 @@ func TestSyncLink(t *testing.T) {
Quiet: true,
Limit: -1,
ForceUpdate: true,
MaxSize: math.MaxInt64,
}); err != nil {
t.Fatalf("sync: %s", err)
}
Expand Down Expand Up @@ -385,6 +389,7 @@ func TestSyncLinkWithOutFollow(t *testing.T) {
Quiet: true,
ForceUpdate: true,
Limit: -1,
MaxSize: math.MaxInt64,
}); err != nil {
t.Fatalf("sync: %s", err)
}
Expand Down Expand Up @@ -419,6 +424,7 @@ func TestSingleLink(t *testing.T) {
Links: true,
Quiet: true,
Limit: -1,
MaxSize: math.MaxInt64,
ForceUpdate: true,
}); err != nil {
t.Fatalf("sync: %s", err)
Expand Down Expand Up @@ -455,6 +461,7 @@ func TestSyncCheckAllLink(t *testing.T) {
Links: true,
Quiet: true,
Limit: -1,
MaxSize: math.MaxInt64,
CheckAll: true,
}); err != nil {
t.Fatalf("sync: %s", err)
Expand Down Expand Up @@ -493,6 +500,7 @@ func TestSyncCheckNewLink(t *testing.T) {
Links: true,
Quiet: true,
Limit: -1,
MaxSize: math.MaxInt64,
CheckNew: true,
}); err != nil {
t.Fatalf("sync: %s", err)
Expand Down Expand Up @@ -546,6 +554,7 @@ func TestLimits(t *testing.T) {
Threads: 50,
Update: true,
Perms: true,
MaxSize: math.MaxInt64,
}
setConfig := func(config *Config, subC subConfig) {
config.Limit = subC.limit
Expand Down Expand Up @@ -738,3 +747,31 @@ func TestParseFilterRule(t *testing.T) {
}
}
}

// mockObject is a minimal test double exposing only a size and a
// modification time; every other accessor returns a fixed zero-like value.
type mockObject struct {
	size  int64
	mtime time.Time
}

// Key always returns the empty string.
func (m *mockObject) Key() string {
	return ""
}

// IsDir always reports false.
func (m *mockObject) IsDir() bool {
	return false
}

// IsSymlink always reports false.
func (m *mockObject) IsSymlink() bool {
	return false
}

// Size returns the size stored in the mock.
func (m *mockObject) Size() int64 {
	return m.size
}

// Mtime returns the modification time stored in the mock.
func (m *mockObject) Mtime() time.Time {
	return m.mtime
}

// StorageClass always returns the empty string.
func (m *mockObject) StorageClass() string {
	return ""
}

// TestFilterSizeAndAge checks that filterKey accepts an object whose size and
// age fall inside the configured windows and rejects one that falls outside.
func TestFilterSizeAndAge(t *testing.T) {
	cfg := &Config{
		MinSize: 10,
		MaxSize: 100,
		MinAge:  10 * time.Second,
		MaxAge:  100 * time.Second,
	}
	current := time.Now()

	// Size sits on the lower bound and age (15s) lies between MinAge and MaxAge.
	inside := &mockObject{10, current.Add(-15 * time.Second)}
	if accepted := filterKey(inside, current, nil, cfg); !accepted {
		t.Fatalf("filterKey failed")
	}

	// Both size (200) and age (200s) exceed the configured maxima.
	outside := &mockObject{200, current.Add(-200 * time.Second)}
	if accepted := filterKey(outside, current, nil, cfg); accepted {
		t.Fatalf("filterKey should fail")
	}
}
Loading