diff --git a/.github/workflows/cluster_endtoend.yml b/.github/workflows/cluster_endtoend.yml index 93fb133d8bd..a9c03dc806a 100644 --- a/.github/workflows/cluster_endtoend.yml +++ b/.github/workflows/cluster_endtoend.yml @@ -6,7 +6,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - name: [11, 12, 13, 14, 15, 16, 17] + name: [11, 12, 13, 14, 15, 16, 17, 18] steps: - name: Set up Go diff --git a/go/test/endtoend/cluster/cluster_process.go b/go/test/endtoend/cluster/cluster_process.go index b73a37a18c3..5d0650ba5b8 100644 --- a/go/test/endtoend/cluster/cluster_process.go +++ b/go/test/endtoend/cluster/cluster_process.go @@ -37,7 +37,8 @@ const ( ) var ( - keepData = flag.Bool("keep-data", false, "don't delete the per-test VTDATAROOT subfolders") + keepData = flag.Bool("keep-data", false, "don't delete the per-test VTDATAROOT subfolders") + topoFlavor = flag.String("topo-flavor", "etcd2", "choose a topo server from etcd2, zk2 or consul") ) // LocalProcessCluster Testcases need to use this to iniate a cluster @@ -60,7 +61,7 @@ type LocalProcessCluster struct { VtctlProcess VtctlProcess // background executable processes - TopoProcess EtcdProcess + TopoProcess TopoProcess VtctldProcess VtctldProcess VtgateProcess VtgateProcess VtworkerProcess VtworkerProcess @@ -140,22 +141,25 @@ func (cluster *LocalProcessCluster) StartTopo() (err error) { } cluster.TopoPort = cluster.GetAndReservePort() cluster.TmpDirectory = path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/tmp_%d", cluster.GetAndReservePort())) - cluster.TopoProcess = *EtcdProcessInstance(cluster.TopoPort, cluster.GetAndReservePort(), cluster.Hostname, "global") - log.Info(fmt.Sprintf("Starting etcd server on port : %d", cluster.TopoPort)) - if err = cluster.TopoProcess.Setup(); err != nil { - log.Error(err.Error()) - return - } + cluster.TopoProcess = *TopoProcessInstance(cluster.TopoPort, cluster.GetAndReservePort(), cluster.Hostname, *topoFlavor, "global") - log.Info("Creating topo dirs") - if err = cluster.TopoProcess.ManageTopoDir("mkdir", "/vitess/global"); err != nil { + log.Info(fmt.Sprintf("Starting topo server %v on port : %d", topoFlavor, cluster.TopoPort)) + if err = cluster.TopoProcess.Setup(*topoFlavor, cluster); err != nil { log.Error(err.Error()) return } - if err = cluster.TopoProcess.ManageTopoDir("mkdir", "/vitess/"+cluster.Cell); err != nil { - log.Error(err.Error()) - return + if *topoFlavor == "etcd2" { + log.Info("Creating topo dirs") + if err = cluster.TopoProcess.ManageTopoDir("mkdir", "/vitess/global"); err != nil { + log.Error(err.Error()) + return + } + + if err = cluster.TopoProcess.ManageTopoDir("mkdir", "/vitess/"+cluster.Cell); err != nil { + log.Error(err.Error()) + return + } } log.Info("Adding cell info") @@ -165,7 +169,8 @@ func (cluster *LocalProcessCluster) StartTopo() (err error) { return } - cluster.VtctldProcess = *VtctldProcessInstance(cluster.GetAndReservePort(), cluster.GetAndReservePort(), cluster.TopoProcess.Port, cluster.Hostname, cluster.TmpDirectory) + cluster.VtctldProcess = *VtctldProcessInstance(cluster.GetAndReservePort(), cluster.GetAndReservePort(), + cluster.TopoProcess.Port, cluster.Hostname, cluster.TmpDirectory) log.Info(fmt.Sprintf("Starting vtctld server on port : %d", cluster.VtctldProcess.Port)) cluster.VtctldHTTPPort = cluster.VtctldProcess.Port if err = cluster.VtctldProcess.Setup(cluster.Cell, cluster.VtctldExtraArgs...); err != nil { @@ -479,8 +484,8 @@ func (cluster *LocalProcessCluster) Teardown() { log.Errorf("Error in vtctld teardown - %s", err.Error()) } - if err := cluster.TopoProcess.TearDown(cluster.Cell, cluster.OriginalVTDATAROOT, cluster.CurrentVTDATAROOT, *keepData); err != nil { - log.Errorf("Error in etcd teardown - %s", err.Error()) + if err := cluster.TopoProcess.TearDown(cluster.Cell, cluster.OriginalVTDATAROOT, cluster.CurrentVTDATAROOT, *keepData, *topoFlavor); err != nil { + log.Errorf("Error in topo server teardown - %s", err.Error()) } } diff --git a/go/test/endtoend/cluster/etcd_process.go b/go/test/endtoend/cluster/etcd_process.go deleted file mode 100644 index 98d274105d6..00000000000 --- a/go/test/endtoend/cluster/etcd_process.go +++ /dev/null @@ -1,179 +0,0 @@ -/* -Copyright 2019 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cluster - -import ( - "fmt" - "net/http" - "os" - "os/exec" - "path" - "strings" - "syscall" - "time" - - "vitess.io/vitess/go/vt/log" -) - -// EtcdProcess is a generic handle for a running Etcd . -// It can be spawned manually -type EtcdProcess struct { - Name string - Binary string - DataDirectory string - ListenClientURL string - AdvertiseClientURL string - Port int - PeerPort int - Host string - VerifyURL string - PeerURL string - - proc *exec.Cmd - exit chan error -} - -// Setup spawns a new etcd service and initializes it with the defaults. -// The service is kept running in the background until TearDown() is called. -func (etcd *EtcdProcess) Setup() (err error) { - etcd.proc = exec.Command( - etcd.Binary, - "--name", etcd.Name, - "--data-dir", etcd.DataDirectory, - "--listen-client-urls", etcd.ListenClientURL, - "--advertise-client-urls", etcd.AdvertiseClientURL, - "--initial-advertise-peer-urls", etcd.PeerURL, - "--listen-peer-urls", etcd.PeerURL, - "--initial-cluster", fmt.Sprintf("%s=%s", etcd.Name, etcd.PeerURL), - ) - - errFile, _ := os.Create(path.Join(etcd.DataDirectory, "etcd-stderr.txt")) - etcd.proc.Stderr = errFile - - etcd.proc.Env = append(etcd.proc.Env, os.Environ()...) - - log.Infof("%v %v", strings.Join(etcd.proc.Args, " ")) - println("Starting etcd with args " + strings.Join(etcd.proc.Args, " ")) - err = etcd.proc.Start() - if err != nil { - return - } - - etcd.exit = make(chan error) - go func() { - etcd.exit <- etcd.proc.Wait() - }() - - timeout := time.Now().Add(60 * time.Second) - for time.Now().Before(timeout) { - if etcd.IsHealthy() { - return - } - select { - case err := <-etcd.exit: - return fmt.Errorf("process '%s' exited prematurely (err: %s)", etcd.Binary, err) - default: - time.Sleep(300 * time.Millisecond) - } - } - - return fmt.Errorf("process '%s' timed out after 60s (err: %s)", etcd.Binary, <-etcd.exit) -} - -// TearDown shutdowns the running mysqld service -func (etcd *EtcdProcess) TearDown(Cell string, originalVtRoot string, currentRoot string, keepdata bool) error { - if etcd.proc == nil || etcd.exit == nil { - return nil - } - - etcd.removeTopoDirectories(Cell) - - // Attempt graceful shutdown with SIGTERM first - _ = etcd.proc.Process.Signal(syscall.SIGTERM) - if !*keepData { - _ = os.RemoveAll(etcd.DataDirectory) - _ = os.RemoveAll(currentRoot) - } - _ = os.Setenv("VTDATAROOT", originalVtRoot) - - select { - case <-etcd.exit: - etcd.proc = nil - return nil - - case <-time.After(10 * time.Second): - etcd.proc.Process.Kill() - etcd.proc = nil - return <-etcd.exit - } - -} - -// IsHealthy function checks if etcd server is up and running -func (etcd *EtcdProcess) IsHealthy() bool { - resp, err := http.Get(etcd.VerifyURL) - if err != nil { - return false - } - if resp.StatusCode == 200 { - return true - } - return false -} - -func (etcd *EtcdProcess) removeTopoDirectories(Cell string) { - _ = etcd.ManageTopoDir("rmdir", "/vitess/global") - _ = etcd.ManageTopoDir("rmdir", "/vitess/"+Cell) -} - -// ManageTopoDir creates global and zone in etcd2 -func (etcd *EtcdProcess) ManageTopoDir(command string, directory string) (err error) { - url := etcd.VerifyURL + directory - payload := strings.NewReader(`{"dir":"true"}`) - if command == "mkdir" { - req, _ := http.NewRequest("PUT", url, payload) - req.Header.Add("content-type", "application/json") - _, err = http.DefaultClient.Do(req) - return err - } else if command == "rmdir" { - req, _ := http.NewRequest("DELETE", url+"?dir=true", payload) - _, err = http.DefaultClient.Do(req) - return err - } else { - return nil - } -} - -// EtcdProcessInstance returns a EtcdProcess handle for a etcd sevice, -// configured with the given Config. -// The process must be manually started by calling setup() -func EtcdProcessInstance(port int, peerPort int, hostname string, name string) *EtcdProcess { - etcd := &EtcdProcess{ - Name: name, - Binary: "etcd", - Port: port, - Host: hostname, - PeerPort: peerPort, - } - - etcd.AdvertiseClientURL = fmt.Sprintf("http://%s:%d", etcd.Host, etcd.Port) - etcd.ListenClientURL = fmt.Sprintf("http://%s:%d", etcd.Host, etcd.Port) - etcd.DataDirectory = path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("%s_%d", "etcd", port)) - etcd.VerifyURL = fmt.Sprintf("http://%s:%d/v2/keys", etcd.Host, etcd.Port) - etcd.PeerURL = fmt.Sprintf("http://%s:%d", hostname, peerPort) - return etcd -} diff --git a/go/test/endtoend/cluster/topo_process.go b/go/test/endtoend/cluster/topo_process.go new file mode 100644 index 00000000000..21e62857917 --- /dev/null +++ b/go/test/endtoend/cluster/topo_process.go @@ -0,0 +1,302 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "fmt" + "io/ioutil" + "net/http" + "os" + "os/exec" + "path" + "strings" + "syscall" + "time" + + "vitess.io/vitess/go/vt/log" +) + +// TopoProcess is a generic handle for a running Topo service . +// It can be spawned manually +type TopoProcess struct { + Name string + Binary string + DataDirectory string + LogDirectory string + ListenClientURL string + AdvertiseClientURL string + Port int + Host string + VerifyURL string + PeerURL string + ZKPorts string + + proc *exec.Cmd + exit chan error +} + +// Setup starts a new topo service +func (topo *TopoProcess) Setup(topoFlavor string, cluster *LocalProcessCluster) (err error) { + switch topoFlavor { + case "zk2": + return topo.SetupZookeeper(cluster) + case "consul": + return topo.SetupConsul(cluster) + default: + return topo.SetupEtcd() + } +} + +// SetupEtcd spawns a new etcd service and initializes it with the defaults. +// The service is kept running in the background until TearDown() is called. +func (topo *TopoProcess) SetupEtcd() (err error) { + topo.proc = exec.Command( + topo.Binary, + "--name", topo.Name, + "--data-dir", topo.DataDirectory, + "--listen-client-urls", topo.ListenClientURL, + "--advertise-client-urls", topo.AdvertiseClientURL, + "--initial-advertise-peer-urls", topo.PeerURL, + "--listen-peer-urls", topo.PeerURL, + "--initial-cluster", fmt.Sprintf("%s=%s", topo.Name, topo.PeerURL), + ) + + errFile, _ := os.Create(path.Join(topo.DataDirectory, "topo-stderr.txt")) + topo.proc.Stderr = errFile + + topo.proc.Env = append(topo.proc.Env, os.Environ()...) + + log.Infof("%v %v", strings.Join(topo.proc.Args, " ")) + println("Starting topo with args " + strings.Join(topo.proc.Args, " ")) + err = topo.proc.Start() + if err != nil { + return + } + + topo.exit = make(chan error) + go func() { + topo.exit <- topo.proc.Wait() + }() + + timeout := time.Now().Add(60 * time.Second) + for time.Now().Before(timeout) { + if topo.IsHealthy() { + return + } + select { + case err := <-topo.exit: + return fmt.Errorf("process '%s' exited prematurely (err: %s)", topo.Binary, err) + default: + time.Sleep(300 * time.Millisecond) + } + } + + return fmt.Errorf("process '%s' timed out after 60s (err: %s)", topo.Binary, <-topo.exit) +} + +// SetupZookeeper spawns a new zookeeper topo service and initializes it with the defaults. +// The service is kept running in the background until TearDown() is called. +func (topo *TopoProcess) SetupZookeeper(cluster *LocalProcessCluster) (err error) { + + topo.ZKPorts = fmt.Sprintf("%d:%d:%d", cluster.GetAndReservePort(), cluster.GetAndReservePort(), topo.Port) + + topo.proc = exec.Command( + topo.Binary, + "-log_dir", topo.LogDirectory, + "-zk.cfg", fmt.Sprintf("1@%v:%s", topo.Host, topo.ZKPorts), + "init", + ) + + errFile, _ := os.Create(path.Join(topo.DataDirectory, "topo-stderr.txt")) + topo.proc.Stderr = errFile + topo.proc.Env = append(topo.proc.Env, os.Environ()...) + + log.Infof("%v %v", strings.Join(topo.proc.Args, " ")) + fmt.Println(strings.Join(topo.proc.Args, " ")) + err = topo.proc.Run() + if err != nil { + return + } + return +} + +// SetupConsul spawns a new consul service and initializes it with the defaults. +// The service is kept running in the background until TearDown() is called. +func (topo *TopoProcess) SetupConsul(cluster *LocalProcessCluster) (err error) { + + topo.VerifyURL = fmt.Sprintf("http://%s:%d/v1/kv/?keys", topo.Host, topo.Port) + + configFile := path.Join(os.Getenv("VTDATAROOT"), "consul.json") + + config := fmt.Sprintf(`{"ports":{"dns":%d,"http":%d,"serf_lan":%d,"serf_wan":%d}}`, + cluster.GetAndReservePort(), topo.Port, cluster.GetAndReservePort(), cluster.GetAndReservePort()) + + err = ioutil.WriteFile(configFile, []byte(config), 0666) + if err != nil { + return + } + + topo.proc = exec.Command( + topo.Binary, "agent", + "-dev", + "-config-file", configFile, + ) + + errFile, _ := os.Create(path.Join(topo.DataDirectory, "topo-stderr.txt")) + topo.proc.Stderr = errFile + + topo.proc.Env = append(topo.proc.Env, os.Environ()...) + + log.Infof("%v %v", strings.Join(topo.proc.Args, " ")) + println("Starting consul with args " + strings.Join(topo.proc.Args, " ")) + err = topo.proc.Start() + if err != nil { + return + } + + topo.exit = make(chan error) + go func() { + topo.exit <- topo.proc.Wait() + }() + + timeout := time.Now().Add(60 * time.Second) + for time.Now().Before(timeout) { + if topo.IsHealthy() { + return + } + select { + case err := <-topo.exit: + return fmt.Errorf("process '%s' exited prematurely (err: %s)", topo.Binary, err) + default: + time.Sleep(300 * time.Millisecond) + } + } + + return fmt.Errorf("process '%s' timed out after 60s (err: %s)", topo.Binary, <-topo.exit) +} + +// TearDown shutdowns the running topo service +func (topo *TopoProcess) TearDown(Cell string, originalVtRoot string, currentRoot string, keepdata bool, topoFlavor string) error { + + if topoFlavor == "zk2" { + cmd := "shutdown" + if keepdata { + cmd = "teardown" + } + topo.proc = exec.Command( + topo.Binary, + "-log_dir", topo.LogDirectory, + "-zk.cfg", fmt.Sprintf("1@%v:%s", topo.Host, topo.ZKPorts), + cmd, + ) + + err := topo.proc.Run() + if err != nil { + return err + } + } else { + if topo.proc == nil || topo.exit == nil { + return nil + } + + topo.removeTopoDirectories(Cell) + + // Attempt graceful shutdown with SIGTERM first + _ = topo.proc.Process.Signal(syscall.SIGTERM) + + select { + case <-topo.exit: + topo.proc = nil + return nil + + case <-time.After(10 * time.Second): + topo.proc.Process.Kill() + topo.proc = nil + return <-topo.exit + } + } + + if !*keepData { + _ = os.RemoveAll(topo.DataDirectory) + _ = os.RemoveAll(currentRoot) + } + _ = os.Setenv("VTDATAROOT", originalVtRoot) + return nil +} + +// IsHealthy function checks if topo server is up and running +func (topo *TopoProcess) IsHealthy() bool { + resp, err := http.Get(topo.VerifyURL) + if err != nil { + return false + } + if resp.StatusCode == 200 { + return true + } + return false +} + +func (topo *TopoProcess) removeTopoDirectories(Cell string) { + _ = topo.ManageTopoDir("rmdir", "/vitess/global") + _ = topo.ManageTopoDir("rmdir", "/vitess/"+Cell) +} + +// ManageTopoDir creates global and zone in etcd2 +func (topo *TopoProcess) ManageTopoDir(command string, directory string) (err error) { + url := topo.VerifyURL + directory + payload := strings.NewReader(`{"dir":"true"}`) + if command == "mkdir" { + req, _ := http.NewRequest("PUT", url, payload) + req.Header.Add("content-type", "application/json") + _, err = http.DefaultClient.Do(req) + return err + } else if command == "rmdir" { + req, _ := http.NewRequest("DELETE", url+"?dir=true", payload) + _, err = http.DefaultClient.Do(req) + return err + } else { + return nil + } +} + +// TopoProcessInstance returns a TopoProcess handle for a etcd sevice, +// configured with the given Config. +// The process must be manually started by calling setup() +func TopoProcessInstance(port int, peerPort int, hostname string, flavor string, name string) *TopoProcess { + binary := "etcd" + if flavor == "zk2" { + binary = "zkctl" + } + if flavor == "consul" { + binary = "consul" + } + + topo := &TopoProcess{ + Name: name, + Binary: binary, + Port: port, + Host: hostname, + } + + topo.AdvertiseClientURL = fmt.Sprintf("http://%s:%d", topo.Host, topo.Port) + topo.ListenClientURL = fmt.Sprintf("http://%s:%d", topo.Host, topo.Port) + topo.DataDirectory = path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("%s_%d", "topo", port)) + topo.LogDirectory = path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("%s_%d", "topo", port), "logs") + topo.VerifyURL = fmt.Sprintf("http://%s:%d/v2/keys", topo.Host, topo.Port) + topo.PeerURL = fmt.Sprintf("http://%s:%d", hostname, peerPort) + return topo +} diff --git a/go/test/endtoend/cluster/vtctl_process.go b/go/test/endtoend/cluster/vtctl_process.go index 0174e222252..a8005e91200 100644 --- a/go/test/endtoend/cluster/vtctl_process.go +++ b/go/test/endtoend/cluster/vtctl_process.go @@ -33,6 +33,7 @@ type VtctlProcess struct { TopoGlobalAddress string TopoGlobalRoot string TopoServerAddress string + TopoRootPath string } // AddCellInfo executes vtctl command to add cell info @@ -43,11 +44,12 @@ func (vtctl *VtctlProcess) AddCellInfo(Cell string) (err error) { "-topo_global_server_address", vtctl.TopoGlobalAddress, "-topo_global_root", vtctl.TopoGlobalRoot, "AddCellInfo", - "-root", "/vitess/"+Cell, + "-root", vtctl.TopoRootPath+Cell, "-server_address", vtctl.TopoServerAddress, Cell, ) log.Info(fmt.Sprintf("Adding Cell into Keyspace with arguments %v", strings.Join(tmpProcess.Args, " "))) + fmt.Println(fmt.Sprintf("Adding Cell into Keyspace with arguments %v", strings.Join(tmpProcess.Args, " "))) return tmpProcess.Run() } @@ -99,13 +101,31 @@ func (vtctl *VtctlProcess) ExecuteCommand(args ...string) (err error) { // configured with the given Config. // The process must be manually started by calling setup() func VtctlProcessInstance(topoPort int, hostname string) *VtctlProcess { + + // Default values for etcd2 topo server. + topoImplementation := "etcd2" + topoGlobalRoot := "/vitess/global" + topoRootPath := "/" + + // Checking and resetting the parameters for required topo server. + switch *topoFlavor { + case "zk2": + topoImplementation = "zk2" + case "consul": + topoImplementation = "consul" + topoGlobalRoot = "global" + // For consul we do not need "/" in the path + topoRootPath = "" + } + vtctl := &VtctlProcess{ Name: "vtctl", Binary: "vtctl", - TopoImplementation: "etcd2", + TopoImplementation: topoImplementation, TopoGlobalAddress: fmt.Sprintf("%s:%d", hostname, topoPort), - TopoGlobalRoot: "/vitess/global", + TopoGlobalRoot: topoGlobalRoot, TopoServerAddress: fmt.Sprintf("%s:%d", hostname, topoPort), + TopoRootPath: topoRootPath, } return vtctl } diff --git a/test/config.json b/test/config.json index e6904a4004e..889d2f2970f 100644 --- a/test/config.json +++ b/test/config.json @@ -132,32 +132,6 @@ "RetryMax": 0, "Tags": [] }, - "tabletmanager_consul": { - "File": "tabletmanager.py", - "Args": [ - "--topo-server-flavor=consul" - ], - "Command": [], - "Manual": false, - "Shard": 3, - "RetryMax": 0, - "Tags": [ - "site_test" - ] - }, - "tabletmanager_zk2": { - "File": "tabletmanager.py", - "Args": [ - "--topo-server-flavor=zk2" - ], - "Command": [], - "Manual": false, - "Shard": 2, - "RetryMax": 0, - "Tags": [ - "site_test" - ] - }, "update_stream": { "File": "update_stream.py", "Args": [], @@ -418,7 +392,33 @@ "Args": ["vitess.io/vitess/go/test/endtoend/tabletmanager"], "Command": [], "Manual": false, - "Shard": 16, + "Shard": 18, + "RetryMax": 0, + "Tags": [ + "site_test" + ] + }, + "tabletmanager_consul": { + "File": "tabletmanager.go", + "Args": [ + "vitess.io/vitess/go/test/endtoend/tabletmanager","--topo-server-flavor=consul" + ], + "Command": [], + "Manual": false, + "Shard": 18, + "RetryMax": 0, + "Tags": [ + "site_test" + ] + }, + "tabletmanager_zk2": { + "File": "tabletmanager.go", + "Args": [ + "vitess.io/vitess/go/test/endtoend/tabletmanager","--topo-server-flavor=zk2" + ], + "Command": [], + "Manual": false, + "Shard": 18, "RetryMax": 0, "Tags": [ "site_test"