diff --git a/go/vt/vttablet/tabletmanager/vdiff/action.go b/go/vt/vttablet/tabletmanager/vdiff/action.go index 7a18015fc24..78f636b7662 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/action.go +++ b/go/vt/vttablet/tabletmanager/vdiff/action.go @@ -23,6 +23,7 @@ import ( "github.com/google/uuid" + "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vterrors" @@ -97,7 +98,10 @@ func (vde *Engine) getVDiffSummary(vdiffID int64, dbClient binlogplayer.DBClient var qr *sqltypes.Result var err error - query := fmt.Sprintf(sqlVDiffSummary, vdiffID) + query, err := sqlparser.ParseAndBind(sqlVDiffSummary, sqltypes.Int64BindVariable(vdiffID)) + if err != nil { + return nil, err + } if qr, err = dbClient.ExecuteFetch(query, -1); err != nil { return nil, err } @@ -144,10 +148,12 @@ func (vde *Engine) getDefaultCell() (string, error) { func (vde *Engine) handleCreateResumeAction(ctx context.Context, dbClient binlogplayer.DBClient, action VDiffAction, req *tabletmanagerdatapb.VDiffRequest, resp *tabletmanagerdatapb.VDiffResponse) error { var qr *sqltypes.Result - var err error options := req.Options - query := fmt.Sprintf(sqlGetVDiffID, encodeString(req.VdiffUuid)) + query, err := sqlparser.ParseAndBind(sqlGetVDiffID, sqltypes.StringBindVariable(req.VdiffUuid)) + if err != nil { + return err + } if qr, err = dbClient.ExecuteFetch(query, 1); err != nil { return err } @@ -173,9 +179,18 @@ func (vde *Engine) handleCreateResumeAction(ctx context.Context, dbClient binlog return err } if action == CreateAction { - query := fmt.Sprintf(sqlNewVDiff, - encodeString(req.Keyspace), encodeString(req.Workflow), "pending", encodeString(string(optionsJSON)), - vde.thisTablet.Shard, topoproto.TabletDbName(vde.thisTablet), req.VdiffUuid) + query, err := sqlparser.ParseAndBind(sqlNewVDiff, + sqltypes.StringBindVariable(req.Keyspace), + sqltypes.StringBindVariable(req.Workflow), + sqltypes.StringBindVariable("pending"), + sqltypes.StringBindVariable(string(optionsJSON)), + sqltypes.StringBindVariable(vde.thisTablet.Shard), + sqltypes.StringBindVariable(topoproto.TabletDbName(vde.thisTablet)), + sqltypes.StringBindVariable(req.VdiffUuid), + ) + if err != nil { + return err + } if qr, err = dbClient.ExecuteFetch(query, 1); err != nil { return err } @@ -185,7 +200,13 @@ func (vde *Engine) handleCreateResumeAction(ctx context.Context, dbClient binlog } resp.Id = int64(qr.InsertID) } else { - query := fmt.Sprintf(sqlResumeVDiff, encodeString(string(optionsJSON)), encodeString(req.VdiffUuid)) + query, err := sqlparser.ParseAndBind(sqlResumeVDiff, + sqltypes.StringBindVariable(string(optionsJSON)), + sqltypes.StringBindVariable(req.VdiffUuid), + ) + if err != nil { + return err + } if qr, err = dbClient.ExecuteFetch(query, 1); err != nil { return err } @@ -219,7 +240,13 @@ func (vde *Engine) handleShowAction(ctx context.Context, dbClient binlogplayer.D vdiffUUID := "" if req.ActionArg == LastActionArg { - query := fmt.Sprintf(sqlGetMostRecentVDiff, encodeString(req.Keyspace), encodeString(req.Workflow)) + query, err := sqlparser.ParseAndBind(sqlGetMostRecentVDiff, + sqltypes.StringBindVariable(req.Keyspace), + sqltypes.StringBindVariable(req.Workflow), + ) + if err != nil { + return err + } if qr, err = dbClient.ExecuteFetch(query, 1); err != nil { return err } @@ -234,7 +261,14 @@ func (vde *Engine) handleShowAction(ctx context.Context, dbClient binlogplayer.D } if vdiffUUID != "" { resp.VdiffUuid = vdiffUUID - query := fmt.Sprintf(sqlGetVDiffByKeyspaceWorkflowUUID, encodeString(req.Keyspace), encodeString(req.Workflow), encodeString(vdiffUUID)) + query, err := sqlparser.ParseAndBind(sqlGetVDiffByKeyspaceWorkflowUUID, + sqltypes.StringBindVariable(req.Keyspace), + sqltypes.StringBindVariable(req.Workflow), + sqltypes.StringBindVariable(vdiffUUID), + ) + if err != nil { + return err + } if qr, err = dbClient.ExecuteFetch(query, 1); err != nil { return err } @@ -278,7 +312,7 @@ func (vde *Engine) handleStopAction(ctx context.Context, dbClient binlogplayer.D if controller.uuid == req.VdiffUuid { controller.Stop() if err := controller.markStoppedByRequest(); err != nil { - return err + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "encountered an error marking vdiff %s as stopped: %v", controller.uuid, err) } break } @@ -292,13 +326,22 @@ func (vde *Engine) handleDeleteAction(ctx context.Context, dbClient binlogplayer switch req.ActionArg { case AllActionArg: - query = fmt.Sprintf(sqlDeleteVDiffs, encodeString(req.Keyspace), encodeString(req.Workflow)) + query, err = sqlparser.ParseAndBind(sqlDeleteVDiffs, + sqltypes.StringBindVariable(req.Keyspace), + sqltypes.StringBindVariable(req.Workflow), + ) + if err != nil { + return err + } default: uuid, err := uuid.Parse(req.ActionArg) if err != nil { return fmt.Errorf("action argument %s not supported", req.ActionArg) } - query = fmt.Sprintf(sqlDeleteVDiffByUUID, encodeString(uuid.String())) + query, err = sqlparser.ParseAndBind(sqlDeleteVDiffByUUID, sqltypes.StringBindVariable(uuid.String())) + if err != nil { + return err + } } if _, err = dbClient.ExecuteFetch(query, 1); err != nil { return err diff --git a/go/vt/vttablet/tabletmanager/vdiff/controller.go b/go/vt/vttablet/tabletmanager/vdiff/controller.go index 8edc4b333e4..a98b5481343 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/controller.go +++ b/go/vt/vttablet/tabletmanager/vdiff/controller.go @@ -24,6 +24,7 @@ import ( "time" "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" "google.golang.org/protobuf/encoding/prototext" @@ -165,8 +166,13 @@ func (ct *controller) updateState(dbClient binlogplayer.DBClient, state VDiffSta // Clear out any previous error for the vdiff on this shard err = errors.New("") } - query := fmt.Sprintf(sqlUpdateVDiffState, encodeString(string(state)), encodeString(err.Error()), extraCols, ct.id) - if _, err := dbClient.ExecuteFetch(query, 1); err != nil { + query := sqlparser.BuildParsedQuery(sqlUpdateVDiffState, + encodeString(string(state)), + encodeString(err.Error()), + extraCols, + ct.id, + ) + if _, err := dbClient.ExecuteFetch(query.Query, 1); err != nil { return err } insertVDiffLog(ct.vde.ctx, dbClient, ct.id, fmt.Sprintf("State changed to: %s", state)) @@ -179,9 +185,10 @@ func (ct *controller) start(ctx context.Context, dbClient binlogplayer.DBClient) return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired") default: } - ct.workflowFilter = fmt.Sprintf("where workflow = %s and db_name = %s", encodeString(ct.workflow), encodeString(ct.vde.dbName)) - query := fmt.Sprintf(sqlGetVReplicationEntry, ct.workflowFilter) - qr, err := dbClient.ExecuteFetch(query, -1) + ct.workflowFilter = fmt.Sprintf("where workflow = %s and db_name = %s", encodeString(ct.workflow), + encodeString(ct.vde.dbName)) + query := sqlparser.BuildParsedQuery(sqlGetVReplicationEntry, ct.workflowFilter) + qr, err := dbClient.ExecuteFetch(query.Query, -1) if err != nil { return err } @@ -248,15 +255,17 @@ func (ct *controller) start(ctx context.Context, dbClient binlogplayer.DBClient) func (ct *controller) markStoppedByRequest() error { dbClient := ct.vde.dbClientFactoryFiltered() if err := dbClient.Connect(); err != nil { - return fmt.Errorf("encountered an error marking vdiff %s as stopped: %v", ct.uuid, err) + return err } defer dbClient.Close() - query := fmt.Sprintf(sqlUpdateVDiffStopped, ct.id) + query, err := sqlparser.ParseAndBind(sqlUpdateVDiffStopped, sqltypes.Int64BindVariable(ct.id)) + if err != nil { + return err + } var res *sqltypes.Result - var err error if res, err = dbClient.ExecuteFetch(query, 1); err != nil { - return fmt.Errorf("encountered an error marking vdiff %s as stopped: %v", ct.uuid, err) + return err } // We don't mark it as stopped if it's already completed if res.RowsAffected > 0 { diff --git a/go/vt/vttablet/tabletmanager/vdiff/engine.go b/go/vt/vttablet/tabletmanager/vdiff/engine.go index 92e4a2a4555..c0ee5bcad51 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/engine.go +++ b/go/vt/vttablet/tabletmanager/vdiff/engine.go @@ -27,6 +27,7 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/proto/tabletmanagerdata" "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" "vitess.io/vitess/go/vt/vttablet/tmclient" @@ -297,7 +298,11 @@ func (vde *Engine) getVDiffsToRetry(ctx context.Context, dbClient binlogplayer.D } func (vde *Engine) getVDiffByID(ctx context.Context, dbClient binlogplayer.DBClient, id int64) (*sqltypes.Result, error) { - qr, err := dbClient.ExecuteFetch(fmt.Sprintf(sqlGetVDiffByID, id), -1) + query, err := sqlparser.ParseAndBind(sqlGetVDiffByID, sqltypes.Int64BindVariable(id)) + if err != nil { + return nil, err + } + qr, err := dbClient.ExecuteFetch(query, -1) if err != nil { return nil, err } @@ -340,7 +345,11 @@ func (vde *Engine) retryVDiffs(ctx context.Context) error { return err } log.Infof("Retrying vdiff %s that had an ephemeral error of '%v'", uuid, lastError) - if _, err = dbClient.ExecuteFetch(fmt.Sprintf(sqlRetryVDiff, id), 1); err != nil { + query, err := sqlparser.ParseAndBind(sqlRetryVDiff, sqltypes.Int64BindVariable(id)) + if err != nil { + return err + } + if _, err = dbClient.ExecuteFetch(query, 1); err != nil { return err } options := &tabletmanagerdata.VDiffOptions{} diff --git a/go/vt/vttablet/tabletmanager/vdiff/schema.go b/go/vt/vttablet/tabletmanager/vdiff/schema.go index f82f03106b4..f8194dee14c 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/schema.go +++ b/go/vt/vttablet/tabletmanager/vdiff/schema.go @@ -18,48 +18,48 @@ package vdiff const ( sqlAnalyzeTable = "analyze table `%s`.`%s`" - sqlNewVDiff = "insert into _vt.vdiff(keyspace, workflow, state, options, shard, db_name, vdiff_uuid) values(%s, %s, '%s', %s, '%s', '%s', '%s')" - sqlResumeVDiff = `update _vt.vdiff as vd, _vt.vdiff_table as vdt set vd.options = %s, vd.started_at = NULL, vd.completed_at = NULL, vd.state = 'pending', - vdt.state = 'pending' where vd.vdiff_uuid = %s and vd.id = vdt.vdiff_id and vd.state in ('completed', 'stopped') + sqlNewVDiff = "insert into _vt.vdiff(keyspace, workflow, state, options, shard, db_name, vdiff_uuid) values(%a, %a, %a, %a, %a, %a, %a)" + sqlResumeVDiff = `update _vt.vdiff as vd, _vt.vdiff_table as vdt set vd.options = %a, vd.started_at = NULL, vd.completed_at = NULL, vd.state = 'pending', + vdt.state = 'pending' where vd.vdiff_uuid = %a and vd.id = vdt.vdiff_id and vd.state in ('completed', 'stopped') and vdt.state in ('completed', 'stopped')` sqlRetryVDiff = `update _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) set vd.state = 'pending', - vd.last_error = '', vdt.state = 'pending' where vd.id = %d and (vd.state = 'error' or vdt.state = 'error')` - sqlGetVDiffByKeyspaceWorkflowUUID = "select * from _vt.vdiff where keyspace = %s and workflow = %s and vdiff_uuid = %s" - sqlGetMostRecentVDiff = "select * from _vt.vdiff where keyspace = %s and workflow = %s order by id desc limit 1" - sqlGetVDiffByID = "select * from _vt.vdiff where id = %d" + vd.last_error = '', vdt.state = 'pending' where vd.id = %a and (vd.state = 'error' or vdt.state = 'error')` + sqlGetVDiffByKeyspaceWorkflowUUID = "select * from _vt.vdiff where keyspace = %a and workflow = %a and vdiff_uuid = %a" + sqlGetMostRecentVDiff = "select * from _vt.vdiff where keyspace = %a and workflow = %a order by id desc limit 1" + sqlGetVDiffByID = "select * from _vt.vdiff where id = %a" sqlDeleteVDiffs = `delete from vd, vdt, vdl using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) left join _vt.vdiff_log as vdl on (vd.id = vdl.vdiff_id) - where vd.keyspace = %s and vd.workflow = %s` + where vd.keyspace = %a and vd.workflow = %a` sqlDeleteVDiffByUUID = `delete from vd, vdt using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) - where vd.vdiff_uuid = %s` + where vd.vdiff_uuid = %a` sqlVDiffSummary = `select vd.state as vdiff_state, vd.last_error as last_error, vdt.table_name as table_name, vd.vdiff_uuid as 'uuid', vdt.state as table_state, vdt.table_rows as table_rows, vd.started_at as started_at, vdt.rows_compared as rows_compared, vd.completed_at as completed_at, IF(vdt.mismatch = 1, 1, 0) as has_mismatch, vdt.report as report from _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) - where vd.id = %d` + where vd.id = %a` // sqlUpdateVDiffState has a penultimate placeholder for any additional columns you want to update, e.g. `, foo = 1` sqlUpdateVDiffState = "update _vt.vdiff set state = %s, last_error = %s %s where id = %d" sqlUpdateVDiffStopped = `update _vt.vdiff as vd, _vt.vdiff_table as vdt set vd.state = 'stopped', vdt.state = 'stopped', vd.last_error = '' - where vd.id = vdt.vdiff_id and vd.id = %d and vd.state != 'completed'` + where vd.id = vdt.vdiff_id and vd.id = %a and vd.state != 'completed'` sqlGetVReplicationEntry = "select * from _vt.vreplication %s" sqlGetVDiffsToRun = "select * from _vt.vdiff where state in ('started','pending')" // what VDiffs have not been stopped or completed sqlGetVDiffsToRetry = "select * from _vt.vdiff where state = 'error' and json_unquote(json_extract(options, '$.core_options.auto_retry')) = 'true'" - sqlGetVDiffID = "select id as id from _vt.vdiff where vdiff_uuid = %s" + sqlGetVDiffID = "select id as id from _vt.vdiff where vdiff_uuid = %a" sqlGetAllVDiffs = "select * from _vt.vdiff order by id desc" sqlGetTableRows = "select table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %a and table_name = %a" sqlGetAllTableRows = "select table_name as table_name, table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %s and table_name in (%s)" - sqlNewVDiffTable = "insert into _vt.vdiff_table(vdiff_id, table_name, state, table_rows) values(%d, %s, 'pending', %d)" + sqlNewVDiffTable = "insert into _vt.vdiff_table(vdiff_id, table_name, state, table_rows) values(%a, %a, 'pending', %a)" sqlGetVDiffTable = `select vdt.lastpk as lastpk, vdt.mismatch as mismatch, vdt.report as report from _vt.vdiff as vd inner join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) - where vdt.vdiff_id = %d and vdt.table_name = %s` + where vdt.vdiff_id = %a and vdt.table_name = %a` sqlUpdateTableRows = "update _vt.vdiff_table set table_rows = %a where vdiff_id = %a and table_name = %a" - sqlUpdateTableProgress = "update _vt.vdiff_table set rows_compared = %d, lastpk = %s, report = %s where vdiff_id = %d and table_name = %s" - sqlUpdateTableNoProgress = "update _vt.vdiff_table set rows_compared = %d, report = %s where vdiff_id = %d and table_name = %s" - sqlUpdateTableState = "update _vt.vdiff_table set state = %s where vdiff_id = %d and table_name = %s" - sqlUpdateTableStateAndReport = "update _vt.vdiff_table set state = %s, rows_compared = %d, report = %s where vdiff_id = %d and table_name = %s" - sqlUpdateTableMismatch = "update _vt.vdiff_table set mismatch = true where vdiff_id = %d and table_name = %s" + sqlUpdateTableProgress = "update _vt.vdiff_table set rows_compared = %a, lastpk = %a, report = %a where vdiff_id = %a and table_name = %a" + sqlUpdateTableNoProgress = "update _vt.vdiff_table set rows_compared = %a, report = %a where vdiff_id = %a and table_name = %a" + sqlUpdateTableState = "update _vt.vdiff_table set state = %a where vdiff_id = %a and table_name = %a" + sqlUpdateTableStateAndReport = "update _vt.vdiff_table set state = %a, rows_compared = %a, report = %a where vdiff_id = %a and table_name = %a" + sqlUpdateTableMismatch = "update _vt.vdiff_table set mismatch = true where vdiff_id = %a and table_name = %a" - sqlGetIncompleteTables = "select table_name as table_name from _vt.vdiff_table where vdiff_id = %d and state != 'completed'" + sqlGetIncompleteTables = "select table_name as table_name from _vt.vdiff_table where vdiff_id = %a and state != 'completed'" ) diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index 0f650e3eaa1..c3c80d28d40 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -442,7 +442,13 @@ func (td *tableDiffer) diff(ctx context.Context, rowsToCompare int64, debug, onl // We need to continue were we left off when appropriate. This can be an // auto-retry on error, or a manual retry via the resume command. // Otherwise the existing state will be empty and we start from scratch. - query := fmt.Sprintf(sqlGetVDiffTable, td.wd.ct.id, encodeString(td.table.Name)) + query, err := sqlparser.ParseAndBind(sqlGetVDiffTable, + sqltypes.Int64BindVariable(td.wd.ct.id), + sqltypes.StringBindVariable(td.table.Name), + ) + if err != nil { + return nil, err + } cs, err := dbClient.ExecuteFetch(query, -1) if err != nil { return nil, err @@ -663,9 +669,26 @@ func (td *tableDiffer) updateTableProgress(dbClient binlogplayer.DBClient, dr *D return err } - query = fmt.Sprintf(sqlUpdateTableProgress, dr.ProcessedRows, encodeString(string(lastPK)), encodeString(string(rpt)), td.wd.ct.id, encodeString(td.table.Name)) + query, err = sqlparser.ParseAndBind(sqlUpdateTableProgress, + sqltypes.Int64BindVariable(dr.ProcessedRows), + sqltypes.StringBindVariable(string(lastPK)), + sqltypes.StringBindVariable(string(rpt)), + sqltypes.Int64BindVariable(td.wd.ct.id), + sqltypes.StringBindVariable(td.table.Name), + ) + if err != nil { + return err + } } else { - query = fmt.Sprintf(sqlUpdateTableNoProgress, dr.ProcessedRows, encodeString(string(rpt)), td.wd.ct.id, encodeString(td.table.Name)) + query, err = sqlparser.ParseAndBind(sqlUpdateTableNoProgress, + sqltypes.Int64BindVariable(dr.ProcessedRows), + sqltypes.StringBindVariable(string(rpt)), + sqltypes.Int64BindVariable(td.wd.ct.id), + sqltypes.StringBindVariable(td.table.Name), + ) + if err != nil { + return err + } } if _, err := dbClient.ExecuteFetch(query, 1); err != nil { return err @@ -674,8 +697,15 @@ func (td *tableDiffer) updateTableProgress(dbClient binlogplayer.DBClient, dr *D } func (td *tableDiffer) updateTableState(ctx context.Context, dbClient binlogplayer.DBClient, state VDiffState) error { - query := fmt.Sprintf(sqlUpdateTableState, encodeString(string(state)), td.wd.ct.id, encodeString(td.table.Name)) - if _, err := dbClient.ExecuteFetch(query, 1); err != nil { + query, err := sqlparser.ParseAndBind(sqlUpdateTableState, + sqltypes.StringBindVariable(string(state)), + sqltypes.Int64BindVariable(td.wd.ct.id), + sqltypes.StringBindVariable(td.table.Name), + ) + if err != nil { + return err + } + if _, err = dbClient.ExecuteFetch(query, 1); err != nil { return err } insertVDiffLog(ctx, dbClient, td.wd.ct.id, fmt.Sprintf("%s: table %s", state, encodeString(td.table.Name))) @@ -694,8 +724,17 @@ func (td *tableDiffer) updateTableStateAndReport(ctx context.Context, dbClient b } else { report = "{}" } - query := fmt.Sprintf(sqlUpdateTableStateAndReport, encodeString(string(state)), dr.ProcessedRows, encodeString(report), td.wd.ct.id, encodeString(td.table.Name)) - if _, err := dbClient.ExecuteFetch(query, 1); err != nil { + query, err := sqlparser.ParseAndBind(sqlUpdateTableStateAndReport, + sqltypes.StringBindVariable(string(state)), + sqltypes.Int64BindVariable(dr.ProcessedRows), + sqltypes.StringBindVariable(report), + sqltypes.Int64BindVariable(td.wd.ct.id), + sqltypes.StringBindVariable(td.table.Name), + ) + if err != nil { + return err + } + if _, err = dbClient.ExecuteFetch(query, 1); err != nil { return err } insertVDiffLog(ctx, dbClient, td.wd.ct.id, fmt.Sprintf("%s: table %s", state, encodeString(td.table.Name))) @@ -704,8 +743,14 @@ func (td *tableDiffer) updateTableStateAndReport(ctx context.Context, dbClient b } func updateTableMismatch(dbClient binlogplayer.DBClient, vdiffID int64, table string) error { - query := fmt.Sprintf(sqlUpdateTableMismatch, vdiffID, encodeString(table)) - if _, err := dbClient.ExecuteFetch(query, 1); err != nil { + query, err := sqlparser.ParseAndBind(sqlUpdateTableMismatch, + sqltypes.Int64BindVariable(vdiffID), + sqltypes.StringBindVariable(table), + ) + if err != nil { + return err + } + if _, err = dbClient.ExecuteFetch(query, 1); err != nil { return err } return nil @@ -777,7 +822,10 @@ func (td *tableDiffer) adjustForSourceTimeZone(targetSelectExprs sqlparser.Selec // rows in the table) and saves them in the vdiff_table record. func (td *tableDiffer) updateTableStats(dbClient binlogplayer.DBClient) error { // First update the stats. - stmt := sqlparser.BuildParsedQuery(sqlAnalyzeTable, td.wd.ct.vde.dbName, td.table.Name) + stmt := sqlparser.BuildParsedQuery(sqlAnalyzeTable, + td.wd.ct.vde.dbName, + td.table.Name, + ) if _, err := dbClient.ExecuteFetch(stmt.Query, -1); err != nil { return err } diff --git a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go index ed4879eeb95..7d7af3f8a37 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go @@ -205,7 +205,13 @@ func (wd *workflowDiffer) diff(ctx context.Context) error { return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired") default: } - query := fmt.Sprintf(sqlGetVDiffTable, wd.ct.id, encodeString(td.table.Name)) + query, err := sqlparser.ParseAndBind(sqlGetVDiffTable, + sqltypes.Int64BindVariable(wd.ct.id), + sqltypes.StringBindVariable(td.table.Name), + ) + if err != nil { + return err + } qr, err := dbClient.ExecuteFetch(query, 1) if err != nil { return err @@ -235,7 +241,10 @@ func (wd *workflowDiffer) diff(ctx context.Context) error { } func (wd *workflowDiffer) markIfCompleted(ctx context.Context, dbClient binlogplayer.DBClient) error { - query := fmt.Sprintf(sqlGetIncompleteTables, wd.ct.id) + query, err := sqlparser.ParseAndBind(sqlGetIncompleteTables, sqltypes.Int64BindVariable(wd.ct.id)) + if err != nil { + return err + } qr, err := dbClient.ExecuteFetch(query, -1) if err != nil { return err @@ -305,7 +314,13 @@ func (wd *workflowDiffer) buildPlan(dbClient binlogplayer.DBClient, filter *binl // getTableLastPK gets the lastPK protobuf message for a given vdiff table. func (wd *workflowDiffer) getTableLastPK(dbClient binlogplayer.DBClient, tableName string) (*querypb.QueryResult, error) { - query := fmt.Sprintf(sqlGetVDiffTable, wd.ct.id, encodeString(tableName)) + query, err := sqlparser.ParseAndBind(sqlGetVDiffTable, + sqltypes.Int64BindVariable(wd.ct.id), + sqltypes.StringBindVariable(tableName), + ) + if err != nil { + return nil, err + } qr, err := dbClient.ExecuteFetch(query, 1) if err != nil { return nil, err @@ -332,7 +347,10 @@ func (wd *workflowDiffer) initVDiffTables(dbClient binlogplayer.DBClient) error for tableName := range wd.tableDiffers { // Update the table statistics for each table if requested. if wd.opts.CoreOptions.UpdateTableStats { - stmt := sqlparser.BuildParsedQuery(sqlAnalyzeTable, wd.ct.vde.dbName, tableName) + stmt := sqlparser.BuildParsedQuery(sqlAnalyzeTable, + wd.ct.vde.dbName, + tableName, + ) log.Infof("Updating the table stats for %s.%s using: %q", wd.ct.vde.dbName, tableName, stmt.Query) if _, err := dbClient.ExecuteFetch(stmt.Query, -1); err != nil { return err @@ -344,8 +362,11 @@ func (wd *workflowDiffer) initVDiffTables(dbClient binlogplayer.DBClient) error tableIn.WriteByte(',') } } - query := fmt.Sprintf(sqlGetAllTableRows, encodeString(wd.ct.vde.dbName), tableIn.String()) - isqr, err := dbClient.ExecuteFetch(query, -1) + query := sqlparser.BuildParsedQuery(sqlGetAllTableRows, + encodeString(wd.ct.vde.dbName), + tableIn.String(), + ) + isqr, err := dbClient.ExecuteFetch(query.Query, -1) if err != nil { return err } @@ -353,13 +374,26 @@ func (wd *workflowDiffer) initVDiffTables(dbClient binlogplayer.DBClient) error tableName, _ := row.ToString("table_name") tableRows, _ := row.ToInt64("table_rows") - query := fmt.Sprintf(sqlGetVDiffTable, wd.ct.id, encodeString(tableName)) + query, err := sqlparser.ParseAndBind(sqlGetVDiffTable, + sqltypes.Int64BindVariable(wd.ct.id), + sqltypes.StringBindVariable(tableName), + ) + if err != nil { + return err + } qr, err := dbClient.ExecuteFetch(query, -1) if err != nil { return err } if len(qr.Rows) == 0 { - query = fmt.Sprintf(sqlNewVDiffTable, wd.ct.id, encodeString(tableName), tableRows) + query, err = sqlparser.ParseAndBind(sqlNewVDiffTable, + sqltypes.Int64BindVariable(wd.ct.id), + sqltypes.StringBindVariable(tableName), + sqltypes.Int64BindVariable(tableRows), + ) + if err != nil { + return err + } } else if len(qr.Rows) == 1 { query, err = sqlparser.ParseAndBind(sqlUpdateTableRows, sqltypes.Int64BindVariable(tableRows),