From cdb474a8540dbc35811cf8df838c7cc581b13d13 Mon Sep 17 00:00:00 2001 From: zhijian Date: Tue, 18 Jul 2023 23:04:29 +0800 Subject: [PATCH 1/5] cmd/sync: allows customization of the IP address and port that the management node listens on --- cmd/sync.go | 5 +++++ pkg/sync/cluster.go | 9 ++++++--- pkg/sync/cluster_test.go | 4 ++-- pkg/sync/config.go | 2 ++ pkg/sync/sync.go | 10 +++++----- 5 files changed, 20 insertions(+), 10 deletions(-) diff --git a/cmd/sync.go b/cmd/sync.go index 977178a45699..fdf0539b8f08 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -220,6 +220,11 @@ func clusterFlags() []cli.Flag { Name: "worker", Usage: "hosts (separated by comma) to launch worker", }, + &cli.BoolFlag{ + Name: "is-worker", + Usage: "start as a worker", + Hidden: true, + }, }) } diff --git a/pkg/sync/cluster.go b/pkg/sync/cluster.go index 63fa55092352..43e48382d22a 100644 --- a/pkg/sync/cluster.go +++ b/pkg/sync/cluster.go @@ -119,7 +119,7 @@ func sendStats(addr string) { } } -func startManager(tasks <-chan object.Object) (string, error) { +func startManager(config *Config, tasks <-chan object.Object) (string, error) { http.HandleFunc("/fetch", func(w http.ResponseWriter, req *http.Request) { var objs []object.Object obj, ok := <-tasks @@ -171,6 +171,9 @@ func startManager(tasks <-chan object.Object) (string, error) { logger.Debugf("receive stats %+v from %s", r, req.RemoteAddr) _, _ = w.Write([]byte("OK")) }) + if config.Manager != "" && !config.IsWorker { + return config.Manager, nil + } ips, err := utils.FindLocalIPs() if err != nil { return "", fmt.Errorf("find local ips: %s", err) @@ -259,9 +262,9 @@ func launchWorker(address string, config *Config, wg *sync.WaitGroup) { args = append(args, rpath) if strings.HasSuffix(path, "juicefs") { args = append(args, os.Args[1:]...) - args = append(args, "--manager", address) + args = append(args, "--is-worker", "--manager", address) } else { - args = append(args, "--manager", address) + args = append(args, "--is-worker", "--manager", address) args = append(args, os.Args[1:]...) } if !config.Verbose && !config.Quiet { diff --git a/pkg/sync/cluster_test.go b/pkg/sync/cluster_test.go index cde0fcea26a8..84e3c8834883 100644 --- a/pkg/sync/cluster_test.go +++ b/pkg/sync/cluster_test.go @@ -40,13 +40,13 @@ func (o *obj) StorageClass() string { return "" } func TestCluster(t *testing.T) { // manager todo := make(chan object.Object, 100) - addr, err := startManager(todo) + var conf Config + addr, err := startManager(&conf, todo) if err != nil { t.Fatal(err) } // sendStats(addr) // worker - var conf Config conf.Manager = addr mytodo := make(chan object.Object, 100) go fetchJobs(mytodo, &conf) diff --git a/pkg/sync/config.go b/pkg/sync/config.go index 3a9e09003205..7ab541509abf 100644 --- a/pkg/sync/config.go +++ b/pkg/sync/config.go @@ -44,6 +44,7 @@ type Config struct { Limit int64 Manager string Workers []string + IsWorker bool ListThreads int ListDepth int BWLimit int @@ -149,6 +150,7 @@ func NewConfigFromCli(c *cli.Context) *Config { Links: c.Bool("links"), Limit: c.Int64("limit"), Workers: c.StringSlice("worker"), + IsWorker: c.Bool("is-worker"), Manager: c.String("manager"), BWLimit: c.Int("bwlimit"), NoHTTPS: c.Bool("no-https"), diff --git a/pkg/sync/sync.go b/pkg/sync/sync.go index a6697eef42f5..1fcc99172540 100644 --- a/pkg/sync/sync.go +++ b/pkg/sync/sync.go @@ -980,7 +980,7 @@ func Sync(src, dst object.ObjectStorage, config *Config) error { } var bufferSize = 10240 - if config.Manager != "" { + if config.IsWorker { bufferSize = 100 } tasks := make(chan object.Object, bufferSize) @@ -991,7 +991,7 @@ func Sync(src, dst object.ObjectStorage, config *Config) error { limiter = ratelimit.NewBucketWithRate(bps, int64(bps)*3) } - progress := utils.NewProgress(config.Verbose || config.Quiet || config.Manager != "") + progress := utils.NewProgress(config.Verbose || config.Quiet || config.IsWorker) handled = progress.AddCountBar("Scanned objects", 0) skipped = progress.AddCountSpinner("Skipped objects") pending = progress.AddCountSpinner("Pending objects") @@ -1032,9 +1032,9 @@ func Sync(src, dst object.ObjectStorage, config *Config) error { config.rules = rules } - if config.Manager == "" { + if !config.IsWorker { if len(config.Workers) > 0 { - addr, err := startManager(tasks) + addr, err := startManager(config, tasks) if err != nil { return err } @@ -1067,7 +1067,7 @@ func Sync(src, dst object.ObjectStorage, config *Config) error { pending.SetCurrent(0) progress.Done() - if config.Manager == "" { + if !config.IsWorker { msg := fmt.Sprintf("Found: %d, skipped: %d, copied: %d (%s)", handled.Current(), skipped.Current(), copied.Current(), formatSize(copiedBytes.Current())) if checked != nil { From e9532fc5bc0dbc36361b582bf8bcb64489a57bbc Mon Sep 17 00:00:00 2001 From: zhijian Date: Wed, 19 Jul 2023 13:10:41 +0800 Subject: [PATCH 2/5] fix --- pkg/sync/cluster.go | 38 ++++++++++++++++++++------------------ 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/pkg/sync/cluster.go b/pkg/sync/cluster.go index 43e48382d22a..7e20991dff6c 100644 --- a/pkg/sync/cluster.go +++ b/pkg/sync/cluster.go @@ -171,32 +171,34 @@ func startManager(config *Config, tasks <-chan object.Object) (string, error) { logger.Debugf("receive stats %+v from %s", r, req.RemoteAddr) _, _ = w.Write([]byte("OK")) }) + var addr string if config.Manager != "" && !config.IsWorker { - return config.Manager, nil - } - ips, err := utils.FindLocalIPs() - if err != nil { - return "", fmt.Errorf("find local ips: %s", err) - } - var ip string - for _, i := range ips { - if i = i.To4(); i != nil { - ip = i.String() - break + addr = config.Manager + } else { + ips, err := utils.FindLocalIPs() + if err != nil { + return "", fmt.Errorf("find local ips: %s", err) } + var ip string + for _, i := range ips { + if i = i.To4(); i != nil { + ip = i.String() + break + } + } + if ip == "" { + return "", fmt.Errorf("no local ip found") + } + addr = ip + ":" } - if ip == "" { - return "", fmt.Errorf("no local ip found") - } - l, err := net.Listen("tcp", ip+":") + + l, err := net.Listen("tcp", addr) if err != nil { return "", fmt.Errorf("listen: %s", err) } logger.Infof("Listen at %s", l.Addr()) go func() { _ = http.Serve(l, nil) }() - ps := strings.Split(l.Addr().String(), ":") - port := ps[len(ps)-1] - return fmt.Sprintf("%s:%s", ip, port), nil + return l.Addr().String(), nil } func findSelfPath() (string, error) { From 2719d5aacaaf7d1ea56fbc6b030549358b2147bf Mon Sep 17 00:00:00 2001 From: zhijian Date: Fri, 21 Jul 2023 15:16:37 +0800 Subject: [PATCH 3/5] adjust --- pkg/sync/cluster.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/sync/cluster.go b/pkg/sync/cluster.go index 7e20991dff6c..d431a4304767 100644 --- a/pkg/sync/cluster.go +++ b/pkg/sync/cluster.go @@ -189,7 +189,10 @@ func startManager(config *Config, tasks <-chan object.Object) (string, error) { if ip == "" { return "", fmt.Errorf("no local ip found") } - addr = ip + ":" + } + + if !strings.Contains(addr, ":") { + addr += ":" } l, err := net.Listen("tcp", addr) From 78129f2d617a8ca9b9f7d350f5d2b77a3ba20cab Mon Sep 17 00:00:00 2001 From: zhijian Date: Mon, 24 Jul 2023 11:06:30 +0800 Subject: [PATCH 4/5] fix --- cmd/sync.go | 12 ++++++------ pkg/sync/cluster.go | 8 ++++---- pkg/sync/config.go | 4 ++-- pkg/sync/sync.go | 8 ++++---- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/cmd/sync.go b/cmd/sync.go index fdf0539b8f08..2ccb2c2f7810 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -213,17 +213,17 @@ func syncStorageFlags() []cli.Flag { func clusterFlags() []cli.Flag { return addCategories("CLUSTER", []cli.Flag{ &cli.StringFlag{ - Name: "manager", - Usage: "manager address", + Name: "manager", + Usage: "the manager address used only by the worker node", + Hidden: true, }, &cli.StringSliceFlag{ Name: "worker", Usage: "hosts (separated by comma) to launch worker", }, - &cli.BoolFlag{ - Name: "is-worker", - Usage: "start as a worker", - Hidden: true, + &cli.StringFlag{ + Name: "manager-address", + Usage: "manager address", }, }) } diff --git a/pkg/sync/cluster.go b/pkg/sync/cluster.go index d431a4304767..656e820f4d41 100644 --- a/pkg/sync/cluster.go +++ b/pkg/sync/cluster.go @@ -172,8 +172,8 @@ func startManager(config *Config, tasks <-chan object.Object) (string, error) { _, _ = w.Write([]byte("OK")) }) var addr string - if config.Manager != "" && !config.IsWorker { - addr = config.Manager + if config.ManagerAddr != "" { + addr = config.ManagerAddr } else { ips, err := utils.FindLocalIPs() if err != nil { @@ -267,9 +267,9 @@ func launchWorker(address string, config *Config, wg *sync.WaitGroup) { args = append(args, rpath) if strings.HasSuffix(path, "juicefs") { args = append(args, os.Args[1:]...) - args = append(args, "--is-worker", "--manager", address) + args = append(args, "--manager", address) } else { - args = append(args, "--is-worker", "--manager", address) + args = append(args, "--manager", address) args = append(args, os.Args[1:]...) } if !config.Verbose && !config.Quiet { diff --git a/pkg/sync/config.go b/pkg/sync/config.go index 7ab541509abf..065aebcbb873 100644 --- a/pkg/sync/config.go +++ b/pkg/sync/config.go @@ -44,7 +44,7 @@ type Config struct { Limit int64 Manager string Workers []string - IsWorker bool + ManagerAddr string ListThreads int ListDepth int BWLimit int @@ -150,7 +150,7 @@ func NewConfigFromCli(c *cli.Context) *Config { Links: c.Bool("links"), Limit: c.Int64("limit"), Workers: c.StringSlice("worker"), - IsWorker: c.Bool("is-worker"), + ManagerAddr: c.String("manager-address"), Manager: c.String("manager"), BWLimit: c.Int("bwlimit"), NoHTTPS: c.Bool("no-https"), diff --git a/pkg/sync/sync.go b/pkg/sync/sync.go index 1fcc99172540..358287814ec6 100644 --- a/pkg/sync/sync.go +++ b/pkg/sync/sync.go @@ -980,7 +980,7 @@ func Sync(src, dst object.ObjectStorage, config *Config) error { } var bufferSize = 10240 - if config.IsWorker { + if config.Manager != "" { bufferSize = 100 } tasks := make(chan object.Object, bufferSize) @@ -991,7 +991,7 @@ func Sync(src, dst object.ObjectStorage, config *Config) error { limiter = ratelimit.NewBucketWithRate(bps, int64(bps)*3) } - progress := utils.NewProgress(config.Verbose || config.Quiet || config.IsWorker) + progress := utils.NewProgress(config.Verbose || config.Quiet || config.Manager != "") handled = progress.AddCountBar("Scanned objects", 0) skipped = progress.AddCountSpinner("Skipped objects") pending = progress.AddCountSpinner("Pending objects") @@ -1032,7 +1032,7 @@ func Sync(src, dst object.ObjectStorage, config *Config) error { config.rules = rules } - if !config.IsWorker { + if config.Manager == "" { if len(config.Workers) > 0 { addr, err := startManager(config, tasks) if err != nil { @@ -1067,7 +1067,7 @@ func Sync(src, dst object.ObjectStorage, config *Config) error { pending.SetCurrent(0) progress.Done() - if !config.IsWorker { + if config.Manager == "" { msg := fmt.Sprintf("Found: %d, skipped: %d, copied: %d (%s)", handled.Current(), skipped.Current(), copied.Current(), formatSize(copiedBytes.Current())) if checked != nil { From 03992680997af1cf5032a21b30f9d5fffa1239f0 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 26 Jul 2023 10:56:32 +0800 Subject: [PATCH 5/5] update usage --- cmd/sync.go | 4 ++-- pkg/sync/config.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/sync.go b/cmd/sync.go index 2ccb2c2f7810..4d0d7c38d650 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -222,8 +222,8 @@ func clusterFlags() []cli.Flag { Usage: "hosts (separated by comma) to launch worker", }, &cli.StringFlag{ - Name: "manager-address", - Usage: "manager address", + Name: "manager-addr", + Usage: "the IP address to communicate with workers", }, }) } diff --git a/pkg/sync/config.go b/pkg/sync/config.go index 065aebcbb873..bad0ee524a0e 100644 --- a/pkg/sync/config.go +++ b/pkg/sync/config.go @@ -150,7 +150,7 @@ func NewConfigFromCli(c *cli.Context) *Config { Links: c.Bool("links"), Limit: c.Int64("limit"), Workers: c.StringSlice("worker"), - ManagerAddr: c.String("manager-address"), + ManagerAddr: c.String("manager-addr"), Manager: c.String("manager"), BWLimit: c.Int("bwlimit"), NoHTTPS: c.Bool("no-https"),