Skip to content

Commit

Permalink
cmd/sync: allows customization of the IP address and port that the ma…
Browse files Browse the repository at this point in the history
…nagement node listens on (#3912)

* cmd/sync: allows customization of the IP address and port that the management node listens on

Co-authored-by: Davies Liu <[email protected]>
  • Loading branch information
zhijian-pro and davies authored Jul 26, 2023
1 parent fed3bec commit 2261e0a
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 21 deletions.
9 changes: 7 additions & 2 deletions cmd/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,18 @@ func syncStorageFlags() []cli.Flag {
func clusterFlags() []cli.Flag {
return addCategories("CLUSTER", []cli.Flag{
&cli.StringFlag{
Name: "manager",
Usage: "manager address",
Name: "manager",
Usage: "the manager address used only by the worker node",
Hidden: true,
},
&cli.StringSliceFlag{
Name: "worker",
Usage: "hosts (separated by comma) to launch worker",
},
&cli.StringFlag{
Name: "manager-addr",
Usage: "the IP address to communicate with workers",
},
})
}

Expand Down
40 changes: 24 additions & 16 deletions pkg/sync/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func sendStats(addr string) {
}
}

func startManager(tasks <-chan object.Object) (string, error) {
func startManager(config *Config, tasks <-chan object.Object) (string, error) {
http.HandleFunc("/fetch", func(w http.ResponseWriter, req *http.Request) {
var objs []object.Object
obj, ok := <-tasks
Expand Down Expand Up @@ -171,29 +171,37 @@ func startManager(tasks <-chan object.Object) (string, error) {
logger.Debugf("receive stats %+v from %s", r, req.RemoteAddr)
_, _ = w.Write([]byte("OK"))
})
ips, err := utils.FindLocalIPs()
if err != nil {
return "", fmt.Errorf("find local ips: %s", err)
}
var ip string
for _, i := range ips {
if i = i.To4(); i != nil {
ip = i.String()
break
var addr string
if config.ManagerAddr != "" {
addr = config.ManagerAddr
} else {
ips, err := utils.FindLocalIPs()
if err != nil {
return "", fmt.Errorf("find local ips: %s", err)
}
var ip string
for _, i := range ips {
if i = i.To4(); i != nil {
ip = i.String()
break
}
}
if ip == "" {
return "", fmt.Errorf("no local ip found")
}
}
if ip == "" {
return "", fmt.Errorf("no local ip found")

if !strings.Contains(addr, ":") {
addr += ":"
}
l, err := net.Listen("tcp", ip+":")

l, err := net.Listen("tcp", addr)
if err != nil {
return "", fmt.Errorf("listen: %s", err)
}
logger.Infof("Listen at %s", l.Addr())
go func() { _ = http.Serve(l, nil) }()
ps := strings.Split(l.Addr().String(), ":")
port := ps[len(ps)-1]
return fmt.Sprintf("%s:%s", ip, port), nil
return l.Addr().String(), nil
}

func findSelfPath() (string, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sync/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ func (o *obj) StorageClass() string { return "" }
func TestCluster(t *testing.T) {
// manager
todo := make(chan object.Object, 100)
addr, err := startManager(todo)
var conf Config
addr, err := startManager(&conf, todo)
if err != nil {
t.Fatal(err)
}
// sendStats(addr)
// worker
var conf Config
conf.Manager = addr
mytodo := make(chan object.Object, 100)
go fetchJobs(mytodo, &conf)
Expand Down
2 changes: 2 additions & 0 deletions pkg/sync/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Config struct {
Limit int64
Manager string
Workers []string
ManagerAddr string
ListThreads int
ListDepth int
BWLimit int
Expand Down Expand Up @@ -148,6 +149,7 @@ func NewConfigFromCli(c *cli.Context) *Config {
Links: c.Bool("links"),
Limit: c.Int64("limit"),
Workers: c.StringSlice("worker"),
ManagerAddr: c.String("manager-addr"),
Manager: c.String("manager"),
BWLimit: c.Int("bwlimit"),
NoHTTPS: c.Bool("no-https"),
Expand Down
2 changes: 1 addition & 1 deletion pkg/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -1034,7 +1034,7 @@ func Sync(src, dst object.ObjectStorage, config *Config) error {

if config.Manager == "" {
if len(config.Workers) > 0 {
addr, err := startManager(tasks)
addr, err := startManager(config, tasks)
if err != nil {
return err
}
Expand Down

0 comments on commit 2261e0a

Please sign in to comment.