diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 4f413717770..3d58d7d64bf 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -478,7 +478,7 @@ var commands = []commandGroup{ "Workflow", []command{ {"Workflow", commandWorkflow, " --dry-run", - "Start/Stop/Delete/List/ListAll Workflow on all target tablets in workflow. Example: Workflow merchant.morders Start", + "Start/Stop/Delete/Show/ListAll Workflow on all target tablets in workflow. Example: Workflow merchant.morders Start", }, }, }, @@ -2950,6 +2950,10 @@ func commandWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag. } keyspace := subFlags.Arg(0) action := strings.ToLower(subFlags.Arg(1)) + // Note: List is deprecated and replaced by show. + if action == "list" { + action = "show" + } var workflow string var err error if action != "listall" { @@ -2967,7 +2971,7 @@ func commandWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag. if err != nil { return err } - if action == "list" || action == "listall" { + if action == "show" || action == "listall" { return nil } if len(results) == 0 { diff --git a/go/vt/wrangler/vexec.go b/go/vt/wrangler/vexec.go index 0df718ac23c..f7b64a2738b 100644 --- a/go/vt/wrangler/vexec.go +++ b/go/vt/wrangler/vexec.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "vitess.io/vitess/go/vt/log" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" "github.com/golang/protobuf/proto" "github.com/olekukonko/tablewriter" @@ -103,7 +104,7 @@ func (vx *vexec) outputDryRunInfo(wr *Wrangler) error { table.SetHeader([]string{"Tablet", "ID", "BinLogSource", "State", "DBName", "Current GTID", "MaxReplicationLag"}) for _, master := range vx.masters { key := fmt.Sprintf("%s/%s", master.Shard, master.AliasString()) - for _, stream := range rsr.Statuses[key] { + for _, stream := range rsr.ShardStatuses[key].MasterReplicationStatuses { table.Append([]string{key, fmt.Sprintf("%d", stream.ID), stream.Bls.String(), stream.State, 
stream.DBName, stream.Pos, fmt.Sprintf("%d", stream.MaxReplicationLag)}) } } @@ -192,8 +193,9 @@ func (vx *vexec) getMasterForShard(shard string) (*topo.TabletInfo, error) { // WorkflowAction can start/stop/delete or list streams in _vt.vreplication on all masters in the target keyspace of the workflow. func (wr *Wrangler) WorkflowAction(ctx context.Context, workflow, keyspace, action string, dryRun bool) (map[*topo.TabletInfo]*sqltypes.Result, error) { - if action == "list" { - return nil, wr.listStreams(ctx, workflow, keyspace) + if action == "show" { + _, err := wr.ShowWorkflow(ctx, workflow, keyspace) + return nil, err } else if action == "listall" { _, err := wr.ListAllWorkflows(ctx, keyspace) return nil, err @@ -244,12 +246,33 @@ func (wr *Wrangler) execWorkflowAction(ctx context.Context, workflow, keyspace, return results, err } -type replicationStatusResult struct { - Workflow string - SourceKeyspace string - TargetKeyspace string +// ReplicationStatusResult represents the result of trying to get the replication status for a given workflow. +type ReplicationStatusResult struct { + // Workflow represents the name of the workflow relevant to the related replication statuses. + Workflow string + // SourceLocation represents the keyspace and shards that we are vreplicating from. + SourceLocation ReplicationLocation + // TargetLocation represents the keyspace and shards that we are vreplicating into. + TargetLocation ReplicationLocation + + // ShardStatuses is a map of {shard}/{master tablet alias} : ShardReplicationStatus (for the given shard). + ShardStatuses map[string]*ShardReplicationStatus +} - Statuses map[string][]*replicationStatus +// ReplicationLocation represents a location that data is either replicating from, or replicating into. +type ReplicationLocation struct { + Keyspace string + Shards []string +} + +// ShardReplicationStatus holds relevant vreplication related info for the given shard.
+type ShardReplicationStatus struct { + // MasterReplicationStatuses represents all of the replication statuses for the master tablets in the given shard. + MasterReplicationStatuses []*ReplicationStatus + // TabletControls represents the tablet controls for the tablets in the shard. + TabletControls []*topodatapb.Shard_TabletControl + // MasterIsServing indicates whether the master tablet of the given shard is currently serving write traffic. + MasterIsServing bool } type copyState struct { @@ -257,23 +280,36 @@ type copyState struct { LastPK string } -type replicationStatus struct { - Shard string - Tablet string - ID int64 - Bls binlogdatapb.BinlogSource - Pos string - StopPos string - State string +// ReplicationStatus includes data from the _vt.vreplication table, along with other useful relevant data. +type ReplicationStatus struct { + // Shard represents the relevant shard name. + Shard string + // Tablet is the tablet alias that the ReplicationStatus came from. + Tablet string + // ID represents the id column from the _vt.vreplication table. + ID int64 + // Bls represents the BinlogSource. + Bls binlogdatapb.BinlogSource + // Pos represents the pos column from the _vt.vreplication table. + Pos string + // StopPos represents the stop_pos column from the _vt.vreplication table. + StopPos string + // State represents the state column from the _vt.vreplication table. + State string + // MaxReplicationLag represents the max_replication_lag column from the _vt.vreplication table. MaxReplicationLag int64 - DBName string - TimeUpdated int64 - Message string - + // DBName represents the db_name column from the _vt.vreplication table. + DBName string + // TimeUpdated represents the time_updated column from the _vt.vreplication table. + TimeUpdated int64 + // Message represents the message column from the _vt.vreplication table. + Message string + + // CopyState represents the rows from the _vt.copy_state table.
CopyState []copyState } -func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqltypes.Value, master *topo.TabletInfo) (*replicationStatus, string, error) { +func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqltypes.Value, master *topo.TabletInfo) (*ReplicationStatus, string, error) { var err error var id, maxReplicationLag, timeUpdated int64 var state, dbName, pos, stopPos, message string @@ -298,7 +334,7 @@ func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqlty return nil, "", err } message = row[8].ToString() - status := &replicationStatus{ + status := &ReplicationStatus{ Shard: master.Shard, Tablet: master.AliasString(), ID: id, @@ -320,32 +356,57 @@ func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqlty return status, bls.Keyspace, nil } -func (wr *Wrangler) getStreams(ctx context.Context, workflow, keyspace string) (*replicationStatusResult, error) { - var rsr replicationStatusResult - rsr.Statuses = make(map[string][]*replicationStatus) +func (wr *Wrangler) getStreams(ctx context.Context, workflow, keyspace string) (*ReplicationStatusResult, error) { + var rsr ReplicationStatusResult + rsr.ShardStatuses = make(map[string]*ShardReplicationStatus) rsr.Workflow = workflow - rsr.TargetKeyspace = keyspace var results map[*topo.TabletInfo]*querypb.QueryResult query := "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, message from _vt.vreplication" results, err := wr.runVexec(ctx, workflow, keyspace, query, false) if err != nil { return nil, err } + + // We set a topo timeout since we contact topo for the shard record. 
+ ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout) + defer cancel() + var sourceKeyspace string + sourceShards := sets.NewString() + targetShards := sets.NewString() for master, result := range results { - var rsrStatus []*replicationStatus + var rsrStatus []*ReplicationStatus qr := sqltypes.Proto3ToResult(result) for _, row := range qr.Rows { - status, sourceKeyspace, err := wr.getReplicationStatusFromRow(ctx, row, master) + status, sk, err := wr.getReplicationStatusFromRow(ctx, row, master) fmt.Printf("getReplicationStatusFromRow status for master %s is %v\n", master.AliasString(), status) if err != nil { return nil, err } - rsr.SourceKeyspace = sourceKeyspace + sourceKeyspace = sk + sourceShards.Insert(status.Bls.Shard) rsrStatus = append(rsrStatus, status) } - rsr.Statuses[fmt.Sprintf("%s/%s", master.Shard, master.AliasString())] = rsrStatus + si, err := wr.ts.GetShard(ctx, keyspace, master.Shard) + if err != nil { + return nil, err + } + targetShards.Insert(si.ShardName()) + rsr.ShardStatuses[fmt.Sprintf("%s/%s", master.Shard, master.AliasString())] = &ShardReplicationStatus{ + MasterReplicationStatuses: rsrStatus, + TabletControls: si.TabletControls, + MasterIsServing: si.IsMasterServing, + } } + rsr.SourceLocation = ReplicationLocation{ + Keyspace: sourceKeyspace, + Shards: sourceShards.List(), + } + rsr.TargetLocation = ReplicationLocation{ + Keyspace: keyspace, + Shards: targetShards.List(), + } + return &rsr, nil } @@ -374,20 +435,21 @@ func (wr *Wrangler) ListAllWorkflows(ctx context.Context, keyspace string) ([]st return workflows, nil } -func (wr *Wrangler) listStreams(ctx context.Context, workflow, keyspace string) error { +// ShowWorkflow will return all of the relevant replication related information for the given workflow. 
+func (wr *Wrangler) ShowWorkflow(ctx context.Context, workflow, keyspace string) (*ReplicationStatusResult, error) { replStatus, err := wr.getStreams(ctx, workflow, keyspace) if err != nil { - return err + return nil, err } - if len(replStatus.Statuses) == 0 { - return fmt.Errorf("no streams found for workflow %s in keyspace %s", workflow, keyspace) + if len(replStatus.ShardStatuses) == 0 { + return nil, fmt.Errorf("no streams found for workflow %s in keyspace %s", workflow, keyspace) } if err := dumpStreamListAsJSON(replStatus, wr); err != nil { - return err + return nil, err } - return nil + return replStatus, nil } func updateState(message, state string, cs []copyState, timeUpdated int64) string { @@ -401,7 +463,7 @@ func updateState(message, state string, cs []copyState, timeUpdated int64) strin return state } -func dumpStreamListAsJSON(replStatus *replicationStatusResult, wr *Wrangler) error { +func dumpStreamListAsJSON(replStatus *ReplicationStatusResult, wr *Wrangler) error { text, err := json.MarshalIndent(replStatus, "", "\t") if err != nil { return err diff --git a/go/vt/wrangler/vexec_test.go b/go/vt/wrangler/vexec_test.go index 8a85b11dc26..cb38d1570a2 100644 --- a/go/vt/wrangler/vexec_test.go +++ b/go/vt/wrangler/vexec_test.go @@ -188,75 +188,94 @@ func TestWorkflowListStreams(t *testing.T) { logger := logutil.NewMemoryLogger() wr := New(logger, env.topoServ, env.tmc) - err := wr.listStreams(ctx, workflow, keyspace) + _, err := wr.ShowWorkflow(ctx, workflow, keyspace) require.Nil(t, err) want := `{ "Workflow": "wrWorkflow", - "SourceKeyspace": "source", - "TargetKeyspace": "target", - "Statuses": { - "-80/zone1-0000000200": [ - { - "Shard": "-80", - "Tablet": "zone1-0000000200", - "ID": 1, - "Bls": { - "keyspace": "source", - "shard": "0", - "filter": { - "rules": [ - { - "match": "t1" - } - ] - } - }, - "Pos": "pos", - "StopPos": "", - "State": "Copying", - "MaxReplicationLag": 0, - "DBName": "vt_target", - "TimeUpdated": 1234, - "Message": "", - 
"CopyState": [ - { - "Table": "t", - "LastPK": "1" - } - ] - } - ], - "80-/zone1-0000000210": [ - { - "Shard": "80-", - "Tablet": "zone1-0000000210", - "ID": 1, - "Bls": { - "keyspace": "source", - "shard": "0", - "filter": { - "rules": [ - { - "match": "t1" - } - ] - } - }, - "Pos": "pos", - "StopPos": "", - "State": "Copying", - "MaxReplicationLag": 0, - "DBName": "vt_target", - "TimeUpdated": 1234, - "Message": "", - "CopyState": [ - { - "Table": "t", - "LastPK": "1" - } - ] - } + "SourceLocation": { + "Keyspace": "source", + "Shards": [ + "0" ] + }, + "TargetLocation": { + "Keyspace": "target", + "Shards": [ + "-80", + "80-" + ] + }, + "ShardStatuses": { + "-80/zone1-0000000200": { + "MasterReplicationStatuses": [ + { + "Shard": "-80", + "Tablet": "zone1-0000000200", + "ID": 1, + "Bls": { + "keyspace": "source", + "shard": "0", + "filter": { + "rules": [ + { + "match": "t1" + } + ] + } + }, + "Pos": "pos", + "StopPos": "", + "State": "Copying", + "MaxReplicationLag": 0, + "DBName": "vt_target", + "TimeUpdated": 1234, + "Message": "", + "CopyState": [ + { + "Table": "t", + "LastPK": "1" + } + ] + } + ], + "TabletControls": null, + "MasterIsServing": true + }, + "80-/zone1-0000000210": { + "MasterReplicationStatuses": [ + { + "Shard": "80-", + "Tablet": "zone1-0000000210", + "ID": 1, + "Bls": { + "keyspace": "source", + "shard": "0", + "filter": { + "rules": [ + { + "match": "t1" + } + ] + } + }, + "Pos": "pos", + "StopPos": "", + "State": "Copying", + "MaxReplicationLag": 0, + "DBName": "vt_target", + "TimeUpdated": 1234, + "Message": "", + "CopyState": [ + { + "Table": "t", + "LastPK": "1" + } + ] + } + ], + "TabletControls": null, + "MasterIsServing": true + } } }