diff --git a/go.mod b/go.mod index 47f25d9b6fa..1ff58dc17cc 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/Azure/go-autorest/autorest v0.10.0 // indirect github.com/GeertJohan/go.rice v1.0.0 github.com/PuerkitoBio/goquery v1.5.1 + github.com/TylerBrock/colorjson v0.0.0-20180527164720-95ec53f28296 github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect github.com/aws/aws-sdk-go v1.28.8 github.com/buger/jsonparser v0.0.0-20200322175846-f7e751efca13 @@ -46,12 +47,12 @@ require ( github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect github.com/krishicks/yaml-patch v0.0.10 github.com/magiconair/properties v1.8.1 - github.com/mattn/go-runewidth v0.0.3 // indirect + github.com/manifoldco/promptui v0.7.0 github.com/minio/minio-go v0.0.0-20190131015406-c8a261de75c1 github.com/mitchellh/go-ps v1.0.0 // indirect github.com/mitchellh/go-testing-interface v1.14.0 // indirect github.com/mitchellh/mapstructure v1.2.3 // indirect - github.com/olekukonko/tablewriter v0.0.0-20180130162743-b8a9be070da4 + github.com/olekukonko/tablewriter v0.0.5-0.20200416053754-163badb3bac6 github.com/onsi/ginkgo v1.10.3 // indirect github.com/onsi/gomega v1.7.1 // indirect github.com/opentracing-contrib/go-grpc v0.0.0-20180928155321-4b5a12d3ff02 @@ -64,6 +65,7 @@ require ( github.com/prometheus/common v0.9.1 github.com/satori/go.uuid v1.2.0 // indirect github.com/smartystreets/goconvey v1.6.4 // indirect + github.com/spf13/cobra v0.0.5 github.com/stretchr/testify v1.4.0 github.com/tchap/go-patricia v0.0.0-20160729071656-dd168db6051b github.com/tebeka/selenium v0.9.9 diff --git a/go.sum b/go.sum index c41fd8aff02..d017a2f9d3b 100644 --- a/go.sum +++ b/go.sum @@ -54,6 +54,8 @@ github.com/PuerkitoBio/purell v1.1.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbt github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/urlesc v0.0.0-20160726150825-5bd2802263f2/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= +github.com/TylerBrock/colorjson v0.0.0-20180527164720-95ec53f28296 h1:JYWTroLXcNzSCgu66NMgdjwoMHQRbv2SoOVNFb4kRkE= +github.com/TylerBrock/colorjson v0.0.0-20180527164720-95ec53f28296/go.mod h1:VSw57q4QFiWDbRnjdX8Cb3Ow0SFncRw+bA/ofY6Q83w= github.com/agnivade/levenshtein v1.0.1/go.mod h1:CURSv5d9Uaml+FovSIICkLbAUZ9S4RqaHDIsdSBg7lM= github.com/akavel/rsrc v0.8.0 h1:zjWn7ukO9Kc5Q62DOJCcxGpXC18RawVtYAGdz2aLlfw= github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c= @@ -95,6 +97,10 @@ github.com/buger/jsonparser v0.0.0-20200322175846-f7e751efca13 h1:+qUNY4VRkEH46b github.com/buger/jsonparser v0.0.0-20200322175846-f7e751efca13/go.mod h1:tgcrVJ81GPSF0mz+0nu1Xaz0fazGPrmmJfJtxjbHhUQ= github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= +github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e h1:fY5BOSpyZCqRo5OhCuC+XN+r/bBCmeuuJtjz+bCNIf8= +github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= +github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible h1:C29Ae4G5GtYyYMm1aztcyj/J5ckgJm2zwdDajFbx1NY= github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= github.com/circonus-labs/circonusllhist v0.1.3 h1:TJH+oke8D16535+jHExHj4nQvzlZrj7ug5D7I/orNUA= @@ -354,6 +360,8 @@ github.com/jtolds/gls v4.2.1+incompatible h1:fSuqC+Gmlu6l/ZYAoZzx2pyucC8Xza35fpR github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/juju/ansiterm v0.0.0-20180109212912-720a0952cc2a h1:FaWFmfWdAUKbSCtOU2QjDaorUexogfaMgbipgYATUMU= +github.com/juju/ansiterm v0.0.0-20180109212912-720a0952cc2a/go.mod h1:UJSiEoRfvx3hP73CvoARgeLjaIOjybY9vj8PUPPFGeU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= @@ -380,6 +388,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/krishicks/yaml-patch v0.0.10 h1:H4FcHpnNwVmw8u0MjPRjWyIXtco6zM2F78t+57oNM3E= github.com/krishicks/yaml-patch v0.0.10/go.mod h1:Sm5TchwZS6sm7RJoyg87tzxm2ZcKzdRE4Q7TjNhPrME= +github.com/lunixbochs/vtclean v0.0.0-20180621232353-2d01aacdc34a h1:weJVJJRzAJBFRlAiJQROKQs8oC9vOxvm4rZmBBk0ONw= +github.com/lunixbochs/vtclean v0.0.0-20180621232353-2d01aacdc34a/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI= github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4= @@ -390,6 +400,9 @@ github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.7.0/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs= +github.com/manifoldco/promptui v0.7.0 h1:3l11YT8tm9MnwGFQ4kETwkzpAwY2Jt9lCrumCUW4+z4= +github.com/manifoldco/promptui v0.7.0/go.mod h1:n4zTdgP0vr0S3w7/O/g98U+e0gwLScEXGwov2nIKuGQ= +github.com/mattn/go-colorable v0.0.9 h1:UVL0vNpWh04HeJXV0KLcaT7r06gOH2l4OW6ddYRUIY4= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.6 h1:6Su7aK7lXmJ/U79bYtBjLNaha4Fs1Rg9plHpcH+vvnE= @@ -397,6 +410,7 @@ github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope github.com/mattn/go-ieproxy v0.0.0-20190610004146-91bb50d98149 h1:HfxbT6/JcvIljmERptWhwa8XzP7H3T+Z2N26gTsaDaA= github.com/mattn/go-ieproxy v0.0.0-20190610004146-91bb50d98149/go.mod h1:31jz6HNzdxOmlERGGEc4v/dMssOfmp2p5bT/okiKFFc= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= +github.com/mattn/go-isatty v0.0.4 h1:bnP0vzxcAdeI1zdubAl5PjU6zsERjGZb7raWodagDYs= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84= @@ -405,8 +419,9 @@ github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHX github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-runewidth v0.0.2 h1:UnlwIPBGaTZfPQ6T1IGzPI0EkYAQmT9fAEJ/poFC63o= github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= -github.com/mattn/go-runewidth v0.0.3 h1:a+kO+98RDGEfo6asOGMmpodZq4FNtnGP54yps8BzLR4= github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= +github.com/mattn/go-runewidth v0.0.7 h1:Ei8KR0497xHyKJPAv59M1dkC+rOZCMBJ+t3fZ+twI54= +github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/miekg/dns v1.0.14 h1:9jZdLNd/P4+SfEJ0TNyxYpsK8N4GtfylBLqtbYN1sbA= @@ -447,8 +462,10 @@ github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229 h1:E2B8qYyeSgv github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229/go.mod h1:0aYXnNPJ8l7uZxf45rWW1a/uME32OF0rhiYGNQ2oF2E= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5 h1:58+kh9C6jJVXYjt8IE48G2eWl6BjwU5Gj0gqY84fy78= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= -github.com/olekukonko/tablewriter v0.0.0-20180130162743-b8a9be070da4 h1:Mm4XQCBICntJzH8fKglsRuEiFUJYnTnM4BBFvpP5BWs= -github.com/olekukonko/tablewriter v0.0.0-20180130162743-b8a9be070da4/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= +github.com/olekukonko/tablewriter v0.0.4 h1:vHD/YYe1Wolo78koG299f7V/VAS08c6IpCLn+Ejf/w8= +github.com/olekukonko/tablewriter v0.0.4/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FWnp+qbPhuoO21uA= +github.com/olekukonko/tablewriter v0.0.5-0.20200416053754-163badb3bac6 h1:F721VBMijn0OBFZ5wUSuMVVLQj2IJiiupn6UNd7UbBE= +github.com/olekukonko/tablewriter v0.0.5-0.20200416053754-163badb3bac6/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FWnp+qbPhuoO21uA= github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -688,6 +705,7 @@ golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190124100055-b90733256f2e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190209173611-3b5209105503/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/go/test/endtoend/vreplication/config.go b/go/test/endtoend/vreplication/config.go index 2e831d2c43e..c22a486e39c 100644 --- a/go/test/endtoend/vreplication/config.go +++ b/go/test/endtoend/vreplication/config.go @@ -146,6 +146,9 @@ create table order_seq(id int, next_id bigint, cache bigint, primary key(id)) co }, "cproduct": { "type": "reference" + }, + "vproduct": { + "type": "reference" } } } diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 7d657f6d9f6..79163546ccc 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -67,6 +67,7 @@ func TestBasicVreplicationWorkflow(t *testing.T) { shardMerchant(t) materializeProduct(t) + materializeMerchantOrders(t) materializeSales(t) materializeMerchantSales(t) diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index eb103f4db9d..284fd58af32 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -94,6 +94,8 @@ import ( "sync" "time" + querypb "vitess.io/vitess/go/vt/proto/query" + "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" "golang.org/x/net/context" @@ -454,6 +456,22 @@ var commands = []commandGroup{ "Outputs a JSON structure that contains information about the ShardReplication."}, }, }, + { + "Workflow", []command{ + {"VExec", commandVExec, + " --dry-run", + "Runs query on all tablets in workflow. Example: VExec merchant.morders \"update _vt.vreplication set Status='Running'\"", + }, + }, + }, + { + "Workflow", []command{ + {"Workflow", commandWorkflow, + " --dry-run", + "Start/Stop/Delete Workflow on all target tablets in workflow. Example: Workflow merchant.morders Start", + }, + }, + }, } func init() { @@ -2805,6 +2823,135 @@ func commandHelp(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Flag return nil } +func commandVExec(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { + json := subFlags.Bool("json", false, "Output JSON instead of human-readable table") + dryRun := subFlags.Bool("dry_run", false, "Does a dry run of VExec and only reports the final query and list of masters on which it will be applied") + if err := subFlags.Parse(args); err != nil { + return err + } + if subFlags.NArg() != 2 { + return fmt.Errorf("usage: VExec --dry-run keyspace.workflow \"\"") + } + keyspace, workflow, err := splitKeyspaceWorkflow(subFlags.Arg(0)) + if err != nil { + return err + } + _, err = wr.TopoServer().GetKeyspace(ctx, keyspace) + if err != nil { + wr.Logger().Errorf("keyspace %s not found", keyspace) + } + query := subFlags.Arg(1) + + results, err := wr.VExec(ctx, workflow, keyspace, query, *dryRun) + if err != nil { + return err + } + if *dryRun { + return nil + } + if len(results) == 0 { + wr.Logger().Printf("no result returned\n") + } + var qr *sqltypes.Result + var numFields int + for _, result := range results { + numFields = len(result.Fields) + break + } + if numFields != 0 { + qr = queryResultForTabletResults(results) + } else { + qr = queryResultForRowsAffected(results) + } + if len(qr.Rows) == 0 { + return nil + } + if *json { + return printJSON(wr.Logger(), qr) + } + printQueryResult(loggerWriter{wr.Logger()}, qr) + return nil +} + +// called for workflow stop/start/delete. Only rows affected are reported per tablet +func queryResultForRowsAffected(results map[*topo.TabletInfo]*sqltypes.Result) *sqltypes.Result { + var qr = &sqltypes.Result{} + qr.RowsAffected = uint64(len(results)) + qr.Fields = []*querypb.Field{{ + Name: "Tablet", + Type: sqltypes.VarBinary, + }, { + Name: "RowsAffected", + Type: sqltypes.Uint64, + }} + var row2 []sqltypes.Value + for tablet, result := range results { + row2 = nil + row2 = append(row2, sqltypes.NewVarBinary(tablet.AliasString())) + row2 = append(row2, sqltypes.NewUint64(result.RowsAffected)) + qr.Rows = append(qr.Rows, row2) + } + return qr +} + +func queryResultForTabletResults(results map[*topo.TabletInfo]*sqltypes.Result) *sqltypes.Result { + var qr = &sqltypes.Result{} + qr.RowsAffected = uint64(len(results)) + qr.Fields = []*querypb.Field{{ + Name: "Tablet", + Type: sqltypes.VarBinary, + }} + var row2 []sqltypes.Value + for tablet, result := range results { + for _, row := range result.Rows { + if len(qr.Fields) == 1 { + qr.Fields = append(qr.Fields, result.Fields...) + } + row2 = nil + row2 = append(row2, sqltypes.NewVarBinary(tablet.AliasString())) + row2 = append(row2, row...) + qr.Rows = append(qr.Rows, row2) + } + } + return qr +} + +func commandWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { + dryRun := subFlags.Bool("dry_run", false, "Does a dry run of Workflow and only reports the final query and list of masters on which the operation will be applied") + if err := subFlags.Parse(args); err != nil { + return err + } + if subFlags.NArg() != 2 { + return fmt.Errorf("usage: Workflow --dry-run keyspace.workflow start/stop/delete") + } + keyspace, workflow, err := splitKeyspaceWorkflow(subFlags.Arg(0)) + if err != nil { + return err + } + _, err = wr.TopoServer().GetKeyspace(ctx, keyspace) + if err != nil { + wr.Logger().Errorf("Keyspace %s not found", keyspace) + } + action := subFlags.Arg(1) + + results, err := wr.WorkflowAction(ctx, workflow, keyspace, action, *dryRun) + if err != nil { + return err + } + if action == "list" { + return nil + } + if len(results) == 0 { + wr.Logger().Printf("no result returned\n") + return nil + } + qr := queryResultForRowsAffected(results) + + printQueryResult(loggerWriter{wr.Logger()}, qr) + return nil + +} + func commandPanic(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { panic(fmt.Errorf("this command panics on purpose")) } diff --git a/go/vt/wrangler/vexec.go b/go/vt/wrangler/vexec.go new file mode 100644 index 00000000000..e978811e63d --- /dev/null +++ b/go/vt/wrangler/vexec.go @@ -0,0 +1,399 @@ +/* +Copyright 2020 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package wrangler + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "sync" + "time" + + "vitess.io/vitess/go/vt/log" + + "github.com/gogo/protobuf/proto" + "github.com/olekukonko/tablewriter" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/concurrency" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/evalengine" +) + +const ( + vreplicationTableName = "_vt.vreplication" +) + +type vexec struct { + ctx context.Context + workflow string + keyspace string + query string + + wr *Wrangler + + plan *vexecPlan + masters []*topo.TabletInfo +} + +func newVExec(ctx context.Context, workflow, keyspace, query string, wr *Wrangler) *vexec { + return &vexec{ + ctx: ctx, + workflow: workflow, + keyspace: keyspace, + query: query, + wr: wr, + } +} + +// VExec executes queries on _vt.vreplication on all masters in the target keyspace of the workflow +func (wr *Wrangler) VExec(ctx context.Context, workflow, keyspace, query string, dryRun bool) (map[*topo.TabletInfo]*sqltypes.Result, error) { + results, err := wr.runVexec(ctx, workflow, keyspace, query, dryRun) + retResults := make(map[*topo.TabletInfo]*sqltypes.Result) + for tablet, result := range results { + retResults[tablet] = sqltypes.Proto3ToResult(result) + } + return retResults, err +} + +func (wr *Wrangler) runVexec(ctx context.Context, workflow, keyspace, query string, dryRun bool) (map[*topo.TabletInfo]*querypb.QueryResult, error) { + vx := newVExec(ctx, workflow, keyspace, query, wr) + if err := vx.getMasters(); err != nil { + return nil, err + } + if _, err := vx.buildVExecPlan(); err != nil { + return nil, err + } + fullQuery := vx.plan.parsedQuery.Query + if dryRun { + return nil, vx.outputDryRunInfo(wr) + } + return vx.exec(fullQuery) +} + +func (vx *vexec) outputDryRunInfo(wr *Wrangler) error { + rsr, err := vx.wr.getStreams(vx.ctx, vx.workflow, vx.keyspace) + if err != nil { + return err + } + + wr.Logger().Printf("Query: %s\nwill be run on the following streams in keyspace %s for workflow %s:\n\n", + vx.plan.parsedQuery.Query, vx.keyspace, vx.workflow) + tableString := &strings.Builder{} + table := tablewriter.NewWriter(tableString) + table.SetHeader([]string{"Tablet", "ID", "BinLogSource", "State", "DBName", "Current GTID", "MaxReplicationLag"}) + for _, master := range vx.masters { + key := fmt.Sprintf("%s/%s", master.Shard, master.AliasString()) + for _, stream := range rsr.Statuses[key] { + table.Append([]string{key, fmt.Sprintf("%d", stream.ID), stream.Bls.String(), stream.State, stream.DBName, stream.Pos, fmt.Sprintf("%d", stream.MaxReplicationLag)}) + } + } + table.SetAutoMergeCellsByColumnIndex([]int{0}) + table.SetRowLine(true) + table.Render() + wr.Logger().Printf(tableString.String()) + wr.Logger().Printf("\n\n") + + return nil +} + +func (vx *vexec) exec(query string) (map[*topo.TabletInfo]*querypb.QueryResult, error) { + var wg sync.WaitGroup + workflow := vx.workflow + allErrors := &concurrency.AllErrorRecorder{} + results := make(map[*topo.TabletInfo]*querypb.QueryResult) + var mu sync.Mutex + ctx, cancel := context.WithTimeout(vx.ctx, 10*time.Second) + defer cancel() + for _, master := range vx.masters { + wg.Add(1) + go func(ctx context.Context, master *topo.TabletInfo) { + defer wg.Done() + log.Infof("Running %s on %s\n", query, master.AliasString()) + qr, err := vx.wr.VReplicationExec(ctx, master.Alias, query) + log.Infof("Result is %s: %v", master.AliasString(), qr) + if err != nil { + allErrors.RecordError(err) + } else { + if qr.RowsAffected == 0 { + allErrors.RecordError(fmt.Errorf("\nno matching streams found for workflow %s, tablet %s, query %s", workflow, master.Alias, query)) + } else { + mu.Lock() + results[master] = qr + mu.Unlock() + } + } + }(ctx, master) + } + wg.Wait() + return results, allErrors.AggrError(vterrors.Aggregate) +} + +func (vx *vexec) getMasters() error { + var err error + shards, err := vx.wr.ts.GetShardNames(vx.ctx, vx.keyspace) + if err != nil { + return err + } + if len(shards) == 0 { + return fmt.Errorf("no shards found in keyspace %s", vx.keyspace) + } + var allMasters []*topo.TabletInfo + var master *topo.TabletInfo + for _, shard := range shards { + if master, err = vx.getMasterForShard(shard); err != nil { + return err + } + if master == nil { + return fmt.Errorf("no master found for shard %s", shard) + } + allMasters = append(allMasters, master) + } + vx.masters = allMasters + return nil +} + +func (vx *vexec) getMasterForShard(shard string) (*topo.TabletInfo, error) { + si, err := vx.wr.ts.GetShard(vx.ctx, vx.keyspace, shard) + if err != nil { + return nil, err + } + if si.MasterAlias == nil { + return nil, fmt.Errorf("no master found for shard %s", shard) + } + master, err := vx.wr.ts.GetTablet(vx.ctx, si.MasterAlias) + if err != nil { + return nil, err + } + if master == nil { + return nil, fmt.Errorf("could not get tablet for %s:%s", vx.keyspace, si.MasterAlias) + } + return master, nil +} + +// WorkflowAction can start/stop/delete or list strams in _vt.vreplication on all masters in the target keyspace of the workflow +func (wr *Wrangler) WorkflowAction(ctx context.Context, workflow, keyspace, action string, dryRun bool) (map[*topo.TabletInfo]*sqltypes.Result, error) { + if action == "list" { + return nil, wr.listStreams(ctx, workflow, keyspace) + } + results, err := wr.execWorkflowAction(ctx, workflow, keyspace, action, dryRun) + retResults := make(map[*topo.TabletInfo]*sqltypes.Result) + for tablet, result := range results { + retResults[tablet] = sqltypes.Proto3ToResult(result) + } + return retResults, err +} + +func (wr *Wrangler) getWorkflowActionQuery(action string) (string, error) { + var query string + updateSQL := "update _vt.vreplication set state = %s" + switch action { + case "stop": + query = fmt.Sprintf(updateSQL, encodeString("Stopped")) + case "start": + query = fmt.Sprintf(updateSQL, encodeString("Running")) + case "delete": + query = "delete from _vt.vreplication" + default: + return "", fmt.Errorf("invalid action found: %s", action) + } + return query, nil +} + +func (wr *Wrangler) execWorkflowAction(ctx context.Context, workflow, keyspace, action string, dryRun bool) (map[*topo.TabletInfo]*querypb.QueryResult, error) { + var err error + query, err := wr.getWorkflowActionQuery(action) + if err != nil { + return nil, err + } + vx := newVExec(ctx, workflow, keyspace, query, wr) + err = vx.getMasters() + if err != nil { + return nil, err + } + if _, err := vx.buildVExecPlan(); err != nil { + return nil, err + } + fullQuery := vx.plan.parsedQuery.Query + if dryRun { + return nil, vx.outputDryRunInfo(wr) + } + results, err := vx.exec(fullQuery) + return results, err +} + +type replicationStatusResult struct { + Workflow string + SourceKeyspace string + TargetKeyspace string + + Statuses map[string][]*replicationStatus +} + +type copyState struct { + Table string + LastPK string +} + +type replicationStatus struct { + Shard string + Tablet string + ID int64 + Bls binlogdatapb.BinlogSource + Pos string + StopPos string + State string + MaxReplicationLag int64 + DBName string + TimeUpdated int64 + Message string + + CopyState []copyState +} + +func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqltypes.Value, master *topo.TabletInfo) (*replicationStatus, string, error) { + var err error + var id, maxReplicationLag, timeUpdated int64 + var state, dbName, pos, stopPos, message string + var bls binlogdatapb.BinlogSource + id, err = evalengine.ToInt64(row[0]) + if err != nil { + return nil, "", err + } + if err := proto.UnmarshalText(row[1].ToString(), &bls); err != nil { + return nil, "", err + } + pos = row[2].ToString() + stopPos = row[3].ToString() + maxReplicationLag, err = evalengine.ToInt64(row[4]) + if err != nil { + return nil, "", err + } + state = row[5].ToString() + dbName = row[6].ToString() + timeUpdated, err = evalengine.ToInt64(row[7]) + if err != nil { + return nil, "", err + } + message = row[8].ToString() + status := &replicationStatus{ + Shard: master.Shard, + Tablet: master.AliasString(), + ID: id, + Bls: bls, + Pos: pos, + StopPos: stopPos, + State: state, + DBName: dbName, + MaxReplicationLag: maxReplicationLag, + TimeUpdated: timeUpdated, + Message: message, + } + status.CopyState, err = wr.getCopyState(ctx, master, id) + if err != nil { + return nil, "", err + } + + status.State = updateState(message, status.State, status.CopyState, timeUpdated) + return status, bls.Keyspace, nil +} + +func (wr *Wrangler) getStreams(ctx context.Context, workflow, keyspace string) (*replicationStatusResult, error) { + var rsr replicationStatusResult + rsr.Statuses = make(map[string][]*replicationStatus) + rsr.Workflow = workflow + rsr.TargetKeyspace = keyspace + var results map[*topo.TabletInfo]*querypb.QueryResult + query := "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, message from _vt.vreplication" + results, err := wr.runVexec(ctx, workflow, keyspace, query, false) + if err != nil { + return nil, err + } + for master, result := range results { + var rsrStatus []*replicationStatus + qr := sqltypes.Proto3ToResult(result) + for _, row := range qr.Rows { + status, sourceKeyspace, err := wr.getReplicationStatusFromRow(ctx, row, master) + fmt.Printf("getReplicationStatusFromRow status for master %s is %v\n", master.AliasString(), status) + if err != nil { + return nil, err + } + rsr.SourceKeyspace = sourceKeyspace + + rsrStatus = append(rsrStatus, status) + } + rsr.Statuses[fmt.Sprintf("%s/%s", master.Shard, master.AliasString())] = rsrStatus + } + return &rsr, nil +} + +func (wr *Wrangler) listStreams(ctx context.Context, workflow, keyspace string) error { + replStatus, err := wr.getStreams(ctx, workflow, keyspace) + if err != nil { + return err + } + if err := dumpStreamListAsJSON(replStatus, wr); err != nil { + return err + } + + return nil +} + +func updateState(message, state string, cs []copyState, timeUpdated int64) string { + if message != "" { + state = "Error" + } else if state == "Running" && len(cs) > 0 { + state = "Copying" + } else if state == "Running" && int64(time.Now().Second())-timeUpdated > 10 /* seconds */ { + state = "Lagging" + } + return state +} + +func dumpStreamListAsJSON(replStatus *replicationStatusResult, wr *Wrangler) error { + text, err := json.MarshalIndent(replStatus, "", "\t") + if err != nil { + return err + } + wr.Logger().Printf("%s\n", text) + return nil +} + +func (wr *Wrangler) getCopyState(ctx context.Context, tablet *topo.TabletInfo, id int64) ([]copyState, error) { + var cs []copyState + query := fmt.Sprintf(`select table_name, lastpk from _vt.copy_state where vrepl_id = %d`, id) + qr, err := wr.VReplicationExec(ctx, tablet.Alias, query) + if err != nil { + return nil, err + } + if qr != nil { + for _, row := range qr.Rows { + table := string(row.Values[0]) + lastPK := string(row.Values[1]) + copyState := copyState{ + Table: table, + LastPK: lastPK, + } + cs = append(cs, copyState) + } + } + + return cs, nil +} diff --git a/go/vt/wrangler/vexec_plan.go b/go/vt/wrangler/vexec_plan.go new file mode 100644 index 00000000000..a61a986d679 --- /dev/null +++ b/go/vt/wrangler/vexec_plan.go @@ -0,0 +1,205 @@ +/* +Copyright 2020 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package wrangler + +import ( + "fmt" + + "vitess.io/vitess/go/vt/sqlparser" +) + +type vexecPlan struct { + query string + opcode int + parsedQuery *sqlparser.ParsedQuery +} + +const ( + updateQuery = iota + deleteQuery + selectQuery +) + +func (vx *vexec) buildVExecPlan() (*vexecPlan, error) { + stmt, err := sqlparser.Parse(vx.query) + if err != nil { + return nil, err + } + var plan *vexecPlan + switch stmt := stmt.(type) { + case *sqlparser.Update: + plan, err = vx.buildUpdatePlan(stmt) + case *sqlparser.Delete: + plan, err = vx.buildDeletePlan(stmt) + case *sqlparser.Select: + plan, err = vx.buildSelectPlan(stmt) + default: + return nil, fmt.Errorf("query not supported by vexec: %s", sqlparser.String(stmt)) + } + + if err != nil { + return nil, err + } + plan.query = vx.query + vx.plan = plan + return plan, nil +} + +func splitAndExpression(filters []sqlparser.Expr, node sqlparser.Expr) []sqlparser.Expr { + if node == nil { + return filters + } + switch node := node.(type) { + case *sqlparser.AndExpr: + filters = splitAndExpression(filters, node.Left) + return splitAndExpression(filters, node.Right) + } + return append(filters, node) +} + +func (vx *vexec) analyzeWhere(where *sqlparser.Where) []string { + var cols []string + if where == nil { + return cols + } + exprs := splitAndExpression(nil, where.Expr) + for _, expr := range exprs { + switch expr := expr.(type) { + case *sqlparser.ComparisonExpr: + qualifiedName, ok := expr.Left.(*sqlparser.ColName) + if ok { + cols = append(cols, qualifiedName.Name.String()) + } + } + } + return cols +} + +func (vx *vexec) addDefaultWheres(where *sqlparser.Where) *sqlparser.Where { + cols := vx.analyzeWhere(where) + var hasDBName, hasWorkflow bool + for _, col := range cols { + if col == "db_name" { + hasDBName = true + } else if col == "workflow" { + hasWorkflow = true + } + } + newWhere := where + if !hasDBName { + expr := &sqlparser.ComparisonExpr{ + Left: &sqlparser.ColName{Name: sqlparser.NewColIdent("db_name")}, + Operator: sqlparser.EqualStr, + Right: sqlparser.NewStrVal([]byte(vx.masters[0].DbName())), + } + if newWhere == nil { + newWhere = &sqlparser.Where{ + Type: sqlparser.WhereStr, + Expr: expr, + } + } else { + newWhere.Expr = &sqlparser.AndExpr{ + Left: newWhere.Expr, + Right: expr, + } + } + } + if !hasWorkflow { + expr := &sqlparser.ComparisonExpr{ + Left: &sqlparser.ColName{Name: sqlparser.NewColIdent("workflow")}, + Operator: sqlparser.EqualStr, + Right: sqlparser.NewStrVal([]byte(vx.workflow)), + } + newWhere.Expr = &sqlparser.AndExpr{ + Left: newWhere.Expr, + Right: expr, + } + } + return newWhere +} + +func (vx *vexec) buildUpdatePlan(upd *sqlparser.Update) (*vexecPlan, error) { + switch sqlparser.String(upd.TableExprs) { + case vreplicationTableName: + // no-op + default: + return nil, fmt.Errorf("vexec does not support: %v", sqlparser.String(upd.TableExprs)) + } + if upd.OrderBy != nil || upd.Limit != nil { + return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(upd)) + } + for _, expr := range upd.Exprs { + if expr.Name.Name.EqualString("id") { + return nil, fmt.Errorf("id cannot be changed: %v", sqlparser.String(expr)) + } + } + + upd.Where = vx.addDefaultWheres(upd.Where) + + buf := sqlparser.NewTrackedBuffer(nil) + buf.Myprintf("%v", upd) + + return &vexecPlan{ + opcode: updateQuery, + parsedQuery: buf.ParsedQuery(), + }, nil +} + +func (vx *vexec) buildDeletePlan(del *sqlparser.Delete) (*vexecPlan, error) { + switch sqlparser.String(del.TableExprs) { + case vreplicationTableName: + // no-op + default: + return nil, fmt.Errorf("invalid table name: %v", sqlparser.String(del.TableExprs)) + } + if del.Targets != nil { + return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(del)) + } + if del.Partitions != nil { + return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(del)) + } + if del.OrderBy != nil || del.Limit != nil { + return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(del)) + } + + del.Where = vx.addDefaultWheres(del.Where) + + buf := sqlparser.NewTrackedBuffer(nil) + buf.Myprintf("%v", del) + + return &vexecPlan{ + opcode: deleteQuery, + parsedQuery: buf.ParsedQuery(), + }, nil +} + +func (vx *vexec) buildSelectPlan(sel *sqlparser.Select) (*vexecPlan, error) { + switch sqlparser.String(sel.From) { + case vreplicationTableName: + // no-op + default: + return nil, fmt.Errorf("invalid table name: %v", sqlparser.String(sel.From)) + } + sel.Where = vx.addDefaultWheres(sel.Where) + buf := sqlparser.NewTrackedBuffer(nil) + buf.Myprintf("%v", sel) + + return &vexecPlan{ + opcode: selectQuery, + parsedQuery: buf.ParsedQuery(), + }, nil +} diff --git a/go/vt/wrangler/vexec_test.go b/go/vt/wrangler/vexec_test.go new file mode 100644 index 00000000000..f0a15e0841f --- /dev/null +++ b/go/vt/wrangler/vexec_test.go @@ -0,0 +1,373 @@ +/* +Copyright 2020 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package wrangler + +import ( + "context" + "fmt" + "sort" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/test/utils" + "vitess.io/vitess/go/vt/logutil" +) + +func TestListStreams(t *testing.T) { + ctx := context.Background() + workflow := "wrWorkflow" + keyspace := "target" + env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil) + defer env.close() + var logger = logutil.NewMemoryLogger() + wr := New(logger, env.topoServ, env.tmc) + //query := "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, message from _vt.vreplication where workflow = 'wrWorkflow' and db_name = 'vt_target'" + + wr.WorkflowAction(ctx, workflow, keyspace, "ListStreams", false) + +} + +func TestVExec(t *testing.T) { + ctx := context.Background() + workflow := "wrWorkflow" + keyspace := "target" + query := "update _vt.vreplication set state = 'Running'" + env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil) + defer env.close() + var logger = logutil.NewMemoryLogger() + wr := New(logger, env.topoServ, env.tmc) + + vx := newVExec(ctx, workflow, keyspace, query, wr) + err := vx.getMasters() + require.Nil(t, err) + masters := vx.masters + require.NotNil(t, masters) + require.Equal(t, len(masters), 2) + var shards []string + for _, master := range masters { + shards = append(shards, master.Shard) + } + sort.Strings(shards) + require.Equal(t, fmt.Sprintf("%v", shards), "[-80 80-]") + plan, err := vx.buildVExecPlan() + require.NoError(t, err) + require.NotNil(t, plan) + + addWheres := func(query string) string { + if strings.Contains(query, " where ") { + query += " and " + } else { + query += " where " + } + query += fmt.Sprintf("db_name = %s and workflow = %s", encodeString("vt_"+keyspace), encodeString(workflow)) + return query + } + want := addWheres(query) + require.Equal(t, want, plan.parsedQuery.Query) + + query = plan.parsedQuery.Query + vx.exec(query) + + type TestCase struct { + name string + query string + result *sqltypes.Result + errorString string + } + + var result *sqltypes.Result + var testCases []*TestCase + result = sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|source|message", + "int64|varchar|varchar"), + "1|keyspace:\"source\" shard:\"0\" filter: >|", + ) + testCases = append(testCases, &TestCase{ + name: "select", + query: "select id, source, message from _vt.vreplication", + result: result, + }) + result = &sqltypes.Result{ + RowsAffected: 1, + Rows: [][]sqltypes.Value{}, + } + testCases = append(testCases, &TestCase{ + name: "delete", + query: "delete from _vt.vreplication where message != ''", + result: result, + }) + result = &sqltypes.Result{ + RowsAffected: 1, + Rows: [][]sqltypes.Value{}, + } + testCases = append(testCases, &TestCase{ + name: "update", + query: "update _vt.vreplication set state='Stopped', message='for wrangler test'", + result: result, + }) + + errorString := "query not supported by vexec" + testCases = append(testCases, &TestCase{ + name: "insert", + query: "insert into _vt.vreplication(state, workflow, db_name) values ('Running', 'wk1', 'ks1'), ('Stopped', 'wk1', 'ks1')", + errorString: errorString, + }) + + errorString = "invalid table name" + testCases = append(testCases, &TestCase{ + name: "delete invalid-other-table", + query: "delete from _vt.copy_state", + errorString: errorString, + }) + + for _, testCase := range testCases { + t.Run(testCase.query, func(t *testing.T) { + results, err := wr.VExec(ctx, workflow, keyspace, testCase.query, false) + if testCase.errorString == "" { + require.NoError(t, err) + for _, result := range results { + utils.MustMatch(t, testCase.result, result, "Incorrect result") + } + } else { + require.Error(t, err) + if !strings.Contains(err.Error(), testCase.errorString) { + t.Fatalf("Wrong error, want %s, got %s", testCase.errorString, err.Error()) + } + } + }) + } + + query = "delete from _vt.vreplication" + _, err = wr.VExec(ctx, workflow, keyspace, query, true) + require.NoError(t, err) + dryRunResults := []string{ + "Query: delete from _vt.vreplication where db_name = 'vt_target' and workflow = 'wrWorkflow'", + "will be run on the following streams in keyspace target for workflow wrWorkflow:\n\n", + "+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+", + "| TABLET | ID | BINLOGSOURCE | STATE | DBNAME | CURRENT GTID | MAXREPLICATIONLAG |", + "+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+", + "| -80/zone1-0000000200 | 1 | keyspace:\"source\" shard:\"0\" | Copying | vt_target | pos | 0 |", + "| | | filter: > | | | | |", + "+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+", + "| 80-/zone1-0000000210 | 1 | keyspace:\"source\" shard:\"0\" | Copying | vt_target | pos | 0 |", + "| | | filter: > | | | | |", + "+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+", + } + require.Equal(t, strings.Join(dryRunResults, "\n")+"\n\n\n\n\n", logger.String()) +} + +func TestWorkflowStatusUpdate(t *testing.T) { + require.Equal(t, "Error", updateState("master tablet not contactable", "Running", nil, 0)) + require.Equal(t, "Lagging", updateState("", "Running", nil, int64(time.Now().Second())-100)) + require.Equal(t, "Copying", updateState("", "Running", []copyState{{Table: "t1", LastPK: "[[INT64(10)]]"}}, int64(time.Now().Second()))) +} + +func TestWorkflowListStreams(t *testing.T) { + ctx := context.Background() + workflow := "wrWorkflow" + keyspace := "target" + env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil) + defer env.close() + logger := logutil.NewMemoryLogger() + wr := New(logger, env.topoServ, env.tmc) + + err := wr.listStreams(ctx, workflow, keyspace) + require.Nil(t, err) + want := `{ + "Workflow": "wrWorkflow", + "SourceKeyspace": "source", + "TargetKeyspace": "target", + "Statuses": { + "-80/zone1-0000000200": [ + { + "Shard": "-80", + "Tablet": "zone1-0000000200", + "ID": 1, + "Bls": { + "keyspace": "source", + "shard": "0", + "filter": { + "rules": [ + { + "match": "t1" + } + ] + } + }, + "Pos": "pos", + "StopPos": "", + "State": "Copying", + "MaxReplicationLag": 0, + "DBName": "vt_target", + "TimeUpdated": 1234, + "Message": "", + "CopyState": [ + { + "Table": "t", + "LastPK": "1" + } + ] + } + ], + "80-/zone1-0000000210": [ + { + "Shard": "80-", + "Tablet": "zone1-0000000210", + "ID": 1, + "Bls": { + "keyspace": "source", + "shard": "0", + "filter": { + "rules": [ + { + "match": "t1" + } + ] + } + }, + "Pos": "pos", + "StopPos": "", + "State": "Copying", + "MaxReplicationLag": 0, + "DBName": "vt_target", + "TimeUpdated": 1234, + "Message": "", + "CopyState": [ + { + "Table": "t", + "LastPK": "1" + } + ] + } + ] + } +} + +` + require.Equal(t, want, logger.String()) + + results, err := wr.execWorkflowAction(ctx, workflow, keyspace, "stop", false) + require.Nil(t, err) + require.Equal(t, "map[Tablet{zone1-0000000200}:rows_affected:1 Tablet{zone1-0000000210}:rows_affected:1 ]", fmt.Sprintf("%v", results)) + + logger.Clear() + results, err = wr.execWorkflowAction(ctx, workflow, keyspace, "stop", true) + require.Nil(t, err) + require.Equal(t, "map[]", fmt.Sprintf("%v", results)) + dryRunResult := `Query: update _vt.vreplication set state = 'Stopped' where db_name = 'vt_target' and workflow = 'wrWorkflow' +will be run on the following streams in keyspace target for workflow wrWorkflow: + + ++----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+ +| TABLET | ID | BINLOGSOURCE | STATE | DBNAME | CURRENT GTID | MAXREPLICATIONLAG | ++----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+ +| -80/zone1-0000000200 | 1 | keyspace:"source" shard:"0" | Copying | vt_target | pos | 0 | +| | | filter: > | | | | | ++----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+ +| 80-/zone1-0000000210 | 1 | keyspace:"source" shard:"0" | Copying | vt_target | pos | 0 | +| | | filter: > | | | | | ++----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+ + + + + +` + require.Equal(t, dryRunResult, logger.String()) +} + +func TestVExecValidations(t *testing.T) { + ctx := context.Background() + workflow := "wf" + keyspace := "ks" + query := "" + env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil) + defer env.close() + + wr := New(logutil.NewConsoleLogger(), env.topoServ, env.tmc) + + vx := newVExec(ctx, workflow, keyspace, query, wr) + + type badQuery struct { + name string + query string + errorString string + } + badQueries := []badQuery{ + { + name: "invalid", + query: "bad query", + errorString: "syntax error at position 4 near 'bad'", + }, + { + name: "incorrect table", + query: "select * from _vt.vreplication2", + errorString: "invalid table name: _vt.vreplication2", + }, + { + name: "unsupported query", + query: "describe _vt.vreplication", + errorString: "query not supported by vexec: otherread", + }, + } + for _, bq := range badQueries { + t.Run(bq.name, func(t *testing.T) { + vx.query = bq.query + plan, err := vx.buildVExecPlan() + require.EqualError(t, err, bq.errorString) + require.Nil(t, plan) + }) + } + + type action struct { + name string + want string + expectedError error + } + updateSQL := "update _vt.vreplication set state = %s" + actions := []action{ + { + name: "start", + want: fmt.Sprintf(updateSQL, encodeString("Running")), + expectedError: nil, + }, + { + name: "stop", + want: fmt.Sprintf(updateSQL, encodeString("Stopped")), + expectedError: nil, + }, + { + name: "delete", + want: "delete from _vt.vreplication", + expectedError: nil, + }, + { + name: "other", + want: "", + expectedError: fmt.Errorf("invalid action found: other"), + }} + + for _, a := range actions { + t.Run(a.name, func(t *testing.T) { + sql, err := wr.getWorkflowActionQuery(a.name) + require.Equal(t, a.expectedError, err) + require.Equal(t, a.want, sql) + }) + } +} diff --git a/go/vt/wrangler/wrangler_env_test.go b/go/vt/wrangler/wrangler_env_test.go new file mode 100644 index 00000000000..fd2c9af4c3f --- /dev/null +++ b/go/vt/wrangler/wrangler_env_test.go @@ -0,0 +1,307 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package wrangler + +import ( + "flag" + "fmt" + "sync" + + "golang.org/x/net/context" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/grpcclient" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/logutil" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/memorytopo" + "vitess.io/vitess/go/vt/vttablet/queryservice" + "vitess.io/vitess/go/vt/vttablet/queryservice/fakes" + "vitess.io/vitess/go/vt/vttablet/tabletconn" + "vitess.io/vitess/go/vt/vttablet/tmclient" +) + +const ( + testStopPosition = "MariaDB/5-456-892" + testSourceGtid = "MariaDB/5-456-893" + testTargetMasterPosition = "MariaDB/6-456-892" +) + +type testWranglerEnv struct { + wr *Wrangler + workflow string + topoServ *topo.Server + cell string + tabletType topodatapb.TabletType + tmc *testWranglerTMClient + + mu sync.Mutex + tablets map[int]*testWranglerTablet +} + +// wranglerEnv has to be a global for RegisterDialer to work. +var wranglerEnv *testWranglerEnv + +func init() { + tabletconn.RegisterDialer("WranglerTest", func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) { + wranglerEnv.mu.Lock() + defer wranglerEnv.mu.Unlock() + fmt.Println("In WranglerTest dialer") + if qs, ok := wranglerEnv.tablets[int(tablet.Alias.Uid)]; ok { + fmt.Printf("query service is %v", qs) + return qs, nil + } + return nil, fmt.Errorf("tablet %d not found", tablet.Alias.Uid) + }) +} + +//---------------------------------------------- +// testWranglerEnv + +func newWranglerTestEnv(sourceShards, targetShards []string, query string, positions map[string]string) *testWranglerEnv { + flag.Set("tablet_protocol", "WranglerTest") + env := &testWranglerEnv{ + workflow: "wrWorkflow", + tablets: make(map[int]*testWranglerTablet), + topoServ: memorytopo.NewServer("zone1"), + cell: "zone1", + tabletType: topodatapb.TabletType_REPLICA, + tmc: newTestWranglerTMClient(), + } + env.wr = New(logutil.NewConsoleLogger(), env.topoServ, env.tmc) + + tabletID := 100 + for _, shard := range sourceShards { + _ = env.addTablet(tabletID, "source", shard, topodatapb.TabletType_MASTER) + _ = env.addTablet(tabletID+1, "source", shard, topodatapb.TabletType_REPLICA) + env.tmc.waitpos[tabletID+1] = testStopPosition + + tabletID += 10 + } + tabletID = 200 + for _, shard := range targetShards { + master := env.addTablet(tabletID, "target", shard, topodatapb.TabletType_MASTER) + _ = env.addTablet(tabletID+1, "target", shard, topodatapb.TabletType_REPLICA) + + var rows []string + var posRows []string + var bls *binlogdatapb.BinlogSource + for j, sourceShard := range sourceShards { + bls = &binlogdatapb.BinlogSource{ + Keyspace: "source", + Shard: sourceShard, + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: query, + }}, + }, + } + rows = append(rows, fmt.Sprintf("%d|%v|", j+1, bls)) + position := testStopPosition + if pos := positions[sourceShard+shard]; pos != "" { + position = pos + } + posRows = append(posRows, fmt.Sprintf("%v|%s", bls, position)) + + env.tmc.setVRResults( + master.tablet, + fmt.Sprintf("update _vt.vreplication set state='Running', stop_pos='%s', message='synchronizing for wrangler test' where id=%d", testSourceGtid, j+1), + &sqltypes.Result{}, + ) + } + // migrater buildMigrationTargets + env.tmc.setVRResults( + master.tablet, + "select id, source, message from _vt.vreplication where db_name = 'vt_target' and workflow = 'wrWorkflow'", + sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|source|message", + "int64|varchar|varchar"), + rows..., + ), + ) + + env.tmc.setVRResults(master.tablet, "update _vt.vreplication set state = 'Stopped', message = 'for wrangler test' where db_name = 'vt_target' and workflow = 'wrWorkflow'", &sqltypes.Result{RowsAffected: 1}) + env.tmc.setVRResults(master.tablet, "update _vt.vreplication set state = 'Stopped' where db_name = 'vt_target' and workflow = 'wrWorkflow'", &sqltypes.Result{RowsAffected: 1}) + env.tmc.setVRResults(master.tablet, "delete from _vt.vreplication where message != '' and db_name = 'vt_target' and workflow = 'wrWorkflow'", &sqltypes.Result{RowsAffected: 1}) + env.tmc.setVRResults(master.tablet, "insert into _vt.vreplication(state, workflow, db_name) values ('Running', 'wk1', 'ks1'), ('Stopped', 'wk1', 'ks1')", &sqltypes.Result{RowsAffected: 2}) + + result := sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|message", + "int64|varchar|varchar|varchar|int64|varchar|varchar|int64|varchar"), + fmt.Sprintf("1|%v|pos||0|Running|vt_target|1234|", bls), + ) + env.tmc.setVRResults(master.tablet, "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, message from _vt.vreplication where db_name = 'vt_target' and workflow = 'wrWorkflow'", result) + //" + env.tmc.setVRResults( + master.tablet, + "select source, pos from _vt.vreplication where db_name='vt_target' and workflow='wrWorkflow'", + sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "source|pos", + "varchar|varchar"), + posRows..., + ), + ) + + result = sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "table|lastpk", + "varchar|varchar"), + "t1|pk1", + ) + + env.tmc.setVRResults(master.tablet, "select table_name, lastpk from _vt.copy_state where vrepl_id = 1", result) + + env.tmc.vrpos[tabletID] = testSourceGtid + env.tmc.pos[tabletID] = testTargetMasterPosition + + env.tmc.waitpos[tabletID+1] = testTargetMasterPosition + + env.tmc.setVRResults(master.tablet, "update _vt.vreplication set state='Running', message='', stop_pos='' where db_name='vt_target' and workflow='wrWorkflow'", &sqltypes.Result{}) + + tabletID += 10 + } + wranglerEnv = env + return env +} + +func (env *testWranglerEnv) close() { + env.mu.Lock() + defer env.mu.Unlock() + for _, t := range env.tablets { + env.topoServ.DeleteTablet(context.Background(), t.tablet.Alias) + } + env.tablets = nil +} + +func (env *testWranglerEnv) addTablet(id int, keyspace, shard string, tabletType topodatapb.TabletType) *testWranglerTablet { + env.mu.Lock() + defer env.mu.Unlock() + tablet := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: env.cell, + Uid: uint32(id), + }, + Keyspace: keyspace, + Shard: shard, + KeyRange: &topodatapb.KeyRange{}, + Type: tabletType, + PortMap: map[string]int32{ + "test": int32(id), + }, + } + env.tablets[id] = newTestWranglerTablet(tablet) + if err := env.wr.InitTablet(context.Background(), tablet, false /* allowMasterOverride */, true /* createShardAndKeyspace */, false /* allowUpdate */); err != nil { + panic(err) + } + if tabletType == topodatapb.TabletType_MASTER { + _, err := env.wr.ts.UpdateShardFields(context.Background(), keyspace, shard, func(si *topo.ShardInfo) error { + si.MasterAlias = tablet.Alias + return nil + }) + if err != nil { + panic(err) + } + } + env.tablets[id].queryResults = make(map[string]*querypb.QueryResult) + return env.tablets[id] +} + +//---------------------------------------------- +// testWranglerTablet + +type testWranglerTablet struct { + queryservice.QueryService + tablet *topodatapb.Tablet + queryResults map[string]*querypb.QueryResult + gotQueries []string +} + +func newTestWranglerTablet(tablet *topodatapb.Tablet) *testWranglerTablet { + return &testWranglerTablet{ + QueryService: fakes.ErrorQueryService, + tablet: tablet, + } +} + +func (tvt *testWranglerTablet) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error { + return callback(&querypb.StreamHealthResponse{ + Serving: true, + Target: &querypb.Target{ + Keyspace: tvt.tablet.Keyspace, + Shard: tvt.tablet.Shard, + TabletType: tvt.tablet.Type, + }, + RealtimeStats: &querypb.RealtimeStats{}, + }) +} + +//---------------------------------------------- +// testWranglerTMClient + +type testWranglerTMClient struct { + tmclient.TabletManagerClient + schema *tabletmanagerdatapb.SchemaDefinition + vrQueries map[int]map[string]*querypb.QueryResult + waitpos map[int]string + vrpos map[int]string + pos map[int]string +} + +func newTestWranglerTMClient() *testWranglerTMClient { + return &testWranglerTMClient{ + vrQueries: make(map[int]map[string]*querypb.QueryResult), + waitpos: make(map[int]string), + vrpos: make(map[int]string), + pos: make(map[int]string), + } +} + +func (tmc *testWranglerTMClient) GetSchema(ctx context.Context, tablet *topodatapb.Tablet, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) { + return tmc.schema, nil +} + +func (tmc *testWranglerTMClient) setVRResults(tablet *topodatapb.Tablet, query string, result *sqltypes.Result) { + queries, ok := tmc.vrQueries[int(tablet.Alias.Uid)] + if !ok { + queries = make(map[string]*querypb.QueryResult) + tmc.vrQueries[int(tablet.Alias.Uid)] = queries + } + queries[query] = sqltypes.ResultToProto3(result) +} + +func (tmc *testWranglerTMClient) VReplicationExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error) { + result, ok := tmc.vrQueries[int(tablet.Alias.Uid)][query] + if !ok { + return nil, fmt.Errorf("query %q not found for tablet %d", query, tablet.Alias.Uid) + } + return result, nil +} + +func (tmc *testWranglerTMClient) ExecuteFetchAsApp(ctx context.Context, tablet *topodatapb.Tablet, usePool bool, query []byte, maxRows int) (*querypb.QueryResult, error) { + // fmt.Printf("tablet: %d query: %s\n", tablet.Alias.Uid, string(query)) + t := wranglerEnv.tablets[int(tablet.Alias.Uid)] + t.gotQueries = append(t.gotQueries, string(query)) + result, ok := t.queryResults[string(query)] + if !ok { + result = &querypb.QueryResult{} + log.Errorf("QUery: %s, Result :%v\n", query, result) + } + return result, nil +}