diff --git a/docker/local/install_local_dependencies.sh b/docker/local/install_local_dependencies.sh index b723ee99452..be17c7dc14a 100755 --- a/docker/local/install_local_dependencies.sh +++ b/docker/local/install_local_dependencies.sh @@ -18,5 +18,10 @@ rm -f /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz mkdir -p /var/run/etcd && chown -R vitess:vitess /var/run/etcd +# Install gh-ost +curl -k -L https://github.com/openark/gh-ost/releases/download/v1.1.0/gh-ost-binary-linux-20200805092717.tar.gz -o /tmp/gh-ost.tar.gz +(cd /tmp/ && tar xzf gh-ost.tar.gz) +cp /tmp/gh-ost /usr/bin + # Clean up files we won't need in the final image. rm -rf /var/lib/apt/lists/* diff --git a/go.mod b/go.mod index 1ff58dc17cc..5fe2e6ed94e 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,7 @@ require ( github.com/golang/protobuf v1.3.2 github.com/golang/snappy v0.0.1 github.com/google/go-cmp v0.4.0 + github.com/google/uuid v1.1.1 github.com/googleapis/gnostic v0.2.0 // indirect github.com/gorilla/websocket v1.4.0 github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 @@ -41,11 +42,12 @@ require ( github.com/hashicorp/serf v0.9.2 // indirect github.com/icrowley/fake v0.0.0-20180203215853-4178557ae428 github.com/imdario/mergo v0.3.6 // indirect + github.com/julienschmidt/httprouter v1.3.0 github.com/klauspost/compress v1.4.1 // indirect github.com/klauspost/cpuid v1.2.0 // indirect github.com/klauspost/pgzip v1.2.4 github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect - github.com/krishicks/yaml-patch v0.0.10 + github.com/krishicks/yaml-patch v0.0.10 // indirect github.com/magiconair/properties v1.8.1 github.com/manifoldco/promptui v0.7.0 github.com/minio/minio-go v0.0.0-20190131015406-c8a261de75c1 diff --git a/go.sum b/go.sum index d017a2f9d3b..a8ef600806a 100644 --- a/go.sum +++ b/go.sum @@ -363,6 +363,8 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV github.com/juju/ansiterm v0.0.0-20180109212912-720a0952cc2a h1:FaWFmfWdAUKbSCtOU2QjDaorUexogfaMgbipgYATUMU= github.com/juju/ansiterm v0.0.0-20180109212912-720a0952cc2a/go.mod h1:UJSiEoRfvx3hP73CvoARgeLjaIOjybY9vj8PUPPFGeU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= +github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U= +github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/gotool v1.0.0 h1:AV2c/EiW3KqPNT9ZKl07ehoAGi4C5/01Cfbblndcapg= @@ -685,6 +687,7 @@ golang.org/x/net v0.0.0-20191004110552-13f9640d40b9 h1:rjwSpXsdiK0dV8/Naq3kAw9ym golang.org/x/net v0.0.0-20191004110552-13f9640d40b9/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200707034311-ab3426394381 h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 h1:SVwTIAaPC2U/AvvLNZ2a7OVsmBpC8L5BlwK1whH3hm0= diff --git a/go/sqltypes/named_result.go b/go/sqltypes/named_result.go new file mode 100644 index 00000000000..7a67b5d4489 --- /dev/null +++ b/go/sqltypes/named_result.go @@ -0,0 +1,140 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sqltypes + +import ( + "errors" + + querypb "vitess.io/vitess/go/vt/proto/query" +) + +var ( + // ErrNoSuchField indicates a search for a value by an unknown field/column name + ErrNoSuchField = errors.New("No such field in RowNamedValues") +) + +// RowNamedValues contains a row's values as a map based on Field (aka table column) name +type RowNamedValues map[string]Value + +// ToString returns the named field as string +func (r RowNamedValues) ToString(fieldName string) (string, error) { + if v, ok := r[fieldName]; ok { + return v.ToString(), nil + } + return "", ErrNoSuchField +} + +// AsString returns the named field as string, or default value if nonexistent/error +func (r RowNamedValues) AsString(fieldName string, def string) string { + if v, err := r.ToString(fieldName); err == nil { + return v + } + return def +} + +// ToInt64 returns the named field as int64 +func (r RowNamedValues) ToInt64(fieldName string) (int64, error) { + if v, ok := r[fieldName]; ok { + return v.ToInt64() + } + return 0, ErrNoSuchField +} + +// AsInt64 returns the named field as int64, or default value if nonexistent/error +func (r RowNamedValues) AsInt64(fieldName string, def int64) int64 { + if v, err := r.ToInt64(fieldName); err == nil { + return v + } + return def +} + +// ToUint64 returns the named field as uint64 +func (r RowNamedValues) ToUint64(fieldName string) (uint64, error) { + if v, ok := r[fieldName]; ok { + return v.ToUint64() + } + return 0, ErrNoSuchField +} + +// AsUint64 returns the named field as uint64, or default value if nonexistent/error +func (r RowNamedValues) AsUint64(fieldName string, def uint64) uint64 { + if v, err := r.ToUint64(fieldName); err == nil { + return v + } + return def +} + +// ToBool returns the named field as bool +func (r RowNamedValues) ToBool(fieldName string) (bool, error) { + if v, ok := r[fieldName]; ok { + return v.ToBool() + } + return false, ErrNoSuchField +} + +// AsBool returns the named field as bool, or default value if nonexistent/error +func (r RowNamedValues) AsBool(fieldName string, def bool) bool { + if v, err := r.ToBool(fieldName); err == nil { + return v + } + return def +} + +// NamedResult represents a query result with named values as opposed to ordinal values. +type NamedResult struct { + Fields []*querypb.Field `json:"fields"` + RowsAffected uint64 `json:"rows_affected"` + InsertID uint64 `json:"insert_id"` + Rows []RowNamedValues `json:"rows"` +} + +// ToNamedResult converts a Result struct into a new NamedResult struct +func ToNamedResult(result *Result) (r *NamedResult) { + if result == nil { + return r + } + r = &NamedResult{ + Fields: result.Fields, + RowsAffected: result.RowsAffected, + InsertID: result.InsertID, + } + columnOrdinals := make(map[int]string) + for i, field := range result.Fields { + columnOrdinals[i] = field.Name + } + r.Rows = make([]RowNamedValues, len(result.Rows)) + for rowIndex, row := range result.Rows { + namedRow := make(RowNamedValues) + for i, value := range row { + namedRow[columnOrdinals[i]] = value + } + r.Rows[rowIndex] = namedRow + } + return r +} + +// Row assumes this result has exactly one row, and returns it, or else returns nil. +// It is useful for queries like: +// - select count(*) from ... +// - select @@read_only +// - select UNIX_TIMESTAMP() from dual +func (r *NamedResult) Row() RowNamedValues { + if len(r.Rows) != 1 { + return nil + } + return r.Rows[0] +} diff --git a/go/sqltypes/named_result_test.go b/go/sqltypes/named_result_test.go new file mode 100644 index 00000000000..1e83f0c24db --- /dev/null +++ b/go/sqltypes/named_result_test.go @@ -0,0 +1,60 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sqltypes + +import ( + "reflect" + "testing" + + querypb "vitess.io/vitess/go/vt/proto/query" +) + +func TestToNamedResult(t *testing.T) { + in := &Result{ + Fields: []*querypb.Field{{ + Name: "id", + Type: Int64, + }, { + Name: "status", + Type: VarChar, + }}, + InsertID: 1, + RowsAffected: 2, + Rows: [][]Value{ + {TestValue(Int64, "1"), TestValue(VarChar, "first")}, + {TestValue(Int64, "2"), TestValue(VarChar, "second")}, + {TestValue(Int64, "3"), TestValue(VarChar, "third")}, + }, + } + named := in.Named() + for i := range in.Rows { + { + want := in.Rows[i][0] + got := named.Rows[i]["id"] + if !reflect.DeepEqual(want, got) { + t.Errorf("Named:%+v\n, want:%+v\n", got, want) + } + } + { + want := in.Rows[i][1] + got := named.Rows[i]["status"] + if !reflect.DeepEqual(want, got) { + t.Errorf("Named:%+v\n, want:%+v\n", got, want) + } + } + } +} diff --git a/go/sqltypes/result.go b/go/sqltypes/result.go index b12378b2ebf..2843f31efac 100644 --- a/go/sqltypes/result.go +++ b/go/sqltypes/result.go @@ -218,3 +218,8 @@ func (result *Result) AppendResult(src *Result) { } result.Rows = append(result.Rows, src.Rows...) } + +// Named returns a NamedResult based on this struct +func (result *Result) Named() *NamedResult { + return ToNamedResult(result) +} diff --git a/go/sqltypes/value.go b/go/sqltypes/value.go index 07a0a58168a..77ce7d35c6c 100644 --- a/go/sqltypes/value.go +++ b/go/sqltypes/value.go @@ -20,6 +20,7 @@ package sqltypes import ( "encoding/base64" "encoding/json" + "errors" "fmt" "strconv" @@ -37,6 +38,9 @@ var ( DontEscape = byte(255) nullstr = []byte("null") + + // ErrIncompatibleTypeCast indicates a casting problem + ErrIncompatibleTypeCast = errors.New("Cannot convert value to desired type") ) // BinWriter interface is used for encoding values. @@ -203,6 +207,39 @@ func (v Value) Len() int { return len(v.val) } +// ToInt64 returns the value as MySQL would return it as a int64. +func (v Value) ToInt64() (int64, error) { + if !v.IsIntegral() { + return 0, ErrIncompatibleTypeCast + } + + return strconv.ParseInt(v.ToString(), 10, 64) +} + +// ToUint64 returns the value as MySQL would return it as a uint64. +func (v Value) ToUint64() (uint64, error) { + if !v.IsIntegral() { + return 0, ErrIncompatibleTypeCast + } + + return strconv.ParseUint(v.ToString(), 10, 64) +} + +// ToBool returns the value as a bool value +func (v Value) ToBool() (bool, error) { + i, err := v.ToInt64() + if err != nil { + return false, err + } + switch i { + case 0: + return false, nil + case 1: + return true, nil + } + return false, ErrIncompatibleTypeCast +} + // ToString returns the value as MySQL would return it as string. // If the value is not convertible like in the case of Expression, it returns nil. func (v Value) ToString() string { diff --git a/go/test/endtoend/tabletmanager/tablet_health_test.go b/go/test/endtoend/tabletmanager/tablet_health_test.go index 93289bcab6f..c00cf8e8e57 100644 --- a/go/test/endtoend/tabletmanager/tablet_health_test.go +++ b/go/test/endtoend/tabletmanager/tablet_health_test.go @@ -126,7 +126,7 @@ func TestHealthCheck(t *testing.T) { // stop replication, make sure we don't go unhealthy. // TODO: replace with StopReplication once StopSlave has been removed - err = clusterInstance.VtctlclientProcess.ExecuteCommand("StopSlave", rTablet.Alias) + err = clusterInstance.VtctlclientProcess.ExecuteCommand("StopReplication", rTablet.Alias) require.NoError(t, err) err = clusterInstance.VtctlclientProcess.ExecuteCommand("RunHealthCheck", rTablet.Alias) require.NoError(t, err) diff --git a/go/vt/schema/online_ddl.go b/go/vt/schema/online_ddl.go new file mode 100644 index 00000000000..d9f5e19c7ce --- /dev/null +++ b/go/vt/schema/online_ddl.go @@ -0,0 +1,155 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package schema + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/google/uuid" + + "vitess.io/vitess/go/vt/topo" +) + +var ( + migrationBasePath = "schema-migration" +) + +// MigrationBasePath is the root for all schema migration entries +func MigrationBasePath() string { + return migrationBasePath +} + +// MigrationRequestsPath is the base path for all newly received schema migration requests. +// such requests need to be investigates/reviewed, and to be assigned to all shards +func MigrationRequestsPath() string { + return fmt.Sprintf("%s/requests", MigrationBasePath()) +} + +// MigrationQueuedPath is the base path for schema migrations that have been reviewed and +// queued for execution. Kept for historical reference +func MigrationQueuedPath() string { + return fmt.Sprintf("%s/queued", MigrationBasePath()) +} + +// MigrationJobsKeyspacePath is the base path for a tablet job, by keyspace +func MigrationJobsKeyspacePath(keyspace string) string { + return fmt.Sprintf("%s/jobs/%s", MigrationBasePath(), keyspace) +} + +// MigrationJobsKeyspaceShardPath is the base path for a tablet job, by keyspace and shard +func MigrationJobsKeyspaceShardPath(keyspace, shard string) string { + return fmt.Sprintf("%s/%s", MigrationJobsKeyspacePath(keyspace), shard) +} + +// OnlineDDLStatus is an indicator to a online DDL status +type OnlineDDLStatus string + +const ( + OnlineDDLStatusRequested OnlineDDLStatus = "requested" + OnlineDDLStatusCancelled OnlineDDLStatus = "cancelled" + OnlineDDLStatusQueued OnlineDDLStatus = "queued" + OnlineDDLStatusReady OnlineDDLStatus = "ready" + OnlineDDLStatusRunning OnlineDDLStatus = "running" + OnlineDDLStatusComplete OnlineDDLStatus = "complete" + OnlineDDLStatusFailed OnlineDDLStatus = "failed" +) + +// OnlineDDL encapsulates the relevant information in an online schema change request +type OnlineDDL struct { + Keyspace string `json:"keyspace,omitempty"` + Table string `json:"table,omitempty"` + SQL string `json:"sql,omitempty"` + UUID string `json:"uuid,omitempty"` + Online bool `json:"online,omitempty"` + RequestTime int64 `json:"time_created,omitempty"` + Status OnlineDDLStatus `json:"status,omitempty"` +} + +// FromJSON creates an OnlineDDL from json +func FromJSON(bytes []byte) (*OnlineDDL, error) { + onlineDDL := &OnlineDDL{} + err := json.Unmarshal(bytes, onlineDDL) + return onlineDDL, err +} + +// ReadTopo reads a OnlineDDL object from given topo connection +func ReadTopo(ctx context.Context, conn topo.Conn, entryPath string) (*OnlineDDL, error) { + bytes, _, err := conn.Get(ctx, entryPath) + if err != nil { + return nil, fmt.Errorf("ReadTopo Get %s error: %s", entryPath, err.Error()) + } + onlineDDL, err := FromJSON(bytes) + if err != nil { + return nil, fmt.Errorf("ReadTopo unmarshal %s error: %s", entryPath, err.Error()) + } + return onlineDDL, nil +} + +// NewOnlineDDL creates a schema change request with self generated UUID and RequestTime +func NewOnlineDDL(keyspace string, table string, sql string) (*OnlineDDL, error) { + uuid, err := CreateUUID() + if err != nil { + return nil, err + } + return &OnlineDDL{ + Keyspace: keyspace, + Table: table, + SQL: sql, + UUID: uuid, + Online: true, + RequestTime: time.Now().UnixNano(), + Status: OnlineDDLStatusRequested, + }, nil +} + +// JobsKeyspaceShardPath returns job/// +func (onlineDDL *OnlineDDL) JobsKeyspaceShardPath(shard string) string { + return MigrationJobsKeyspaceShardPath(onlineDDL.Keyspace, shard) +} + +// ToJSON exports this onlineDDL to JSON +func (onlineDDL *OnlineDDL) ToJSON() ([]byte, error) { + return json.Marshal(onlineDDL) +} + +// WriteTopo writes this online DDL to given topo connection, based on basePath and and this DDL's UUID +func (onlineDDL *OnlineDDL) WriteTopo(ctx context.Context, conn topo.Conn, basePath string) error { + if onlineDDL.UUID == "" { + return fmt.Errorf("onlineDDL UUID not found; keyspace=%s, sql=%s", onlineDDL.Keyspace, onlineDDL.SQL) + } + bytes, err := onlineDDL.ToJSON() + if err != nil { + return fmt.Errorf("onlineDDL marshall error:%s, keyspace=%s, sql=%s", err.Error(), onlineDDL.Keyspace, onlineDDL.SQL) + } + _, err = conn.Create(ctx, fmt.Sprintf("%s/%s", basePath, onlineDDL.UUID), bytes) + if err != nil { + return fmt.Errorf("onlineDDL topo create error:%s, keyspace=%s, sql=%s", err.Error(), onlineDDL.Keyspace, onlineDDL.SQL) + } + return nil +} + +// CreateUUID creates a globally unique ID, returned as string +func CreateUUID() (string, error) { + u, err := uuid.NewUUID() + if err != nil { + return "", err + } + return u.String(), nil +} diff --git a/go/vt/schemamanager/tablet_executor.go b/go/vt/schemamanager/tablet_executor.go index 670b5c51b45..9d7332ec0bb 100644 --- a/go/vt/schemamanager/tablet_executor.go +++ b/go/vt/schemamanager/tablet_executor.go @@ -24,7 +24,9 @@ import ( "golang.org/x/net/context" "vitess.io/vitess/go/sync2" + "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/wrangler" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -36,6 +38,7 @@ type TabletExecutor struct { tablets []*topodatapb.Tablet isClosed bool allowBigSchemaChange bool + onlineSchemaChange bool keyspace string waitReplicasTimeout time.Duration } @@ -46,6 +49,7 @@ func NewTabletExecutor(wr *wrangler.Wrangler, waitReplicasTimeout time.Duration) wr: wr, isClosed: true, allowBigSchemaChange: false, + onlineSchemaChange: false, waitReplicasTimeout: waitReplicasTimeout, } } @@ -62,6 +66,11 @@ func (exec *TabletExecutor) DisallowBigSchemaChange() { exec.allowBigSchemaChange = false } +// SetOnlineSchemaChange sets TabletExecutor such that it initiates online schema change migrations +func (exec *TabletExecutor) SetOnlineSchemaChange() { + exec.onlineSchemaChange = true +} + // Open opens a connection to the master for every shard. func (exec *TabletExecutor) Open(ctx context.Context, keyspace string) error { if !exec.isClosed { @@ -160,6 +169,11 @@ func (exec *TabletExecutor) detectBigSchemaChanges(ctx context.Context, parsedDD switch ddl.Action { case sqlparser.DropStr, sqlparser.CreateStr, sqlparser.TruncateStr, sqlparser.RenameStr: continue + case sqlparser.AlterStr: + if exec.onlineSchemaChange { + // Seeing that we intend to run an online-schema-change, we can skip the "big change" check. + continue + } } tableName := ddl.Table.Name.String() if rowCount, ok := tableWithCount[tableName]; ok { @@ -217,7 +231,21 @@ func (exec *TabletExecutor) Execute(ctx context.Context, sqls []string) *Execute for index, sql := range sqls { execResult.CurSQLIndex = index - exec.executeOnAllTablets(ctx, &execResult, sql) + + stat, err := sqlparser.Parse(sql) + if err != nil { + execResult.ExecutorErr = err.Error() + return &execResult + } + executeOnlineSchemaChange := false + switch ddl := stat.(type) { + case *sqlparser.DDL: + if ddl.Action == sqlparser.AlterStr && exec.onlineSchemaChange { + executeOnlineSchemaChange = true + } + } + exec.wr.Logger().Infof("Received DDL request. online schema change = %t", executeOnlineSchemaChange) + exec.executeOnAllTablets(ctx, &execResult, sql, executeOnlineSchemaChange) if len(execResult.FailedShards) > 0 { break } @@ -225,7 +253,28 @@ func (exec *TabletExecutor) Execute(ctx context.Context, sqls []string) *Execute return &execResult } -func (exec *TabletExecutor) executeOnAllTablets(ctx context.Context, execResult *ExecuteResult, sql string) { +func (exec *TabletExecutor) executeOnAllTablets( + ctx context.Context, execResult *ExecuteResult, sql string, + executeOnlineSchemaChange bool, +) { + if executeOnlineSchemaChange { + change, err := schema.NewOnlineDDL(exec.keyspace, "", sql) + if err != nil { + execResult.ExecutorErr = err.Error() + return + } + conn, err := exec.wr.TopoServer().ConnForCell(ctx, topo.GlobalCell) + if err != nil { + execResult.ExecutorErr = fmt.Sprintf("online-schema-change ConnForCell error:%s", err.Error()) + return + } + err = change.WriteTopo(ctx, conn, schema.MigrationRequestsPath()) + if err != nil { + execResult.ExecutorErr = err.Error() + } + return + } + var wg sync.WaitGroup numOfMasterTablets := len(exec.tablets) wg.Add(numOfMasterTablets) diff --git a/go/vt/sqlparser/analyzer.go b/go/vt/sqlparser/analyzer.go index 03b7d196b1b..081bc2e5655 100644 --- a/go/vt/sqlparser/analyzer.go +++ b/go/vt/sqlparser/analyzer.go @@ -259,6 +259,17 @@ func IsVschemaDDL(ddl *DDL) bool { return false } +//IsOnlineSchemaDDL returns true if the query is an online schema change DDL +func IsOnlineSchemaDDL(ddl *DDL, sql string) bool { + switch ddl.Action { + case AlterStr: + // TODO(shlomi): rmeove hard coded 'true', or consider how to hint to vtgate that ALTER TABLE is "online" + return true + // return strings.Contains(sql, VitessOnlineDDLHint) + } + return false +} + // SplitAndExpression breaks up the Expr into AND-separated conditions // and appends them to filters. Outer parenthesis are removed. Precedence // should be taken into account if expressions are recombined. diff --git a/go/vt/sqlparser/constants.go b/go/vt/sqlparser/constants.go index 8f36732aa06..47ec7feb8b2 100644 --- a/go/vt/sqlparser/constants.go +++ b/go/vt/sqlparser/constants.go @@ -172,4 +172,9 @@ const ( VitessStr = "vitess" TraditionalStr = "traditional" AnalyzeStr = "analyze" + + // Vitess magic hints + // TODO(shlomi) this is a temporary hack; we should be able to parse hints in a formal way, + // but at this time I'm not keen to tamper with the parser + VitessOnlineDDLHint = "vt:online-schema-change" ) diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 0be416ff661..2a2d8fe5556 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -403,8 +403,8 @@ var commands = []commandGroup{ "[-exclude_tables=''] [-include-views] [-skip-no-master] ", "Validates that the master schema from shard 0 matches the schema on all of the other tablets in the keyspace."}, {"ApplySchema", commandApplySchema, - "[-allow_long_unavailability] [-wait_replicas_timeout=10s] {-sql= || -sql-file=} ", - "Applies the schema change to the specified keyspace on every master, running in parallel on all shards. The changes are then propagated to replicas via replication. If -allow_long_unavailability is set, schema changes affecting a large number of rows (and possibly incurring a longer period of unavailability) will not be rejected."}, + "[-allow_long_unavailability] [-online_schema_change] [-wait_replicas_timeout=10s] {-sql= || -sql-file=} ", + "Applies the schema change to the specified keyspace on every master, running in parallel on all shards. The changes are then propagated to replicas via replication. If -allow_long_unavailability is set, schema changes affecting a large number of rows (and possibly incurring a longer period of unavailability) will not be rejected. If -online_schema_change is set, vitess will run an online, nonblocking migration"}, {"CopySchemaShard", commandCopySchemaShard, "[-tables=,,...] [-exclude_tables=,,...] [-include-views] [-skip-verify] [-wait_replicas_timeout=10s] { || } ", "Copies the schema from a source shard's master (or a specific tablet) to a destination shard. The schema is applied directly on the master of the destination shard, and it is propagated to the replicas through binlogs."}, @@ -2394,6 +2394,7 @@ func commandValidateSchemaKeyspace(ctx context.Context, wr *wrangler.Wrangler, s func commandApplySchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { allowLongUnavailability := subFlags.Bool("allow_long_unavailability", false, "Allow large schema changes which incur a longer unavailability of the database.") + onlineSchemaChange := subFlags.Bool("online_schema_change", false, "Run an online, nonblocking migration") sql := subFlags.String("sql", "", "A list of semicolon-delimited SQL commands") sqlFile := subFlags.String("sql-file", "", "Identifies the file that contains the SQL commands") // for backwards compatibility @@ -2419,6 +2420,9 @@ func commandApplySchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *fl if *allowLongUnavailability { executor.AllowBigSchemaChange() } + if *onlineSchemaChange { + executor.SetOnlineSchemaChange() + } return schemamanager.Run( ctx, schemamanager.NewPlainController(change, keyspace), diff --git a/go/vt/vtctld/api.go b/go/vt/vtctld/api.go index ea0c31015dd..f9160677b30 100644 --- a/go/vt/vtctld/api.go +++ b/go/vt/vtctld/api.go @@ -571,6 +571,7 @@ func initAPI(ctx context.Context, ts *topo.Server, actions *ActionRepository, re req := struct { Keyspace, SQL string ReplicaTimeoutSeconds int + OnlineSchemaChange bool }{} if err := unmarshalRequest(r, &req); err != nil { return fmt.Errorf("can't unmarshal request: %v", err) @@ -585,7 +586,12 @@ func initAPI(ctx context.Context, ts *topo.Server, actions *ActionRepository, re wr := wrangler.New(logger, ts, tmClient) executor := schemamanager.NewTabletExecutor( - wr, time.Duration(req.ReplicaTimeoutSeconds)*time.Second) + wr, time.Duration(req.ReplicaTimeoutSeconds)*time.Second, + ) + + if req.OnlineSchemaChange { + executor.SetOnlineSchemaChange() + } return schemamanager.Run(ctx, schemamanager.NewUIController(req.SQL, req.Keyspace, w), executor) diff --git a/go/vt/vtctld/schema.go b/go/vt/vtctld/schema.go new file mode 100644 index 00000000000..cc1418bd3be --- /dev/null +++ b/go/vt/vtctld/schema.go @@ -0,0 +1,119 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vtctld + +import ( + "fmt" + "time" + + "golang.org/x/net/context" + + "vitess.io/vitess/go/timer" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/schema" + "vitess.io/vitess/go/vt/servenv" + "vitess.io/vitess/go/vt/topo" +) + +var ( + migrationCheckInterval = time.Second * 10 +) + +var migrationCheckTicks *timer.Timer + +func initSchemaManager(ts *topo.Server) { + migrationCheckTicks = timer.NewTimer(migrationCheckInterval) + + runMigrationRequestChecks(ts) +} + +func runMigrationRequestChecks(ts *topo.Server) { + ctx, cancel := context.WithCancel(context.Background()) + migrationCheckTicks.Start(func() { onMigrationCheckTick(ctx, ts) }) + + go func() { + <-ctx.Done() + migrationCheckTicks.Stop() + }() + + // Running cancel on OnTermSync will cancel the context of any + // running workflow inside vtctld. They may still checkpoint + // if they want to. + servenv.OnTermSync(cancel) +} + +func reviewMigrationRequest(ctx context.Context, ts *topo.Server, conn topo.Conn, uuid string) error { + entryPath := fmt.Sprintf("%s/%s", schema.MigrationRequestsPath(), uuid) + onlineDDL, err := schema.ReadTopo(ctx, conn, entryPath) + if err != nil { + return err + } + log.Infof("Found schema migration request: %+v", onlineDDL) + + onlineDDL.Status = schema.OnlineDDLStatusQueued + + shardNames, err := ts.GetShardNames(ctx, onlineDDL.Keyspace) + if err != nil { + return fmt.Errorf("unable to get shard names for keyspace: %s, error: %v", onlineDDL.Keyspace, err) + } + for _, shardName := range shardNames { + if err := onlineDDL.WriteTopo(ctx, conn, onlineDDL.JobsKeyspaceShardPath(shardName)); err != nil { + return fmt.Errorf("unable to write shard job %+v error: %s", onlineDDL, err.Error()) + } + } + + if err := onlineDDL.WriteTopo(ctx, conn, schema.MigrationQueuedPath()); err != nil { + return fmt.Errorf("unable to write reviewed migration: %+v, error: %s", onlineDDL, err) + } + if err := conn.Delete(ctx, entryPath, nil); err != nil { + return fmt.Errorf("unable to delete %+v, error: %s", entryPath, err) + } + return nil +} + +func reviewMigrationRequests(ctx context.Context, ts *topo.Server, conn topo.Conn) error { + entries, err := conn.ListDir(ctx, schema.MigrationRequestsPath(), true) + if err != nil { + log.Errorf("vtctld.reviewMigrationRequests listDir error: %s", err.Error()) + return err + } + + for _, entry := range entries { + if err := reviewMigrationRequest(ctx, ts, conn, entry.Name); err != nil { + log.Errorf("vtctld.reviewMigrationRequest %s error: %s", entry.Name, err.Error()) + continue + } + } + return nil +} + +func onMigrationCheckTick(ctx context.Context, ts *topo.Server) { + conn, err := ts.ConnForCell(ctx, topo.GlobalCell) + if err != nil { + log.Errorf("Executor.checkMigrations ConnForCell error: %s", err.Error()) + return + } + + lockDescriptor, err := conn.Lock(ctx, schema.MigrationRequestsPath(), "cvtctld.checkMigrationRequests") + if err != nil { + log.Errorf("Executor.checkMigrations ConnForCell error: %s", err.Error()) + return + } + defer lockDescriptor.Unlock(ctx) + + reviewMigrationRequests(ctx, ts, conn) +} diff --git a/go/vt/vtctld/vtctld.go b/go/vt/vtctld/vtctld.go index 7dde2434273..b0ebe52bd18 100644 --- a/go/vt/vtctld/vtctld.go +++ b/go/vt/vtctld/vtctld.go @@ -179,6 +179,9 @@ func InitVtctld(ts *topo.Server) { // Init workflow manager. initWorkflowManager(ts) + // Init online DDL schema manager + initSchemaManager(ts) + // Setup reverse proxy for all vttablets through /vttablet/. initVTTabletRedirection(ts) } diff --git a/go/vt/vtgate/engine/fake_vcursor_test.go b/go/vt/vtgate/engine/fake_vcursor_test.go index 9c886354fb8..0b9712a1e8b 100644 --- a/go/vt/vtgate/engine/fake_vcursor_test.go +++ b/go/vt/vtgate/engine/fake_vcursor_test.go @@ -37,6 +37,7 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/key" + "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/srvtopo" querypb "vitess.io/vitess/go/vt/proto/query" @@ -140,6 +141,10 @@ func (t noopVCursor) ResolveDestinations(keyspace string, ids []*querypb.Value, panic("unimplemented") } +func (t noopVCursor) SubmitOnlineDDL(change *schema.OnlineDDL) error { + panic("unimplemented") +} + var _ VCursor = (*loggingVCursor)(nil) var _ SessionActions = (*loggingVCursor)(nil) @@ -248,6 +253,10 @@ func (f *loggingVCursor) AutocommitApproval() bool { return true } +func (f *loggingVCursor) SubmitOnlineDDL(change *schema.OnlineDDL) error { + return nil +} + func (f *loggingVCursor) ExecuteStandalone(query string, bindvars map[string]*querypb.BindVariable, rs *srvtopo.ResolvedShard) (*sqltypes.Result, error) { f.log = append(f.log, fmt.Sprintf("ExecuteStandalone %s %v %s %s", query, printBindVars(bindvars), rs.Target.Keyspace, rs.Target.Shard)) return f.nextResult() diff --git a/go/vt/vtgate/engine/online_ddl.go b/go/vt/vtgate/engine/online_ddl.go new file mode 100644 index 00000000000..bbb4ce2bf6f --- /dev/null +++ b/go/vt/vtgate/engine/online_ddl.go @@ -0,0 +1,104 @@ +/* +Copyright 2020 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package engine + +import ( + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/proto/query" + querypb "vitess.io/vitess/go/vt/proto/query" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/schema" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/vindexes" +) + +var _ Primitive = (*OnlineDDL)(nil) + +//OnlineDDL represents the instructions to perform an onlins chema change via vtctld +type OnlineDDL struct { + Keyspace *vindexes.Keyspace + DDL *sqlparser.DDL + SQL string + + noTxNeeded + + noInputs +} + +func (v *OnlineDDL) description() PrimitiveDescription { + return PrimitiveDescription{ + OperatorType: "OnlineDDL", + Keyspace: v.Keyspace, + Other: map[string]interface{}{ + "query": sqlparser.String(v.DDL), + }, + } +} + +//RouteType implements the Primitive interface +func (v *OnlineDDL) RouteType() string { + return "OnlineDDL" +} + +//GetKeyspaceName implements the Primitive interface +func (v *OnlineDDL) GetKeyspaceName() string { + return v.Keyspace.Name +} + +//GetTableName implements the Primitive interface +func (v *OnlineDDL) GetTableName() string { + return v.DDL.Table.Name.String() +} + +//Execute implements the Primitive interface +func (v *OnlineDDL) Execute(vcursor VCursor, bindVars map[string]*query.BindVariable, wantfields bool) (result *sqltypes.Result, err error) { + onlineDDL, err := schema.NewOnlineDDL(v.GetKeyspaceName(), v.GetTableName(), v.SQL) + if err != nil { + return result, err + } + err = vcursor.SubmitOnlineDDL(onlineDDL) + if err != nil { + return result, err + } + + result = &sqltypes.Result{ + Fields: []*querypb.Field{ + { + Name: "uuid", + Type: sqltypes.VarChar, + }, + }, + Rows: [][]sqltypes.Value{ + { + sqltypes.NewVarBinary(onlineDDL.UUID), + }, + }, + RowsAffected: 1, + } + return result, err +} + +//StreamExecute implements the Primitive interface +func (v *OnlineDDL) StreamExecute(vcursor VCursor, bindVars map[string]*query.BindVariable, wantields bool, callback func(*sqltypes.Result) error) error { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "not reachable") // TODO: shlomi - have no idea if this should work +} + +//GetFields implements the Primitive interface +func (v *OnlineDDL) GetFields(vcursor VCursor, bindVars map[string]*query.BindVariable) (*sqltypes.Result, error) { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "not reachable") // TODO: shlomi - have no idea if this should work +} diff --git a/go/vt/vtgate/engine/primitive.go b/go/vt/vtgate/engine/primitive.go index db31ea39b12..8d78d4f9cf6 100644 --- a/go/vt/vtgate/engine/primitive.go +++ b/go/vt/vtgate/engine/primitive.go @@ -29,6 +29,7 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/key" + "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/srvtopo" querypb "vitess.io/vitess/go/vt/proto/query" @@ -83,6 +84,8 @@ type ( ExecuteVSchema(keyspace string, vschemaDDL *sqlparser.DDL) error + SubmitOnlineDDL(change *schema.OnlineDDL) error + Session() SessionActions } diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 780414b0c43..153355a1899 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -1088,7 +1088,7 @@ func (e *Executor) StreamExecute(ctx context.Context, method string, safeSession bindVars = make(map[string]*querypb.BindVariable) } query, comments := sqlparser.SplitMarginComments(sql) - vcursor, _ := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver) + vcursor, _ := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv) vcursor.SetIgnoreMaxMemoryRows(true) switch stmtType { @@ -1577,7 +1577,7 @@ func (e *Executor) prepare(ctx context.Context, safeSession *SafeSession, sql st func (e *Executor) handlePrepare(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, logStats *LogStats) ([]*querypb.Field, error) { // V3 mode. query, comments := sqlparser.SplitMarginComments(sql) - vcursor, _ := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver) + vcursor, _ := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv) plan, err := e.getPlan( vcursor, query, diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index 7deb4e1534f..3f47e6f3a56 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -1469,8 +1469,8 @@ func TestVSchemaStats(t *testing.T) { func TestGetPlanUnnormalized(t *testing.T) { r, _, _, _ := createLegacyExecutorEnv() - emptyvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver) - unshardedvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver) + emptyvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil) + unshardedvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil) logStats1 := NewLogStats(ctx, "Test", "", nil) query1 := "select * from music_user_map where id = 1" @@ -1528,7 +1528,7 @@ func TestGetPlanUnnormalized(t *testing.T) { func TestGetPlanCacheUnnormalized(t *testing.T) { r, _, _, _ := createLegacyExecutorEnv() - emptyvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver) + emptyvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil) query1 := "select * from music_user_map where id = 1" logStats1 := NewLogStats(ctx, "Test", "", nil) _, err := r.getPlan(emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, true /* skipQueryPlanCache */, logStats1) @@ -1553,7 +1553,7 @@ func TestGetPlanCacheUnnormalized(t *testing.T) { // Skip cache using directive r, _, _, _ = createLegacyExecutorEnv() - unshardedvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver) + unshardedvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil) query1 = "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)" logStats1 = NewLogStats(ctx, "Test", "", nil) @@ -1571,7 +1571,7 @@ func TestGetPlanCacheUnnormalized(t *testing.T) { } // the target string will be resolved and become part of the plan cache key, which adds a new entry - ksIDVc1, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver) + ksIDVc1, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil) _, err = r.getPlan(ksIDVc1, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false, logStats1) require.NoError(t, err) if len(r.plans.Keys()) != 2 { @@ -1579,7 +1579,7 @@ func TestGetPlanCacheUnnormalized(t *testing.T) { } // the target string will be resolved and become part of the plan cache key, as it's an unsharded ks, it will be the same entry as above - ksIDVc2, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver) + ksIDVc2, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil) _, err = r.getPlan(ksIDVc2, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false, logStats1) require.NoError(t, err) if len(r.plans.Keys()) != 2 { @@ -1590,7 +1590,7 @@ func TestGetPlanCacheUnnormalized(t *testing.T) { func TestGetPlanCacheNormalized(t *testing.T) { r, _, _, _ := createLegacyExecutorEnv() r.normalize = true - emptyvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver) + emptyvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil) query1 := "select * from music_user_map where id = 1" logStats1 := NewLogStats(ctx, "Test", "", nil) _, err := r.getPlan(emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, true /* skipQueryPlanCache */, logStats1) @@ -1615,7 +1615,7 @@ func TestGetPlanCacheNormalized(t *testing.T) { // Skip cache using directive r, _, _, _ = createLegacyExecutorEnv() r.normalize = true - unshardedvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver) + unshardedvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil) query1 = "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)" logStats1 = NewLogStats(ctx, "Test", "", nil) @@ -1633,7 +1633,7 @@ func TestGetPlanCacheNormalized(t *testing.T) { } // the target string will be resolved and become part of the plan cache key, which adds a new entry - ksIDVc1, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver) + ksIDVc1, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil) _, err = r.getPlan(ksIDVc1, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false, logStats1) require.NoError(t, err) if len(r.plans.Keys()) != 2 { @@ -1641,7 +1641,7 @@ func TestGetPlanCacheNormalized(t *testing.T) { } // the target string will be resolved and become part of the plan cache key, as it's an unsharded ks, it will be the same entry as above - ksIDVc2, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver) + ksIDVc2, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil) _, err = r.getPlan(ksIDVc2, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false, logStats1) require.NoError(t, err) if len(r.plans.Keys()) != 2 { @@ -1652,8 +1652,8 @@ func TestGetPlanCacheNormalized(t *testing.T) { func TestGetPlanNormalized(t *testing.T) { r, _, _, _ := createLegacyExecutorEnv() r.normalize = true - emptyvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver) - unshardedvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver) + emptyvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil) + unshardedvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil) query1 := "select * from music_user_map where id = 1" query2 := "select * from music_user_map where id = 2" diff --git a/go/vt/vtgate/plan_execute.go b/go/vt/vtgate/plan_execute.go index f2180a68980..caf56d8d1d1 100644 --- a/go/vt/vtgate/plan_execute.go +++ b/go/vt/vtgate/plan_execute.go @@ -45,7 +45,7 @@ func (e *Executor) newExecute(ctx context.Context, safeSession *SafeSession, sql } query, comments := sqlparser.SplitMarginComments(sql) - vcursor, err := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver) + vcursor, err := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv) if err != nil { return 0, nil, err } diff --git a/go/vt/vtgate/planbuilder/builder.go b/go/vt/vtgate/planbuilder/builder.go index 6b73b22be82..1c102321e6c 100644 --- a/go/vt/vtgate/planbuilder/builder.go +++ b/go/vt/vtgate/planbuilder/builder.go @@ -317,6 +317,9 @@ func createInstructionFor(query string, stmt sqlparser.Statement, vschema Contex if sqlparser.IsVschemaDDL(stmt) { return buildVSchemaDDLPlan(stmt, vschema) } + if sqlparser.IsOnlineSchemaDDL(stmt, query) { + return buildOnlineDDLPlan(query, stmt, vschema) + } return buildDDLPlan(query, stmt, vschema) case *sqlparser.Use: return buildUsePlan(stmt, vschema) diff --git a/go/vt/vtgate/planbuilder/ddl.go b/go/vt/vtgate/planbuilder/ddl.go index b701efb25c4..9392c4c8099 100644 --- a/go/vt/vtgate/planbuilder/ddl.go +++ b/go/vt/vtgate/planbuilder/ddl.go @@ -27,6 +27,18 @@ func buildDDLPlan(sql string, in sqlparser.Statement, vschema ContextVSchema) (e }, nil } +func buildOnlineDDLPlan(query string, stmt *sqlparser.DDL, vschema ContextVSchema) (engine.Primitive, error) { + _, keyspace, _, err := vschema.TargetDestination(stmt.Table.Qualifier.String()) + if err != nil { + return nil, err + } + return &engine.OnlineDDL{ + Keyspace: keyspace, + DDL: stmt, + SQL: query, + }, nil +} + func buildVSchemaDDLPlan(stmt *sqlparser.DDL, vschema ContextVSchema) (engine.Primitive, error) { _, keyspace, _, err := vschema.TargetDestination(stmt.Table.Qualifier.String()) if err != nil { diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index c90b3b913dd..340101e5fe6 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -29,6 +29,7 @@ import ( "vitess.io/vitess/go/vt/callerid" vschemapb "vitess.io/vitess/go/vt/proto/vschema" + "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vtgate/vschemaacl" @@ -40,6 +41,7 @@ import ( "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/srvtopo" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/engine" "vitess.io/vitess/go/vt/vtgate/vindexes" @@ -83,6 +85,7 @@ type vcursorImpl struct { marginComments sqlparser.MarginComments executor iExecute resolver *srvtopo.Resolver + topoServer *topo.Server logStats *LogStats // rollbackOnPartialExec is set to true if any DML was successfully // executed. If there was a subsequent failure, the transaction @@ -134,7 +137,7 @@ func (vc *vcursorImpl) ExecuteVSchema(keyspace string, vschemaDDL *sqlparser.DDL // the query and supply it here. Trailing comments are typically sent by the application for various reasons, // including as identifying markers. So, they have to be added back to all queries that are executed // on behalf of the original query. -func newVCursorImpl(ctx context.Context, safeSession *SafeSession, marginComments sqlparser.MarginComments, executor *Executor, logStats *LogStats, vm VSchemaOperator, vschema *vindexes.VSchema, resolver *srvtopo.Resolver) (*vcursorImpl, error) { +func newVCursorImpl(ctx context.Context, safeSession *SafeSession, marginComments sqlparser.MarginComments, executor *Executor, logStats *LogStats, vm VSchemaOperator, vschema *vindexes.VSchema, resolver *srvtopo.Resolver, serv srvtopo.Server) (*vcursorImpl, error) { keyspace, tabletType, destination, err := parseDestinationTarget(safeSession.TargetString, vschema) if err != nil { return nil, err @@ -144,6 +147,13 @@ func newVCursorImpl(ctx context.Context, safeSession *SafeSession, marginComment if UsingLegacyGateway() && safeSession.InTransaction() && tabletType != topodatapb.TabletType_MASTER { return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "newVCursorImpl: transactions are supported only for master tablet types, current type: %v", tabletType) } + var ts *topo.Server + if serv != nil { + ts, err = serv.GetTopoServer() + if err != nil { + return nil, err + } + } return &vcursorImpl{ ctx: ctx, @@ -157,6 +167,7 @@ func newVCursorImpl(ctx context.Context, safeSession *SafeSession, marginComment resolver: resolver, vschema: vschema, vm: vm, + topoServer: ts, }, nil } @@ -437,6 +448,16 @@ func (vc *vcursorImpl) TabletType() topodatapb.TabletType { return vc.tabletType } +// SubmitOnlineDDL implements the VCursor interface +func (vc *vcursorImpl) SubmitOnlineDDL(change *schema.OnlineDDL) error { + conn, err := vc.topoServer.ConnForCell(vc.ctx, topo.GlobalCell) + if err != nil { + return err + } + // Submit an online schema change by writing a migration request in topo + return change.WriteTopo(vc.ctx, conn, schema.MigrationRequestsPath()) +} + func commentedShardQueries(shardQueries []*querypb.BoundQuery, marginComments sqlparser.MarginComments) []*querypb.BoundQuery { if marginComments.Leading == "" && marginComments.Trailing == "" { return shardQueries diff --git a/go/vt/vtgate/vcursor_impl_test.go b/go/vt/vtgate/vcursor_impl_test.go index dd956f6b517..628600f34f7 100644 --- a/go/vt/vtgate/vcursor_impl_test.go +++ b/go/vt/vtgate/vcursor_impl_test.go @@ -181,7 +181,7 @@ func TestDestinationKeyspace(t *testing.T) { }} for _, tc := range tests { - impl, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: tc.targetString}), sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil) + impl, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: tc.targetString}), sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil, nil) impl.vschema = tc.vschema dest, keyspace, tabletType, err := impl.TargetDestination(tc.qualifier) if tc.expectedError == "" { @@ -238,7 +238,7 @@ func TestSetTarget(t *testing.T) { for i, tc := range tests { t.Run(string(i)+"#"+tc.targetString, func(t *testing.T) { - vc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{InTransaction: true}), sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil) + vc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{InTransaction: true}), sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil, nil) vc.vschema = tc.vschema err := vc.SetTarget(tc.targetString) if tc.expectedError == "" { @@ -280,7 +280,7 @@ func TestPlanPrefixKey(t *testing.T) { t.Run(string(i)+"#"+tc.targetString, func(t *testing.T) { ss := NewSafeSession(&vtgatepb.Session{InTransaction: false}) ss.SetTargetString(tc.targetString) - vc, err := newVCursorImpl(context.Background(), ss, sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, srvtopo.NewResolver(&fakeTopoServer{}, nil, "")) + vc, err := newVCursorImpl(context.Background(), ss, sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, srvtopo.NewResolver(&fakeTopoServer{}, nil, ""), nil) require.NoError(t, err) vc.vschema = tc.vschema require.Equal(t, tc.expectedPlanPrefixKey, vc.planPrefixKey()) @@ -299,7 +299,7 @@ func TestFirstSortedKeyspace(t *testing.T) { ks3Schema.Keyspace.Name: ks3Schema, }} - vc, err := newVCursorImpl(context.Background(), NewSafeSession(nil), sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: vschemaWith2KS}, vschemaWith2KS, srvtopo.NewResolver(&fakeTopoServer{}, nil, "")) + vc, err := newVCursorImpl(context.Background(), NewSafeSession(nil), sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: vschemaWith2KS}, vschemaWith2KS, srvtopo.NewResolver(&fakeTopoServer{}, nil, ""), nil) require.NoError(t, err) ks, err := vc.FirstSortedKeyspace() require.NoError(t, err) diff --git a/go/vt/vttablet/onlineddl/exec.go b/go/vt/vttablet/onlineddl/exec.go new file mode 100644 index 00000000000..3d673295db9 --- /dev/null +++ b/go/vt/vttablet/onlineddl/exec.go @@ -0,0 +1,90 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package onlineddl + +import ( + "crypto/rand" + "crypto/sha256" + "encoding/hex" + "fmt" + "io" + "io/ioutil" + "os/exec" + "path/filepath" + + "vitess.io/vitess/go/vt/log" +) + +// execCmd searches the PATH for a command and runs it, logging the output. +// If input is not nil, pipe it to the command's stdin. +func execCmd(name string, args, env []string, dir string, input io.Reader, output io.Writer) (cmd *exec.Cmd, err error) { + cmdPath, err := exec.LookPath(name) + if err != nil { + return cmd, err + } + log.Infof("execCmd: %v %v %v", name, cmdPath, args) + + cmd = exec.Command(cmdPath, args...) + cmd.Env = env + cmd.Dir = dir + if input != nil { + cmd.Stdin = input + } + if output != nil { + cmd.Stdout = output + cmd.Stderr = output + } + err = cmd.Run() + if err != nil { + err = fmt.Errorf("execCmd failed: %v, %v", name, err) + log.Errorf(err.Error()) + } + log.Infof("execCmd success: %v", name) + return cmd, err +} + +// createTempDir creates a temporary directory and returns its name +func createTempDir(hint string) (dirName string, err error) { + if hint != "" { + return ioutil.TempDir("", fmt.Sprintf("gh-ost-%s-*", hint)) + } + return ioutil.TempDir("", "gh-ost-*") +} + +// createTempScript creates an executable file in given directory and with given text as content. +func createTempScript(dirName, fileName, text string) (fullName string, err error) { + fullName = filepath.Join(dirName, fileName) + bytes := []byte(text) + err = ioutil.WriteFile(fullName, bytes, 0755) + return fullName, err +} + +// RandomHash returns a 64 hex character random string +func RandomHash() string { + size := 64 + rb := make([]byte, size) + _, _ = rand.Read(rb) + + hasher := sha256.New() + hasher.Write(rb) + return hex.EncodeToString(hasher.Sum(nil)) +} + +// ShortRandomHash returns a 8 hex character random string +func ShortRandomHash() string { + return RandomHash()[0:8] +} diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go new file mode 100644 index 00000000000..21d1b4c6aff --- /dev/null +++ b/go/vt/vttablet/onlineddl/executor.go @@ -0,0 +1,623 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package onlineddl + +import ( + "context" + "errors" + "fmt" + "os" + "sync" + "sync/atomic" + "time" + + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/timer" + "vitess.io/vitess/go/vt/dbconnpool" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/schema" + "vitess.io/vitess/go/vt/servenv" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" +) + +var ( + // ErrExecutorNotWritableTablet is generated when executor is asked to run gh-ost on a read-only server + ErrExecutorNotWritableTablet = errors.New("Cannot run gh-ost migration on non-writable tablet") + // ErrExecutorMigrationAlreadyRunning is generated when an attempt is made to run an operation that conflicts with a running migration + ErrExecutorMigrationAlreadyRunning = errors.New("Cannot run gh-ost migration since a migration is already running") +) + +const ( + maxPasswordLength = 32 // MySQL's *replication* password may not exceed 32 characters + ghostUser = "gh-ost" + ghostGrant = "'gh-ost'@'127.0.0.1'" +) + +// Executor wraps and manages the execution of a gh-ost migration. +type Executor struct { + env tabletenv.Env + pool *connpool.Pool + tabletTypeFunc func() topodatapb.TabletType + ts *topo.Server + + keyspace string + shard string + dbName string + + initMutex sync.Mutex + migrationMutex sync.Mutex + migrationRunning int64 + + ticks *timer.Timer + isOpen bool +} + +var ( + migrationCheckInterval = time.Second * 10 +) + +// NewExecutor creates a new gh-ost executor. +func NewExecutor(env tabletenv.Env, ts *topo.Server, tabletTypeFunc func() topodatapb.TabletType) *Executor { + return &Executor{ + env: env, + + pool: connpool.NewPool(env, "ExecutorPool", tabletenv.ConnPoolConfig{ + Size: 1, + IdleTimeoutSeconds: env.Config().OltpReadPool.IdleTimeoutSeconds, + }), + tabletTypeFunc: tabletTypeFunc, + ts: ts, + ticks: timer.NewTimer(migrationCheckInterval), + } +} + +func (e *Executor) execQuery(ctx context.Context, query string) (result *sqltypes.Result, err error) { + defer e.env.LogError() + + conn, err := e.pool.Get(ctx) + if err != nil { + return result, err + } + defer conn.Recycle() + return withDDL.Exec(ctx, query, conn.Exec) +} + +func (e *Executor) initSchema(ctx context.Context) error { + _, err := e.execQuery(ctx, sqlValidationQuery) + return err +} + +// InitDBConfig initializes keysapce +func (e *Executor) InitDBConfig(keyspace, shard, dbName string) { + e.keyspace = keyspace + e.shard = shard + e.dbName = dbName +} + +// Open opens database pool and initializes the schema +func (e *Executor) Open() error { + e.initMutex.Lock() + defer e.initMutex.Unlock() + if e.isOpen { + return nil + } + e.pool.Open(e.env.Config().DB.AppWithDB(), e.env.Config().DB.DbaWithDB(), e.env.Config().DB.AppDebugWithDB()) + e.ticks.Start(e.onMigrationCheckTick) + e.isOpen = true + + return nil +} + +// Close frees resources +func (e *Executor) Close() { + e.initMutex.Lock() + defer e.initMutex.Unlock() + if !e.isOpen { + return + } + + e.ticks.Stop() + e.pool.Close() + e.isOpen = false +} + +func (e *Executor) ghostPanicFlagFileName(onlineDDL *schema.OnlineDDL) string { + return fmt.Sprintf("/tmp/ghost.%s.panic.flag", onlineDDL.UUID) +} + +// readMySQLVariables contacts the backend MySQL server to read some of its configuration +func (e *Executor) readMySQLVariables(ctx context.Context) (host string, port int, readOnly bool, err error) { + conn, err := e.pool.Get(ctx) + if err != nil { + return host, port, readOnly, err + } + defer conn.Recycle() + + tm, err := conn.Exec(ctx, "select @@global.hostname as hostname, @@global.port as port, @@global.read_only as read_only from dual", 1, true) + if err != nil { + return host, port, readOnly, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "could not read MySQL variables: %v", err) + } + row := tm.Named().Row() + if row == nil { + return host, port, readOnly, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "unexpected result for MySQL variables: %+v", tm.Rows) + } + host = row["hostname"].ToString() + if p, err := row.ToInt64("port"); err != nil { + return host, port, readOnly, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "could not parse @@global.port %v: %v", tm, err) + } else { + port = int(p) + } + if readOnly, err = row.ToBool("read_only"); err != nil { + return host, port, readOnly, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "could not parse @@global.read_only %v: %v", tm, err) + } + return host, port, readOnly, nil +} + +// createGhostUser creates a gh-ost user account with all neccessary privileges and with a random password +func (e *Executor) createGhostUser(ctx context.Context) (password string, err error) { + conn, err := dbconnpool.NewDBConnection(ctx, e.env.Config().DB.DbaConnector()) + if err != nil { + return password, err + } + defer conn.Close() + + password = RandomHash()[0:maxPasswordLength] + + for _, query := range sqlCreateGhostUser { + parsed := sqlparser.BuildParsedQuery(query, ghostGrant, password) + if _, err := conn.ExecuteFetch(parsed.Query, 0, false); err != nil { + return password, err + } + } + for _, query := range sqlGrantGhostUser { + parsed := sqlparser.BuildParsedQuery(query, ghostGrant) + if _, err := conn.ExecuteFetch(parsed.Query, 0, false); err != nil { + return password, err + } + } + return password, err +} + +// dropGhostUser drops the given gh-ost user account at the end of migration +func (e *Executor) dropGhostUser(ctx context.Context, user string) error { + conn, err := dbconnpool.NewDBConnection(ctx, e.env.Config().DB.DbaConnector()) + if err != nil { + return err + } + defer conn.Close() + + parsed := sqlparser.BuildParsedQuery(sqlDropGhostUser, user) + _, err = conn.ExecuteFetch(parsed.Query, 0, false) + return err +} + +// Execute validates and runs a gh-ost process. +// Validation included testing the backend MySQL server and the gh-ost binray itself +// Execution runs first a dry run, then an actual migration +func (e *Executor) Execute(ctx context.Context, onlineDDL *schema.OnlineDDL) error { + e.migrationMutex.Lock() + defer e.migrationMutex.Unlock() + + if atomic.LoadInt64(&e.migrationRunning) > 0 { + return ErrExecutorMigrationAlreadyRunning + } + + if e.tabletTypeFunc() != topodatapb.TabletType_MASTER { + return ErrExecutorNotWritableTablet + } + mysqlHost, mysqlPort, readOnly, err := e.readMySQLVariables(ctx) + if err != nil { + log.Errorf("Error before running gh-ost: %+v", err) + return err + } + if readOnly { + err := fmt.Errorf("Error before running gh-ost: MySQL server is read_only") + log.Errorf(err.Error()) + return err + } + ghostPassword, err := e.createGhostUser(ctx) + if err != nil { + err := fmt.Errorf("Error creating gh-ost user: %+v", err) + log.Errorf(err.Error()) + return err + } + tempDir, err := createTempDir(onlineDDL.UUID) + if err != nil { + log.Errorf("Error creating temporary directory: %+v", err) + return err + } + credentialsConfigFileContent := fmt.Sprintf(`[client] +user=%s +password=${GH_OST_PASSWORD} +`, ghostUser) + credentialsConfigFileName, err := createTempScript(tempDir, "gh-ost-conf.cfg", credentialsConfigFileContent) + if err != nil { + log.Errorf("Error creating config file: %+v", err) + return err + } + wrapperScriptContent := fmt.Sprintf(`#!/bin/bash +ghost_log_path="%s" +ghost_log_file=gh-ost.log + +mkdir -p "$ghost_log_path" + +export GH_OST_PASSWORD +echo "executing: gh-ost" "$@" > "$ghost_log_path/$ghost_log_file.exec" +gh-ost "$@" > "$ghost_log_path/$ghost_log_file" 2>&1 + `, tempDir, + ) + wrapperScriptFileName, err := createTempScript(tempDir, "gh-ost-wrapper.sh", wrapperScriptContent) + if err != nil { + log.Errorf("Error creating wrapper script: %+v", err) + return err + } + onHookContent := func(status schema.OnlineDDLStatus) string { + return fmt.Sprintf(`#!/bin/bash +curl -s 'http://localhost:%d/schema-migration/report-status?uuid='"$GH_OST_HOOKS_HINT"'&status=%s&dryrun='"$GH_OST_DRY_RUN" + `, *servenv.Port, string(status)) + } + if _, err := createTempScript(tempDir, "gh-ost-on-startup", onHookContent(schema.OnlineDDLStatusRunning)); err != nil { + log.Errorf("Error creating script: %+v", err) + return err + } + if _, err := createTempScript(tempDir, "gh-ost-on-status", onHookContent(schema.OnlineDDLStatusRunning)); err != nil { + log.Errorf("Error creating script: %+v", err) + return err + } + if _, err := createTempScript(tempDir, "gh-ost-on-success", onHookContent(schema.OnlineDDLStatusComplete)); err != nil { + log.Errorf("Error creating script: %+v", err) + return err + } + if _, err := createTempScript(tempDir, "gh-ost-on-failure", onHookContent(schema.OnlineDDLStatusFailed)); err != nil { + log.Errorf("Error creating script: %+v", err) + return err + } + // Validate gh-ost binary: + log.Infof("Will now validate gh-ost binary") + _, err = execCmd( + "bash", + []string{ + wrapperScriptFileName, + "--version", + }, + os.Environ(), + "/tmp", + nil, + nil, + ) + if err != nil { + log.Errorf("Error testing gh-ost binary: %+v", err) + return err + } + log.Infof("+ OK") + + runGhost := func(execute bool) error { + os.Setenv("GH_OST_PASSWORD", ghostPassword) + _, err := execCmd( + "bash", + []string{ + wrapperScriptFileName, + fmt.Sprintf(`--host=%s`, mysqlHost), + fmt.Sprintf(`--port=%d`, mysqlPort), + fmt.Sprintf(`--conf=%s`, credentialsConfigFileName), // user & password found here + `--allow-on-master`, + `--max-load=Threads_running=100`, + `--critical-load=Threads_running=200`, + `--critical-load-hibernate-seconds=60`, + `--approve-renamed-columns`, + `--debug`, + `--exact-rowcount`, + `--timestamp-old-table`, + `--initially-drop-ghost-table`, + `--default-retries=120`, + fmt.Sprintf("--hooks-path=%s", tempDir), + fmt.Sprintf(`--hooks-hint=%s`, onlineDDL.UUID), + fmt.Sprintf(`--database=%s`, e.dbName), + fmt.Sprintf(`--table=%s`, onlineDDL.Table), + fmt.Sprintf(`--alter=%s`, onlineDDL.SQL), + fmt.Sprintf(`--panic-flag-file=%s`, e.ghostPanicFlagFileName(onlineDDL)), + fmt.Sprintf(`--execute=%t`, execute), + }, + os.Environ(), + "/tmp", + nil, + nil, + ) + return err + } + + atomic.StoreInt64(&e.migrationRunning, 1) + go func() error { + defer atomic.StoreInt64(&e.migrationRunning, 0) + defer e.dropGhostUser(ctx, ghostGrant) + + log.Infof("Will now dry-run gh-ost on: %s:%d", mysqlHost, mysqlPort) + if err := runGhost(false); err != nil { + log.Errorf("Error executing gh-ost dry run: %+v", err) + return err + } + log.Infof("+ OK") + + log.Infof("Will now run gh-ost on: %s:%d", mysqlHost, mysqlPort) + startedMigrations.Add(1) + if err := runGhost(true); err != nil { + failedMigrations.Add(1) + log.Errorf("Error running gh-ost: %+v", err) + return err + } + successfulMigrations.Add(1) + log.Infof("+ OK") + return nil + }() + return nil +} + +// Cancel attempts to abort a running migration by touching the panic flag file +func (e *Executor) Cancel(onlineDDL *schema.OnlineDDL) error { + file, err := os.OpenFile(e.ghostPanicFlagFileName(onlineDDL), os.O_RDONLY|os.O_CREATE, 0666) + if file != nil { + defer file.Close() + } + return err +} + +func (e *Executor) writeMigrationJob(ctx context.Context, onlineDDL *schema.OnlineDDL) error { + parsed := sqlparser.BuildParsedQuery(sqlInsertSchemaMigration, "_vt", + ":migration_uuid", + ":keyspace", + ":shard", + ":mysql_table", + ":migration_statement", + ":strategy", + ":migration_status", + ) + bindVars := map[string]*querypb.BindVariable{ + "migration_uuid": sqltypes.StringBindVariable(onlineDDL.UUID), + "keyspace": sqltypes.StringBindVariable(onlineDDL.Keyspace), + "shard": sqltypes.StringBindVariable(e.shard), + "mysql_table": sqltypes.StringBindVariable(onlineDDL.Table), + "migration_statement": sqltypes.StringBindVariable(onlineDDL.SQL), + "strategy": sqltypes.StringBindVariable(""), + "migration_status": sqltypes.StringBindVariable(string(onlineDDL.Status)), + } + + bound, err := parsed.GenerateQuery(bindVars, nil) + if err != nil { + return err + } + _, err = e.execQuery(ctx, bound) + if err != nil { + return err + } + + return nil +} + +// reviewMigrationJobs reads Topo's listing of migrations for this keyspace/shard, +// and persists them in _vt.schema_migrations. Some of those jobs may be new, some +// perhaps already known, it doesn't matter. +func (e *Executor) reviewMigrationJobs(ctx context.Context) error { + if atomic.LoadInt64(&e.migrationRunning) > 0 { + // Just to save some cycles here. If there's a running migration, skip reading global topo: + // even if global topo has new jobs for us, we wouldn't be able to run them, anyway. + return nil + } + + conn, err := e.ts.ConnForCell(ctx, topo.GlobalCell) + if err != nil { + log.Errorf("Executor.reviewMigrationRequests ConnForCell error: %s", err.Error()) + return err + } + + dirPath := schema.MigrationJobsKeyspaceShardPath(e.keyspace, e.shard) + entries, err := conn.ListDir(ctx, dirPath, false) + if err != nil { + log.Errorf("Executor.reviewMigrationRequests listDir error: %s", err.Error()) + return err + } + for _, entry := range entries { + entryPath := fmt.Sprintf("%s/%s", dirPath, entry.Name) + onlineDDL, err := schema.ReadTopo(ctx, conn, entryPath) + if err != nil { + log.Errorf("reviewMigrationRequests.ReadTopo error: %+v", err) + continue + } + if err := e.writeMigrationJob(ctx, onlineDDL); err != nil { + log.Errorf("reviewMigrationRequests.writeMigrationJob error: %+v", err) + continue + } + log.Infof("Found schema migration job: %+v", onlineDDL) + } + return nil +} + +// scheduleNextMigration attemps to schedule a single migration to run next. +// possibly there's no migrations to run. Possibly there's a migration running right now, +// in which cases nothing happens. +func (e *Executor) scheduleNextMigration(ctx context.Context) error { + e.migrationMutex.Lock() + defer e.migrationMutex.Unlock() + + if atomic.LoadInt64(&e.migrationRunning) > 0 { + return ErrExecutorMigrationAlreadyRunning + } + + { + parsed := sqlparser.BuildParsedQuery(sqlSelectCountReadyMigrations, "_vt") + r, err := e.execQuery(ctx, parsed.Query) + if err != nil { + return err + } + row := r.Named().Row() + countReady, err := row.ToInt64("count_ready") + if err != nil { + return err + } + if countReady > 0 { + // seems like there's already one migration that's good to go + return nil + } + } // Cool, seems like no migration is ready. Let's try and make a single 'queued' migration 'ready' + + parsed := sqlparser.BuildParsedQuery(sqlScheduleSingleMigration, "_vt") + _, err := e.execQuery(ctx, parsed.Query) + + return err +} + +func (e *Executor) runNextMigration(ctx context.Context) error { + e.migrationMutex.Lock() + defer e.migrationMutex.Unlock() + + if atomic.LoadInt64(&e.migrationRunning) > 0 { + return ErrExecutorMigrationAlreadyRunning + } + + parsed := sqlparser.BuildParsedQuery(sqlSelectReadyMigration, "_vt") + r, err := e.execQuery(ctx, parsed.Query) + if err != nil { + return err + } + named := r.Named() + for _, row := range named.Rows { + onlineDDL := &schema.OnlineDDL{ + Keyspace: row["keyspace"].ToString(), + Table: row["mysql_table"].ToString(), + SQL: row["migration_statement"].ToString(), + UUID: row["migration_uuid"].ToString(), + Online: true, + Status: schema.OnlineDDLStatus(row["migration_status"].ToString()), + } + go e.Execute(ctx, onlineDDL) + // the query should only ever return a single row at the most + // but let's make it also explicit here that we only run a single migration + return nil + } + + return nil +} + +func (e *Executor) onMigrationCheckTick() { + if e.tabletTypeFunc() != topodatapb.TabletType_MASTER { + return + } + if e.keyspace == "" { + log.Errorf("Executor.onMigrationCheckTick(): empty keyspace") + return + } + ctx := context.Background() + e.initSchema(ctx) + + e.reviewMigrationJobs(ctx) + e.scheduleNextMigration(ctx) + e.runNextMigration(ctx) +} + +func (e *Executor) updateMigrationStartedTimestamp(ctx context.Context, uuid string) error { + parsed := sqlparser.BuildParsedQuery(sqlUpdateMigrationStartedTimestamp, "_vt", + ":migration_uuid", + ) + bindVars := map[string]*querypb.BindVariable{ + "migration_uuid": sqltypes.StringBindVariable(uuid), + } + bound, err := parsed.GenerateQuery(bindVars, nil) + if err != nil { + return err + } + _, err = e.execQuery(ctx, bound) + return err +} + +func (e *Executor) updateMigrationTimestamp(ctx context.Context, timestampColumn string, uuid string) error { + parsed := sqlparser.BuildParsedQuery(sqlUpdateMigrationTimestamp, "_vt", timestampColumn, + ":migration_uuid", + ) + bindVars := map[string]*querypb.BindVariable{ + "migration_uuid": sqltypes.StringBindVariable(uuid), + } + bound, err := parsed.GenerateQuery(bindVars, nil) + if err != nil { + return err + } + _, err = e.execQuery(ctx, bound) + return err +} + +func (e *Executor) updateMigrationStatus(ctx context.Context, uuid string, status schema.OnlineDDLStatus) error { + parsed := sqlparser.BuildParsedQuery(sqlUpdateMigrationStatus, "_vt", + ":migration_status", + ":migration_uuid", + ) + bindVars := map[string]*querypb.BindVariable{ + "migration_status": sqltypes.StringBindVariable(string(status)), + "migration_uuid": sqltypes.StringBindVariable(uuid), + } + bound, err := parsed.GenerateQuery(bindVars, nil) + if err != nil { + return err + } + _, err = e.execQuery(ctx, bound) + return err +} + +// OnSchemaMigrationStatus is called by TabletServer's API, which is invoked by a running gh-ost migration's hooks. +func (e *Executor) OnSchemaMigrationStatus(ctx context.Context, uuidParam, statusParam, dryrunParam string) (err error) { + status := schema.OnlineDDLStatus(statusParam) + dryRun := (dryrunParam == "true") + + if dryRun && status != schema.OnlineDDLStatusFailed { + // We don't consider dry-run reports unless there's a failure + return nil + } + switch status { + case schema.OnlineDDLStatusReady: + { + err = e.updateMigrationTimestamp(ctx, "ready_timestamp", uuidParam) + } + case schema.OnlineDDLStatusRunning: + { + _ = e.updateMigrationStartedTimestamp(ctx, uuidParam) + err = e.updateMigrationTimestamp(ctx, "liveness_timestamp", uuidParam) + } + case schema.OnlineDDLStatusComplete: + { + _ = e.updateMigrationStartedTimestamp(ctx, uuidParam) + err = e.updateMigrationTimestamp(ctx, "completed_timestamp", uuidParam) + } + case schema.OnlineDDLStatusFailed: + { + _ = e.updateMigrationStartedTimestamp(ctx, uuidParam) + err = e.updateMigrationTimestamp(ctx, "completed_timestamp", uuidParam) + } + } + if err != nil { + return err + } + if err = e.updateMigrationStatus(ctx, uuidParam, status); err != nil { + return err + } + + return nil +} diff --git a/go/vt/vttablet/onlineddl/ghost.go b/go/vt/vttablet/onlineddl/ghost.go new file mode 100644 index 00000000000..40234c87f9b --- /dev/null +++ b/go/vt/vttablet/onlineddl/ghost.go @@ -0,0 +1,39 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package heartbeat contains a writer and reader of heartbeats for a master-replica cluster. +// This is similar to Percona's pt-heartbeat, and is meant to supplement the information +// returned from SHOW SLAVE STATUS. In some circumstances, lag returned from SHOW SLAVE STATUS +// is incorrect and is at best only at 1 second resolution. The heartbeat package directly +// tests replication by writing a record with a timestamp on the master, and comparing that +// timestamp after reading it on the replica. This happens at the interval defined by heartbeat_interval. +// Note: the lag reported will be affected by clock drift, so it is recommended to run ntpd or similar. +// +// The data collected by the heartbeat package is made available in /debug/vars in counters prefixed by Heartbeat*. +// It's additionally used as a source for healthchecks and will impact the serving state of a tablet, if enabled. +// The heartbeat interval is purposefully kept distinct from the health check interval because lag measurement +// requires more frequent polling that the healthcheck typically is configured for. +package onlineddl + +import ( + "vitess.io/vitess/go/stats" +) + +var ( + startedMigrations = stats.NewCounter("StartedMigrations", "Count of initiated migrations") + successfulMigrations = stats.NewCounter("SuccessfulMigrations", "Count of successful migrations, a subset of StartedMigrations") + failedMigrations = stats.NewCounter("FailedMigrations", "Count of failed migrations, a subset of StartedMigrations") +) diff --git a/go/vt/vttablet/onlineddl/schema.go b/go/vt/vttablet/onlineddl/schema.go new file mode 100644 index 00000000000..f19ae86466f --- /dev/null +++ b/go/vt/vttablet/onlineddl/schema.go @@ -0,0 +1,125 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package onlineddl + +import ( + "fmt" + + "vitess.io/vitess/go/vt/withddl" +) + +const ( + sqlCreateSidecarDB = "create database if not exists %s" + sqlCreateSchemaMigrationsTable = `CREATE TABLE IF NOT EXISTS %s.schema_migrations ( + id bigint(20) unsigned NOT NULL AUTO_INCREMENT, + migration_uuid varchar(64) NOT NULL, + keyspace varchar(256) NOT NULL, + shard varchar(256) NOT NULL, + mysql_table varchar(128) NOT NULL, + migration_statement text NOT NULL, + strategy varchar(128) NOT NULL, + added_timestamp timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + ready_timestamp timestamp NULL DEFAULT NULL, + started_timestamp timestamp NULL DEFAULT NULL, + liveness_timestamp timestamp NULL DEFAULT NULL, + completed_timestamp timestamp NULL DEFAULT NULL, + migration_status varchar(128) NOT NULL, + PRIMARY KEY (id), + UNIQUE KEY uuid_idx (migration_uuid), + KEY keyspace_shard_idx (keyspace,shard), + KEY status_idx (migration_status, liveness_timestamp) + ) engine=InnoDB DEFAULT CHARSET=utf8mb4` + sqlValidationQuery = `select 1 from schema_migrations limit 1` + sqlInsertSchemaMigration = `INSERT IGNORE INTO %s.schema_migrations ( + migration_uuid, + keyspace, + shard, + mysql_table, + migration_statement, + strategy, + migration_status + ) VALUES ( + %a, %a, %a, %a, %a, %a, %a + )` + sqlScheduleSingleMigration = `UPDATE %s.schema_migrations + SET + migration_status='ready', + ready_timestamp=NOW() + WHERE + migration_status='queued' + ORDER BY + added_timestamp ASC + LIMIT 1 + ` + sqlUpdateMigrationStatus = `UPDATE %s.schema_migrations + SET migration_status=%a + WHERE + migration_uuid=%a + ` + sqlUpdateMigrationStartedTimestamp = `UPDATE %s.schema_migrations + SET started_timestamp=IFNULL(started_timestamp, NOW()) + WHERE + migration_uuid=%a + ` + sqlUpdateMigrationTimestamp = `UPDATE %s.schema_migrations + SET %s=NOW() + WHERE + migration_uuid=%a + ` + sqlSelectCountReadyMigrations = `SELECT + count(*) as count_ready + FROM %s.schema_migrations + WHERE + migration_status='ready' + ` + sqlSelectReadyMigration = `SELECT + id, + migration_uuid, + keyspace, + shard, + mysql_table, + migration_statement, + strategy, + added_timestamp, + ready_timestamp, + started_timestamp, + liveness_timestamp, + completed_timestamp, + migration_status + FROM %s.schema_migrations + WHERE + migration_status='ready' + LIMIT 1 + ` +) + +var ( + sqlCreateGhostUser = []string{ + `CREATE USER IF NOT EXISTS %s IDENTIFIED BY '%s'`, + `ALTER USER %s IDENTIFIED BY '%s'`, + } + sqlGrantGhostUser = []string{ + `GRANT SUPER, REPLICATION SLAVE ON *.* TO %s`, + `GRANT ALTER, CREATE, DELETE, DROP, INDEX, INSERT, LOCK TABLES, SELECT, TRIGGER, UPDATE ON *.* TO %s`, + } + sqlDropGhostUser = `DROP USER IF EXISTS %s` +) + +var withDDL = withddl.New([]string{ + fmt.Sprintf(sqlCreateSidecarDB, "_vt"), + fmt.Sprintf(sqlCreateSchemaMigrationsTable, "_vt"), +}) diff --git a/go/vt/vttablet/tabletserver/controller.go b/go/vt/vttablet/tabletserver/controller.go index 0347924288a..89547e791e2 100644 --- a/go/vt/vttablet/tabletserver/controller.go +++ b/go/vt/vttablet/tabletserver/controller.go @@ -48,7 +48,7 @@ type Controller interface { Stats() *tabletenv.Stats // InitDBConfig sets up the db config vars. - InitDBConfig(querypb.Target, *dbconfigs.DBConfigs, mysqlctl.MysqlDaemon) error + InitDBConfig(target querypb.Target, dbConfigs *dbconfigs.DBConfigs, mysqlDaemon mysqlctl.MysqlDaemon) error // SetServingType transitions the query service to the required serving type. // Returns true if the state of QueryService or the tablet type changed. diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 5459ee8f56f..1c054328956 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -105,6 +105,7 @@ type stateManager struct { txThrottler txThrottler te txEngine messager subComponent + ddle onlineDDLExecutor // hcticks starts on initialiazation and runs forever. hcticks *timer.Timer @@ -155,6 +156,11 @@ type ( Open() error Close() } + + onlineDDLExecutor interface { + Open() error + Close() + } ) // Init performs the second phase of initialization. @@ -465,6 +471,9 @@ func (sm *stateManager) connect(tabletType topodatapb.TabletType) error { if err := sm.qe.Open(); err != nil { return err } + if err := sm.ddle.Open(); err != nil { + return err + } return sm.txThrottler.Open() } @@ -473,6 +482,7 @@ func (sm *stateManager) unserveCommon() { sm.te.Close() sm.qe.StopServing() sm.tracker.Close() + sm.ddle.Close() sm.requests.Wait() } @@ -486,6 +496,7 @@ func (sm *stateManager) closeAll() { sm.vstreamer.Close() sm.rt.Close() sm.se.Close() + sm.ddle.Close() sm.setState(topodatapb.TabletType_UNKNOWN, StateNotConnected) } diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 6ef9abd7358..fefc869ac94 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -51,6 +51,7 @@ import ( "vitess.io/vitess/go/vt/tableacl" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/onlineddl" "vitess.io/vitess/go/vt/vttablet/queryservice" "vitess.io/vitess/go/vt/vttablet/tabletserver/messager" "vitess.io/vitess/go/vt/vttablet/tabletserver/planbuilder" @@ -104,7 +105,8 @@ type TabletServer struct { hs *healthStreamer // sm manages state transitions. - sm *stateManager + sm *stateManager + onlineDDLExecutor *onlineddl.Executor // alias is used for identifying this tabletserver in healthcheck responses. alias topodatapb.TabletAlias @@ -154,6 +156,14 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to tsv.txThrottler = txthrottler.NewTxThrottler(tsv.config, topoServer) tsv.te = NewTxEngine(tsv) tsv.messager = messager.NewEngine(tsv, tsv.se, tsv.vstreamer) + tsv.onlineDDLExecutor = onlineddl.NewExecutor(tsv, topoServer, + func() topodatapb.TabletType { + if tsv.sm == nil { + return topodatapb.TabletType_UNKNOWN + } + return tsv.sm.Target().TabletType + }, + ) tsv.sm = &stateManager{ hs: tsv.hs, @@ -166,6 +176,7 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to txThrottler: tsv.txThrottler, te: tsv.te, messager: tsv.messager, + ddle: tsv.onlineDDLExecutor, } tsv.exporter.NewGaugeFunc("TabletState", "Tablet server state", func() int64 { return int64(tsv.sm.State()) }) @@ -183,6 +194,7 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to tsv.registerQueryzHandler() tsv.registerStreamQueryzHandlers() tsv.registerTwopczHandler() + tsv.registerMigrationStatusHandler() return tsv } @@ -201,6 +213,7 @@ func (tsv *TabletServer) InitDBConfig(target querypb.Target, dbcfgs *dbconfigs.D tsv.txThrottler.InitDBConfig(target) tsv.vstreamer.InitDBConfig(target.Keyspace) tsv.hs.InitDBConfig(target) + tsv.onlineDDLExecutor.InitDBConfig(target.Keyspace, target.Shard, dbcfgs.DBName) return nil } @@ -1455,6 +1468,17 @@ func (tsv *TabletServer) registerTwopczHandler() { }) } +func (tsv *TabletServer) registerMigrationStatusHandler() { + tsv.exporter.HandleFunc("/schema-migration/report-status", func(w http.ResponseWriter, r *http.Request) { + ctx := tabletenv.LocalContext() + if err := tsv.onlineDDLExecutor.OnSchemaMigrationStatus(ctx, r.URL.Query().Get("uuid"), r.URL.Query().Get("status"), r.URL.Query().Get("dryrun")); err != nil { + http.Error(w, fmt.Sprintf("not ok: %v", err), http.StatusInternalServerError) + return + } + w.Write([]byte("ok")) + }) +} + // SetTracking forces tracking to be on or off. // Only to be used for testing. func (tsv *TabletServer) SetTracking(enabled bool) {