Skip to content

Commit

Permalink
cmd/sync: filter files based on size and age (#4912)
Browse files Browse the repository at this point in the history
  • Loading branch information
davies authored May 31, 2024
1 parent e46be36 commit fbe92e3
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 12 deletions.
16 changes: 16 additions & 0 deletions cmd/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,22 @@ func selectionFlags() []cli.Flag {
Name: "match-full-path",
Usage: "match filters again the full path",
},
&cli.StringFlag{
Name: "max-size",
Usage: "skip files larger than `SIZE`",
},
&cli.StringFlag{
Name: "min-size",
Usage: "skip files smaller than `SIZE`",
},
&cli.StringFlag{
Name: "max-age",
Usage: "skip files older than `DURATION`",
},
&cli.StringFlag{
Name: "min-age",
Usage: "skip files newer than `DURATION`",
},
&cli.Int64Flag{
Name: "limit",
Usage: "limit the number of objects that will be processed (-1 is unlimited, 0 is to process nothing)",
Expand Down
19 changes: 19 additions & 0 deletions pkg/sync/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package sync

import (
"math"
"os"
"strings"
"time"

"github.com/juicedata/juicefs/pkg/utils"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -56,6 +58,10 @@ type Config struct {
Quiet bool
CheckAll bool
CheckNew bool
MaxSize int64
MinSize int64
MaxAge time.Duration
MinAge time.Duration
Env map[string]string

rules []rule
Expand Down Expand Up @@ -164,8 +170,21 @@ func NewConfigFromCli(c *cli.Context) *Config {
Quiet: c.Bool("quiet"),
CheckAll: c.Bool("check-all"),
CheckNew: c.Bool("check-new"),
MaxSize: int64(utils.ParseBytes(c, "max-size", 'B')),
MinSize: int64(utils.ParseBytes(c, "min-size", 'B')),
MaxAge: utils.Duration(c.String("max-age")),
MinAge: utils.Duration(c.String("min-age")),
Env: make(map[string]string),
}
if !c.IsSet("max-size") {
cfg.MaxSize = math.MaxInt64
}
if cfg.MinSize > cfg.MaxSize {
logger.Fatal("min-size should not be larger than max-size")
}
if cfg.MinAge > cfg.MaxAge {
logger.Fatal("min-age should not be larger than max-age")
}
if cfg.Threads <= 0 {
logger.Warnf("threads should be larger than 0, reset it to 1")
cfg.Threads = 1
Expand Down
38 changes: 26 additions & 12 deletions pkg/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,10 +774,8 @@ func startSingleProducer(tasks chan<- object.Object, src, dst object.ObjectStora
}

func produce(tasks chan<- object.Object, srckeys, dstkeys <-chan object.Object, config *Config) error {
if len(config.rules) > 0 {
srckeys = filter(srckeys, config.rules, config)
dstkeys = filter(dstkeys, config.rules, config)
}
srckeys = filter(srckeys, config.rules, config)
dstkeys = filter(dstkeys, config.rules, config)
var dstobj object.Object
for obj := range srckeys {
if obj == nil {
Expand Down Expand Up @@ -913,25 +911,41 @@ func parseIncludeRules(args []string) (rules []rule) {
return
}

func filterKey(o object.Object, now time.Time, rules []rule, config *Config) bool {
var ok bool = true
if !o.IsDir() && !o.IsSymlink() {
ok = o.Size() >= int64(config.MinSize) && o.Size() <= int64(config.MaxSize)
if ok && config.MaxAge > 0 {
ok = o.Mtime().After(now.Add(-config.MaxAge))
}
if ok && config.MinAge > 0 {
ok = o.Mtime().Before(now.Add(-config.MinAge))
}
}
if ok {
if config.MatchFullPath {
ok = matchFullPath(rules, o.Key())
} else {
ok = matchLeveledPath(rules, o.Key())
}
}
return ok
}

func filter(keys <-chan object.Object, rules []rule, config *Config) <-chan object.Object {
r := make(chan object.Object)
now := time.Now()
go func() {
for o := range keys {
if o == nil {
// Telling that the listing has failed
r <- nil
break
}
var ok bool
if config.MatchFullPath {
ok = matchFullPath(rules, o.Key())
} else {
ok = matchLeveledPath(rules, o.Key())
}
if ok {
if filterKey(o, now, rules, config) {
r <- o
} else {
logger.Debugf("exclude %s", o.Key())
logger.Debugf("exclude %s size: %d, mtime: %s", o.Key(), o.Size(), o.Mtime())
}
}
close(r)
Expand Down
37 changes: 37 additions & 0 deletions pkg/sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"reflect"
"strings"
"testing"
"time"

"github.com/juicedata/juicefs/pkg/object"
)
Expand Down Expand Up @@ -105,6 +106,7 @@ func TestSync(t *testing.T) {
DeleteDst: false,
Exclude: []string{"c*"},
Include: []string{"a[1-9]", "a*"},
MaxSize: math.MaxInt64,
Verbose: false,
Quiet: true,
}
Expand Down Expand Up @@ -192,6 +194,7 @@ func TestSyncIncludeAndExclude(t *testing.T) {
Verbose: false,
Limit: -1,
Quiet: true,
MaxSize: math.MaxInt64,
Exclude: []string{"1"},
}
a, _ := object.CreateStorage("file", "/tmp/a/", "", "", "")
Expand Down Expand Up @@ -330,6 +333,7 @@ func TestSyncLink(t *testing.T) {
Quiet: true,
Limit: -1,
ForceUpdate: true,
MaxSize: math.MaxInt64,
}); err != nil {
t.Fatalf("sync: %s", err)
}
Expand Down Expand Up @@ -385,6 +389,7 @@ func TestSyncLinkWithOutFollow(t *testing.T) {
Quiet: true,
ForceUpdate: true,
Limit: -1,
MaxSize: math.MaxInt64,
}); err != nil {
t.Fatalf("sync: %s", err)
}
Expand Down Expand Up @@ -419,6 +424,7 @@ func TestSingleLink(t *testing.T) {
Links: true,
Quiet: true,
Limit: -1,
MaxSize: math.MaxInt64,
ForceUpdate: true,
}); err != nil {
t.Fatalf("sync: %s", err)
Expand Down Expand Up @@ -455,6 +461,7 @@ func TestSyncCheckAllLink(t *testing.T) {
Links: true,
Quiet: true,
Limit: -1,
MaxSize: math.MaxInt64,
CheckAll: true,
}); err != nil {
t.Fatalf("sync: %s", err)
Expand Down Expand Up @@ -493,6 +500,7 @@ func TestSyncCheckNewLink(t *testing.T) {
Links: true,
Quiet: true,
Limit: -1,
MaxSize: math.MaxInt64,
CheckNew: true,
}); err != nil {
t.Fatalf("sync: %s", err)
Expand Down Expand Up @@ -546,6 +554,7 @@ func TestLimits(t *testing.T) {
Threads: 50,
Update: true,
Perms: true,
MaxSize: math.MaxInt64,
}
setConfig := func(config *Config, subC subConfig) {
config.Limit = subC.limit
Expand Down Expand Up @@ -738,3 +747,31 @@ func TestParseFilterRule(t *testing.T) {
}
}
}

type mockObject struct {
size int64
mtime time.Time
}

func (o *mockObject) Key() string { return "" }
func (o *mockObject) IsDir() bool { return false }
func (o *mockObject) IsSymlink() bool { return false }
func (o *mockObject) Size() int64 { return o.size }
func (o *mockObject) Mtime() time.Time { return o.mtime }
func (o *mockObject) StorageClass() string { return "" }

func TestFilterSizeAndAge(t *testing.T) {
config := &Config{
MaxSize: 100,
MinSize: 10,
MaxAge: time.Second * 100,
MinAge: time.Second * 10,
}
now := time.Now()
if !filterKey(&mockObject{10, now.Add(-time.Second * 15)}, now, nil, config) {
t.Fatalf("filterKey failed")
}
if filterKey(&mockObject{200, now.Add(-time.Second * 200)}, now, nil, config) {
t.Fatalf("filterKey should fail")
}
}

0 comments on commit fbe92e3

Please sign in to comment.