Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 66 additions & 6 deletions go/vt/vttablet/tabletmanager/vdiff/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/google/uuid"

"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"

Expand Down Expand Up @@ -155,7 +156,12 @@ func (vde *Engine) handleCreateResumeAction(ctx context.Context, dbClient binlog
var err error
options := req.Options

query := fmt.Sprintf(sqlGetVDiffID, encodeString(req.VdiffUuid))
query, err := sqlparser.ParseAndBind(sqlGetVDiffID,
sqltypes.StringBindVariable(req.VdiffUuid),
)
if err != nil {
return err
}
if qr, err = dbClient.ExecuteFetch(query, 1); err != nil {
return err
}
Expand Down Expand Up @@ -295,20 +301,74 @@ func (vde *Engine) handleStopAction(ctx context.Context, dbClient binlogplayer.D
}

func (vde *Engine) handleDeleteAction(ctx context.Context, dbClient binlogplayer.DBClient, action VDiffAction, req *tabletmanagerdatapb.VDiffRequest, resp *tabletmanagerdatapb.VDiffResponse) error {
var err error
query := ""
vde.mu.Lock()
defer vde.mu.Unlock()
var deleteQuery string
// cleanupController stops the given controller and removes it from the
// engine's controllers map, keyed by the controller's id. A nil argument is
// a no-op, so callers can pass the result of a map lookup directly without
// first checking whether an entry exists for that id.
cleanupController := func(ct *controller) {
	if ct != nil {
		ct.Stop()
		delete(vde.controllers, ct.id)
	}
}

switch req.ActionArg {
case AllActionArg:
query = fmt.Sprintf(sqlDeleteVDiffs, encodeString(req.Keyspace), encodeString(req.Workflow))
// We need to stop any running controllers before we delete
// the vdiff records.
query, err := sqlparser.ParseAndBind(sqlGetVDiffIDsByKeyspaceWorkflow,
sqltypes.StringBindVariable(req.Keyspace),
sqltypes.StringBindVariable(req.Workflow),
)
if err != nil {
return err
}
res, err := dbClient.ExecuteFetch(query, -1)
if err != nil {
return err
}
for _, row := range res.Named().Rows {
cleanupController(vde.controllers[row.AsInt64("id", -1)])
}
deleteQuery, err = sqlparser.ParseAndBind(sqlDeleteVDiffs,
sqltypes.StringBindVariable(req.Keyspace),
sqltypes.StringBindVariable(req.Workflow),
)
if err != nil {
return err
}
default:
uuid, err := uuid.Parse(req.ActionArg)
if err != nil {
return fmt.Errorf("action argument %s not supported", req.ActionArg)
}
query = fmt.Sprintf(sqlDeleteVDiffByUUID, encodeString(uuid.String()))
// We need to be sure that the controller is stopped, if
// it's still running, before we delete the vdiff record.
query, err := sqlparser.ParseAndBind(sqlGetVDiffID,
sqltypes.StringBindVariable(uuid.String()),
)
if err != nil {
return err
}
res, err := dbClient.ExecuteFetch(query, 1)
if err != nil {
return err
}
row := res.Named().Row() // Must only be one
if row == nil {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no vdiff found for UUID %s on tablet %v",
uuid, vde.thisTablet.Alias)
}
cleanupController(vde.controllers[row.AsInt64("id", -1)])
deleteQuery, err = sqlparser.ParseAndBind(sqlDeleteVDiffByUUID,
sqltypes.StringBindVariable(uuid.String()),
)
if err != nil {
return err
}
}
if _, err = dbClient.ExecuteFetch(query, 1); err != nil {
// Execute the query which deletes the vdiff record(s).
if _, err := dbClient.ExecuteFetch(deleteQuery, 1); err != nil {
return err
}

Expand Down
70 changes: 57 additions & 13 deletions go/vt/vttablet/tabletmanager/vdiff/action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,18 @@ func TestPerformVDiffAction(t *testing.T) {
keyspace := "ks"
workflow := "wf"
uuid := uuid.New().String()
// queryAndResult pairs a query that a test case expects to be issued with
// the result that should be returned for it.
type queryAndResult struct {
// query is the query string registered via dbClient.ExpectRequest.
query string
// result is the canned result registered alongside query. It may be left
// nil, in which case the test loop substitutes an empty &sqltypes.Result{}.
result *sqltypes.Result // Optional if you need a non-empty result
}
tests := []struct {
name string
vde *Engine
req *tabletmanagerdatapb.VDiffRequest
preFunc func() error
postFunc func() error
want *tabletmanagerdatapb.VDiffResponse
expectQueries []string
expectQueries []queryAndResult
wantErr error
}{
{
Expand All @@ -72,9 +76,13 @@ func TestPerformVDiffAction(t *testing.T) {
preFunc: func() error {
return tstenv.TopoServ.CreateCellInfo(ctx, "zone100_test", &topodatapb.CellInfo{})
},
expectQueries: []string{
fmt.Sprintf("select id as id from _vt.vdiff where vdiff_uuid = %s", encodeString(uuid)),
fmt.Sprintf(`insert into _vt.vdiff(keyspace, workflow, state, options, shard, db_name, vdiff_uuid) values('', '', 'pending', '{\"picker_options\":{\"source_cell\":\"cell1,zone100_test\",\"target_cell\":\"cell1,zone100_test\"}}', '0', 'vt_vttest', %s)`, encodeString(uuid)),
expectQueries: []queryAndResult{
{
query: fmt.Sprintf("select id as id from _vt.vdiff where vdiff_uuid = %s", encodeString(uuid)),
},
{
query: fmt.Sprintf(`insert into _vt.vdiff(keyspace, workflow, state, options, shard, db_name, vdiff_uuid) values('', '', 'pending', '{\"picker_options\":{\"source_cell\":\"cell1,zone100_test\",\"target_cell\":\"cell1,zone100_test\"}}', '0', 'vt_vttest', %s)`, encodeString(uuid)),
},
},
postFunc: func() error {
return tstenv.TopoServ.DeleteCellInfo(ctx, "zone100_test", true)
Expand Down Expand Up @@ -102,9 +110,13 @@ func TestPerformVDiffAction(t *testing.T) {
Cells: cells,
})
},
expectQueries: []string{
fmt.Sprintf("select id as id from _vt.vdiff where vdiff_uuid = %s", encodeString(uuid)),
fmt.Sprintf(`insert into _vt.vdiff(keyspace, workflow, state, options, shard, db_name, vdiff_uuid) values('', '', 'pending', '{\"picker_options\":{\"source_cell\":\"all\",\"target_cell\":\"all\"}}', '0', 'vt_vttest', %s)`, encodeString(uuid)),
expectQueries: []queryAndResult{
{
query: fmt.Sprintf("select id as id from _vt.vdiff where vdiff_uuid = %s", encodeString(uuid)),
},
{
query: fmt.Sprintf(`insert into _vt.vdiff(keyspace, workflow, state, options, shard, db_name, vdiff_uuid) values('', '', 'pending', '{\"picker_options\":{\"source_cell\":\"all\",\"target_cell\":\"all\"}}', '0', 'vt_vttest', %s)`, encodeString(uuid)),
},
},
postFunc: func() error {
if err := tstenv.TopoServ.DeleteCellInfo(ctx, "zone100_test", true); err != nil {
Expand All @@ -119,9 +131,21 @@ func TestPerformVDiffAction(t *testing.T) {
Action: string(DeleteAction),
ActionArg: uuid,
},
expectQueries: []string{
fmt.Sprintf(`delete from vd, vdt using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
expectQueries: []queryAndResult{
{
query: fmt.Sprintf("select id as id from _vt.vdiff where vdiff_uuid = %s", encodeString(uuid)),
result: sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id",
"int64",
),
"1",
),
},
{
query: fmt.Sprintf(`delete from vd, vdt using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
where vd.vdiff_uuid = %s`, encodeString(uuid)),
},
},
},
{
Expand All @@ -132,10 +156,23 @@ func TestPerformVDiffAction(t *testing.T) {
Keyspace: keyspace,
Workflow: workflow,
},
expectQueries: []string{
fmt.Sprintf(`delete from vd, vdt, vdl using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
expectQueries: []queryAndResult{
{
query: fmt.Sprintf("select id as id from _vt.vdiff where keyspace = %s and workflow = %s", encodeString(keyspace), encodeString(workflow)),
result: sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id",
"int64",
),
"1",
"2",
),
},
{
query: fmt.Sprintf(`delete from vd, vdt, vdl using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
left join _vt.vdiff_log as vdl on (vd.id = vdl.vdiff_id)
where vd.keyspace = %s and vd.workflow = %s`, encodeString(keyspace), encodeString(workflow)),
},
},
},
}
Expand All @@ -148,10 +185,14 @@ func TestPerformVDiffAction(t *testing.T) {
if tt.vde == nil {
tt.vde = vdiffenv.vde
}
for _, query := range tt.expectQueries {
vdiffenv.dbClient.ExpectRequest(query, &sqltypes.Result{}, nil)
for _, queryResult := range tt.expectQueries {
if queryResult.result == nil {
queryResult.result = &sqltypes.Result{}
}
vdiffenv.dbClient.ExpectRequest(queryResult.query, queryResult.result, nil)
}
got, err := tt.vde.PerformVDiffAction(ctx, tt.req)
vdiffenv.dbClient.Wait()
if tt.wantErr != nil && !vterrors.Equals(err, tt.wantErr) {
t.Errorf("Engine.PerformVDiffAction() error = %v, wantErr %v", err, tt.wantErr)
return
Expand All @@ -163,6 +204,9 @@ func TestPerformVDiffAction(t *testing.T) {
err := tt.postFunc()
require.NoError(t, err, "post function failed: %v", err)
}
// No VDiffs should be running anymore.
require.Equal(t, 0, len(vdiffenv.vde.controllers), "expected no controllers to be running, but found %d",
len(vdiffenv.vde.controllers))
})
}
}
18 changes: 11 additions & 7 deletions go/vt/vttablet/tabletmanager/vdiff/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,19 @@ import (
"strings"
"time"

"vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/vterrors"

"google.golang.org/protobuf/encoding/prototext"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tmclient"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

/*
Expand Down Expand Up @@ -177,6 +177,8 @@ func (ct *controller) start(ctx context.Context, dbClient binlogplayer.DBClient)
select {
case <-ctx.Done():
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
case <-ct.done:
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped")
default:
}
ct.workflowFilter = fmt.Sprintf("where workflow = %s and db_name = %s", encodeString(ct.workflow), encodeString(ct.vde.dbName))
Expand All @@ -190,6 +192,8 @@ func (ct *controller) start(ctx context.Context, dbClient binlogplayer.DBClient)
select {
case <-ctx.Done():
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
case <-ct.done:
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped")
default:
}
source := newMigrationSource()
Expand Down Expand Up @@ -306,9 +310,9 @@ func (ct *controller) saveErrorState(ctx context.Context, saveErr error) error {
log.Warningf("Failed to persist vdiff error state: %v. Will retry in %s", err, retryDelay.String())
select {
case <-ctx.Done():
return fmt.Errorf("engine is shutting down")
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "engine is shutting down")
case <-ct.done:
return fmt.Errorf("vdiff was stopped")
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped")
case <-time.After(retryDelay):
if retryDelay < maxRetryDelay {
retryDelay = time.Duration(float64(retryDelay) * 1.5)
Expand Down
18 changes: 10 additions & 8 deletions go/vt/vttablet/tabletmanager/vdiff/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ const (
sqlGetVDiffByID = "select * from _vt.vdiff where id = %d"
sqlDeleteVDiffs = `delete from vd, vdt, vdl using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
left join _vt.vdiff_log as vdl on (vd.id = vdl.vdiff_id)
where vd.keyspace = %s and vd.workflow = %s`
where vd.keyspace = %a and vd.workflow = %a`
sqlDeleteVDiffByUUID = `delete from vd, vdt using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
where vd.vdiff_uuid = %s`
where vd.vdiff_uuid = %a`
sqlVDiffSummary = `select vd.state as vdiff_state, vd.last_error as last_error, vdt.table_name as table_name,
vd.vdiff_uuid as 'uuid', vdt.state as table_state, vdt.table_rows as table_rows,
vd.started_at as started_at, vdt.rows_compared as rows_compared, vd.completed_at as completed_at,
Expand All @@ -41,12 +41,14 @@ const (
sqlUpdateVDiffState = "update _vt.vdiff set state = %s, last_error = %s %s where id = %d"
sqlUpdateVDiffStopped = `update _vt.vdiff as vd, _vt.vdiff_table as vdt set vd.state = 'stopped', vdt.state = 'stopped', vd.last_error = ''
where vd.id = vdt.vdiff_id and vd.id = %d and vd.state != 'completed'`
sqlGetVReplicationEntry = "select * from _vt.vreplication %s"
sqlGetVDiffsToRun = "select * from _vt.vdiff where state in ('started','pending')" // what VDiffs have not been stopped or completed
sqlGetVDiffsToRetry = "select * from _vt.vdiff where state = 'error' and json_unquote(json_extract(options, '$.core_options.auto_retry')) = 'true'"
sqlGetVDiffID = "select id as id from _vt.vdiff where vdiff_uuid = %s"
sqlGetAllVDiffs = "select * from _vt.vdiff order by id desc"
sqlGetAllTableRows = "select table_name as table_name, table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %s and table_name in (%s)"
sqlGetVReplicationEntry = "select * from _vt.vreplication %s"
sqlGetVDiffsToRun = "select * from _vt.vdiff where state in ('started','pending')" // what VDiffs have not been stopped or completed
sqlGetVDiffsToRetry = "select * from _vt.vdiff where state = 'error' and json_unquote(json_extract(options, '$.core_options.auto_retry')) = 'true'"
sqlGetVDiffID = "select id as id from _vt.vdiff where vdiff_uuid = %a"
sqlGetVDiffIDsByKeyspaceWorkflow = "select id as id from _vt.vdiff where keyspace = %a and workflow = %a"
sqlGetAllVDiffs = "select * from _vt.vdiff order by id desc"
sqlGetTableRows = "select table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %a and table_name = %a"
sqlGetAllTableRows = "select table_name as table_name, table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %s and table_name in (%s)"

sqlNewVDiffTable = "insert into _vt.vdiff_table(vdiff_id, table_name, state, table_rows) values(%d, %s, 'pending', %d)"
sqlGetVDiffTable = `select vdt.lastpk as lastpk, vdt.mismatch as mismatch, vdt.report as report
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletmanager/vdiff/table_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,8 @@ func (td *tableDiffer) streamOneShard(ctx context.Context, participant *shardStr
case participant.result <- result:
case <-ctx.Done():
return vterrors.Wrap(ctx.Err(), "VStreamRows")
case <-td.wd.ct.done:
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped")
}
return nil
})
Expand Down Expand Up @@ -490,6 +492,8 @@ func (td *tableDiffer) diff(ctx context.Context, rowsToCompare int64, debug, onl
select {
case <-ctx.Done():
return nil, vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
case <-td.wd.ct.done:
return nil, vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped")
default:
}

Expand Down
6 changes: 6 additions & 0 deletions go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.D
select {
case <-ctx.Done():
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
case <-wd.ct.done:
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped")
default:
}

Expand Down Expand Up @@ -141,6 +143,8 @@ func (wd *workflowDiffer) diff(ctx context.Context) error {
select {
case <-ctx.Done():
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
case <-wd.ct.done:
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped")
default:
}

Expand All @@ -160,6 +164,8 @@ func (wd *workflowDiffer) diff(ctx context.Context) error {
select {
case <-ctx.Done():
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
case <-wd.ct.done:
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped")
default:
}
query := fmt.Sprintf(sqlGetVDiffTable, wd.ct.id, encodeString(td.table.Name))
Expand Down
Loading