Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Check if the cluster satisfies the version requirements before starting #61

Merged
merged 7 commits into from
Sep 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/beorn7/perks v0.0.0-20160804104726-4c0e84591b9a // indirect
github.com/codahale/hdrhistogram v0.0.0-20160425231609-f8ad88b59a58 // indirect
github.com/coreos/etcd v3.2.18+incompatible // indirect
github.com/coreos/go-semver v0.2.0
github.com/cznic/golex v0.0.0-20160422121650-da5a7153a510 // indirect
github.com/cznic/mathutil v0.0.0-20160613104831-78ad7f262603
github.com/cznic/parser v0.0.0-20160622100904-31edd927e5b1
Expand Down
33 changes: 33 additions & 0 deletions lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package common

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"strings"
"time"
Expand Down Expand Up @@ -210,3 +213,33 @@ func isRetryableError(err error) bool {
func UniqueTable(schema string, table string) string {
return fmt.Sprintf("`%s`.`%s`", schema, table)
}

// GetJSON fetches a page and parses it as JSON. The parsed result will be
// stored into the `v`. The variable `v` must be a pointer to a type that can be
// unmarshalled from JSON.
//
// Example:
//
// client := &http.Client{}
// var resp struct { IP string }
// if err := util.GetJSON(client, "http://api.ipify.org/?format=json", &resp); err != nil {
// return errors.Trace(err)
// }
// fmt.Println(resp.IP)
func GetJSON(client *http.Client, url string, v interface{}) error {
resp, err := client.Get(url)
if err != nil {
return errors.Trace(err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return errors.Trace(err)
}
return errors.Errorf("get %s http status code != 200, message %s", url, string(body))
}

return errors.Trace(json.NewDecoder(resp.Body).Decode(v))
}
8 changes: 5 additions & 3 deletions lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ func (c *Config) String() string {

type Lightning struct {
common.LogConfig
TableConcurrency int `toml:"table-concurrency" json:"table-concurrency"`
RegionConcurrency int `toml:"region-concurrency" json:"region-concurrency"`
ProfilePort int `toml:"pprof-port" json:"pprof-port"`
TableConcurrency int `toml:"table-concurrency" json:"table-concurrency"`
RegionConcurrency int `toml:"region-concurrency" json:"region-concurrency"`
ProfilePort int `toml:"pprof-port" json:"pprof-port"`
CheckRequirements bool `toml:"check-requirements" json:"check-requirements"`
}

// PostRestore has some options which will be executed after kv restored.
Expand All @@ -90,6 +91,7 @@ func NewConfig() *Config {
App: Lightning{
RegionConcurrency: runtime.NumCPU(),
TableConcurrency: 8,
CheckRequirements: true,
},
TiDB: DBStore{
SQLMode: "STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION",
Expand Down
6 changes: 6 additions & 0 deletions lightning/kv/kv-deliver.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package kv

import (
"fmt"
"math"
"os"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -639,6 +642,9 @@ func (c *KVDeliverClient) Switch(mode sstpb.SwitchMode) error {
}
_, err := c.cli.SwitchMode(c.ctx, req)
if err != nil {
if strings.Contains(err.Error(), "status: Unimplemented") {
fmt.Fprintln(os.Stderr, "Error: The TiKV instance does not support mode switching. Please make sure the TiKV version is 2.0.4 or above.")
}
return errors.Trace(err)
}
common.AppLogger.Infof("switch to tikv %s mode takes %v", mode, time.Since(timer))
Expand Down
119 changes: 119 additions & 0 deletions lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ import (
"database/sql"
"fmt"
"io"
"net/http"
"os"
"strings"
"sync"
"time"

"github.com/coreos/go-semver/semver"
"github.com/juju/errors"
sstpb "github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/tidb-lightning/lightning/common"
Expand Down Expand Up @@ -40,6 +43,12 @@ const (
closeEngineMaxRetry = 5
)

var (
requiredTiDBVersion = *semver.New("2.0.4")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use pointer?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@holys Their Compare function doesn't use pointers :(

requiredPDVersion = *semver.New("2.0.4")
requiredTiKVVersion = *semver.New("2.0.4")
)

func init() {
cfg := tidbcfg.GetGlobalConfig()
cfg.Log.SlowThreshold = 3000
Expand Down Expand Up @@ -78,6 +87,7 @@ func (rc *RestoreController) Close() {
func (rc *RestoreController) Run(ctx context.Context) {
timer := time.Now()
opts := []func(context.Context) error{
rc.checkRequirements,
rc.switchToImportMode,
rc.restoreSchema,
rc.restoreTables,
Expand All @@ -93,6 +103,7 @@ func (rc *RestoreController) Run(ctx context.Context) {
}
if err != nil {
common.AppLogger.Errorf("run cause error : %s", errors.ErrorStack(err))
fmt.Fprintf(os.Stderr, "Error: %s\n", err)
break // ps : not continue
}
}
Expand Down Expand Up @@ -378,6 +389,114 @@ func (rc *RestoreController) switchTiKVMode(ctx context.Context, mode sstpb.Swit
return errors.Trace(cli.Switch(mode))
}

func (rc *RestoreController) checkRequirements(_ context.Context) error {
// skip requirement check if explicitly turned off
if !rc.cfg.App.CheckRequirements {
return nil
}

client := &http.Client{}
if err := rc.checkTiDBVersion(client); err != nil {
return errors.Trace(err)
}
// TODO: Reenable the PD/TiKV version check after we upgrade the dependency to 2.1.
if err := rc.checkPDVersion(client); err != nil {
// return errors.Trace(err)
common.AppLogger.Infof("PD version check failed: %v", err)
}
if err := rc.checkTiKVVersion(client); err != nil {
// return errors.Trace(err)
common.AppLogger.Infof("TiKV version check failed: %v", err)
}

return nil
}

func (rc *RestoreController) checkTiDBVersion(client *http.Client) error {
url := fmt.Sprintf("http://%s:%d/status", rc.cfg.TiDB.Host, rc.cfg.TiDB.StatusPort)
var status struct{ Version string }
err := common.GetJSON(client, url, &status)
if err != nil {
return errors.Trace(err)
}

// version format: "5.7.10-TiDB-v2.1.0-rc.1-7-g38c939f"
// ^~~~~~~~~^ we only want this part
// version format: "5.7.10-TiDB-v2.0.4-1-g06a0bf5"
// ^~~~^
versions := strings.Split(status.Version, "-")
if len(versions) < 5 {
return errors.Errorf("not a valid TiDB version: %s", status.Version)
}
rawVersion := strings.Join(versions[2:len(versions)-2], "-")
rawVersion = strings.TrimPrefix(rawVersion, "v")

version, err := semver.NewVersion(rawVersion)
if err != nil {
return errors.Trace(err)
}

return checkVersion("TiDB", requiredTiDBVersion, *version)
}

func (rc *RestoreController) checkPDVersion(client *http.Client) error {
url := fmt.Sprintf("http://%s/pd/api/v1/config/cluster-version", rc.cfg.TiDB.PdAddr)
var rawVersion string
err := common.GetJSON(client, url, &rawVersion)
if err != nil {
return errors.Trace(err)
}

version, err := semver.NewVersion(rawVersion)
if err != nil {
return errors.Trace(err)
}

return checkVersion("PD", requiredPDVersion, *version)
}

func (rc *RestoreController) checkTiKVVersion(client *http.Client) error {
url := fmt.Sprintf("http://%s/pd/api/v1/stores", rc.cfg.TiDB.PdAddr)

var stores struct {
kennytm marked this conversation as resolved.
Show resolved Hide resolved
Stores []struct {
Store struct {
Address string
Version string
}
}
}
err := common.GetJSON(client, url, &stores)
if err != nil {
return errors.Trace(err)
}

for _, store := range stores.Stores {
version, err := semver.NewVersion(store.Store.Version)
if err != nil {
return errors.Annotate(err, store.Store.Address)
}
component := fmt.Sprintf("TiKV (at %s)", store.Store.Address)
err = checkVersion(component, requiredTiKVVersion, *version)
if err != nil {
return errors.Trace(err)
}
}
return nil
}

func checkVersion(component string, expected, actual semver.Version) error {
if actual.Compare(expected) >= 0 {
return nil
}
return errors.Errorf(
"%s version too old, expected '>=%s', found '%s'",
component,
expected,
actual,
)
}

func (rc *RestoreController) getTables() []string {
var numOfTables int
for _, dbMeta := range rc.dbMetas {
Expand Down
35 changes: 29 additions & 6 deletions lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import (
"time"

pb "github.com/pingcap/kvproto/pkg/import_kvpb"
"github.com/pingcap/tidb-lightning/lightning/common"
"github.com/pingcap/tidb-lightning/lightning/config"
"github.com/pingcap/tidb-lightning/lightning/mydump"
"github.com/pingcap/tidb-lightning/lightning/restore"
"github.com/satori/go.uuid"

. "github.com/pingcap/check"
"golang.org/x/net/context"
Expand All @@ -25,7 +27,7 @@ import (

const (
tablesCount = 35
importDelay = 150 * time.Millisecond
importDelay = 500 * time.Millisecond
)

var _ = Suite(&testRestoreSuite{})
Expand Down Expand Up @@ -69,16 +71,27 @@ type mockKVService struct {
engineOverflowErrorFunc func() error
}

func formatUuid(uuidBytes []byte) string {
uuidValue, err := uuid.FromBytes(uuidBytes)
if err != nil {
panic(err)
}
return uuidValue.String()
}

func (s *mockKVService) SwitchMode(context.Context, *pb.SwitchModeRequest) (*pb.SwitchModeResponse, error) {
return &pb.SwitchModeResponse{}, nil
}
func (s *mockKVService) OpenEngine(_ context.Context, req *pb.OpenEngineRequest) (*pb.OpenEngineResponse, error) {
s.engineLock.Lock()
defer s.engineLock.Unlock()
s.engineList[string(req.Uuid)] += 0
uuid := formatUuid(req.Uuid)
s.engineList[uuid] += 0
if len(s.engineList) > s.engineOverflowLimit {
common.AppLogger.Errorf("[mock-importer] more than %d engines open: %v", s.engineOverflowLimit, s.engineList)
return nil, s.engineOverflowErrorFunc()
}
common.AppLogger.Infof("[mock-importer] opened engine %s; %v", uuid, s.engineList)
return &pb.OpenEngineResponse{}, nil
}
func (s *mockKVService) WriteEngine(wes pb.ImportKV_WriteEngineServer) error {
Expand All @@ -88,7 +101,8 @@ func (s *mockKVService) WriteEngine(wes pb.ImportKV_WriteEngineServer) error {
switch err {
case nil:
if head := req.GetHead(); head != nil {
engine = string(head.Uuid)
engine = formatUuid(head.Uuid)
common.AppLogger.Infof("[mock-importer] start write to engine %s", engine)
s.engineLock.Lock()
s.engineList[engine] += 1
s.engineLock.Unlock()
Expand All @@ -99,6 +113,7 @@ func (s *mockKVService) WriteEngine(wes pb.ImportKV_WriteEngineServer) error {
panic("Unexpected event type?")
}
case io.EOF:
common.AppLogger.Infof("[mock-importer] end write to engine %s", engine)
s.engineLock.Lock()
s.engineList[engine] -= 1
s.engineLock.Unlock()
Expand All @@ -111,7 +126,7 @@ func (s *mockKVService) WriteEngine(wes pb.ImportKV_WriteEngineServer) error {
func (s *mockKVService) CloseEngine(_ context.Context, req *pb.CloseEngineRequest) (*pb.CloseEngineResponse, error) {
s.engineLock.Lock()
defer s.engineLock.Unlock()
uuid := string(req.Uuid)
uuid := formatUuid(req.Uuid)
writerCount, exists := s.engineList[uuid]
if !exists {
return nil, fmt.Errorf("Engine %s not found", uuid)
Expand All @@ -120,10 +135,17 @@ func (s *mockKVService) CloseEngine(_ context.Context, req *pb.CloseEngineReques
return nil, fmt.Errorf("Engine %s still in use with %d writers left (EngineInUse)", uuid, writerCount)
}
delete(s.engineList, uuid)
common.AppLogger.Infof("[mock-importer] removed engine %s; %v", uuid, s.engineList)
return &pb.CloseEngineResponse{}, nil
}
func (s *mockKVService) ImportEngine(context.Context, *pb.ImportEngineRequest) (*pb.ImportEngineResponse, error) {
time.Sleep(importDelay)
s.engineLock.Lock()
defer s.engineLock.Unlock()
if len(s.engineList) > 0 {
// only simulate the slow import when multiple engines are open.
// this should speed up the test while still catching the original problem.
time.Sleep(importDelay)
}
s.events <- importEvent
return &pb.ImportEngineResponse{}, nil
}
Expand All @@ -137,7 +159,7 @@ func (s *mockKVService) CompactCluster(context.Context, *pb.CompactClusterReques
// Runs the mock tikv-importer gRPC service. Returns the server and its listening address.
func runMockKVServer(c *C, limit int, errorFunc func() error) (*grpc.Server, string, <-chan mockKVEvent) {
server := grpc.NewServer()
events := make(chan mockKVEvent, tablesCount*3)
events := make(chan mockKVEvent, tablesCount*4)
pb.RegisterImportKVServer(server, &mockKVService{
engineList: make(map[string]int),
events: events,
Expand Down Expand Up @@ -186,6 +208,7 @@ func createAppConfig(serverAddr string, concurrency int) *config.Config {
cfg := config.NewConfig()
cfg.TikvImporter.Addr = serverAddr
cfg.App.TableConcurrency = concurrency
cfg.App.CheckRequirements = false
// TODO Get rid of the TiDB test dependency!
cfg.TiDB.Host = "127.0.0.1"
cfg.TiDB.Port = 3306
Expand Down
Loading