diff --git a/cmd/sync.go b/cmd/sync.go index 977178a45699..4d0d7c38d650 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -213,13 +213,18 @@ func syncStorageFlags() []cli.Flag { func clusterFlags() []cli.Flag { return addCategories("CLUSTER", []cli.Flag{ &cli.StringFlag{ - Name: "manager", - Usage: "manager address", + Name: "manager", + Usage: "the manager address used only by the worker node", + Hidden: true, }, &cli.StringSliceFlag{ Name: "worker", Usage: "hosts (separated by comma) to launch worker", }, + &cli.StringFlag{ + Name: "manager-addr", + Usage: "the IP address to communicate with workers", + }, }) } diff --git a/pkg/sync/cluster.go b/pkg/sync/cluster.go index 63fa55092352..656e820f4d41 100644 --- a/pkg/sync/cluster.go +++ b/pkg/sync/cluster.go @@ -119,7 +119,7 @@ func sendStats(addr string) { } } -func startManager(tasks <-chan object.Object) (string, error) { +func startManager(config *Config, tasks <-chan object.Object) (string, error) { http.HandleFunc("/fetch", func(w http.ResponseWriter, req *http.Request) { var objs []object.Object obj, ok := <-tasks @@ -171,29 +171,37 @@ func startManager(tasks <-chan object.Object) (string, error) { logger.Debugf("receive stats %+v from %s", r, req.RemoteAddr) _, _ = w.Write([]byte("OK")) }) - ips, err := utils.FindLocalIPs() - if err != nil { - return "", fmt.Errorf("find local ips: %s", err) - } - var ip string - for _, i := range ips { - if i = i.To4(); i != nil { - ip = i.String() - break + var addr string + if config.ManagerAddr != "" { + addr = config.ManagerAddr + } else { + ips, err := utils.FindLocalIPs() + if err != nil { + return "", fmt.Errorf("find local ips: %s", err) + } + var ip string + for _, i := range ips { + if i = i.To4(); i != nil { + ip = i.String() + break + } + } + if ip == "" { + return "", fmt.Errorf("no local ip found") } } - if ip == "" { - return "", fmt.Errorf("no local ip found") + + if !strings.Contains(addr, ":") { + addr += ":" } - l, err := net.Listen("tcp", ip+":") + + l, err := net.Listen("tcp", addr) if err != nil { return "", fmt.Errorf("listen: %s", err) } logger.Infof("Listen at %s", l.Addr()) go func() { _ = http.Serve(l, nil) }() - ps := strings.Split(l.Addr().String(), ":") - port := ps[len(ps)-1] - return fmt.Sprintf("%s:%s", ip, port), nil + return l.Addr().String(), nil } func findSelfPath() (string, error) { diff --git a/pkg/sync/cluster_test.go b/pkg/sync/cluster_test.go index cde0fcea26a8..84e3c8834883 100644 --- a/pkg/sync/cluster_test.go +++ b/pkg/sync/cluster_test.go @@ -40,13 +40,13 @@ func (o *obj) StorageClass() string { return "" } func TestCluster(t *testing.T) { // manager todo := make(chan object.Object, 100) - addr, err := startManager(todo) + var conf Config + addr, err := startManager(&conf, todo) if err != nil { t.Fatal(err) } // sendStats(addr) // worker - var conf Config conf.Manager = addr mytodo := make(chan object.Object, 100) go fetchJobs(mytodo, &conf) diff --git a/pkg/sync/config.go b/pkg/sync/config.go index 3a9e09003205..bad0ee524a0e 100644 --- a/pkg/sync/config.go +++ b/pkg/sync/config.go @@ -44,6 +44,7 @@ type Config struct { Limit int64 Manager string Workers []string + ManagerAddr string ListThreads int ListDepth int BWLimit int @@ -149,6 +150,7 @@ func NewConfigFromCli(c *cli.Context) *Config { Links: c.Bool("links"), Limit: c.Int64("limit"), Workers: c.StringSlice("worker"), + ManagerAddr: c.String("manager-addr"), Manager: c.String("manager"), BWLimit: c.Int("bwlimit"), NoHTTPS: c.Bool("no-https"), diff --git a/pkg/sync/sync.go b/pkg/sync/sync.go index a6697eef42f5..358287814ec6 100644 --- a/pkg/sync/sync.go +++ b/pkg/sync/sync.go @@ -1034,7 +1034,7 @@ func Sync(src, dst object.ObjectStorage, config *Config) error { if config.Manager == "" { if len(config.Workers) > 0 { - addr, err := startManager(tasks) + addr, err := startManager(config, tasks) if err != nil { return err }