Skip to content
8 changes: 6 additions & 2 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ var commands = []commandGroup{
"Workflow", []command{
{"Workflow", commandWorkflow,
"<ks.workflow> <action> --dry-run",
"Start/Stop/Delete/List/ListAll Workflow on all target tablets in workflow. Example: Workflow merchant.morders Start",
"Start/Stop/Delete/Show/ListAll Workflow on all target tablets in workflow. Example: Workflow merchant.morders Start",
},
},
},
Expand Down Expand Up @@ -2950,6 +2950,10 @@ func commandWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.
}
keyspace := subFlags.Arg(0)
action := strings.ToLower(subFlags.Arg(1))
// Note: List is deprecated and replaced by show.
if action == "list" {
action = "show"
}
var workflow string
var err error
if action != "listall" {
Expand All @@ -2967,7 +2971,7 @@ func commandWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.
if err != nil {
return err
}
if action == "list" || action == "listall" {
if action == "show" || action == "listall" {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we not print anything out for show / listall?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rohit-nayak-ps this doesn't look right. However, it is unrelated to the changes in this PR so I'm going to move this forward.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The printing happens internally actually. From what I can tell this is a way of handling split code paths from subcommands.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As Peter replied, show/listall follows a separate path and outputs a json doc. Otherwise we just return rows affected (result of vexec for stop/start/delete workflow)

return nil
}
if len(results) == 0 {
Expand Down
136 changes: 99 additions & 37 deletions go/vt/wrangler/vexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"

"vitess.io/vitess/go/vt/log"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"

"github.com/golang/protobuf/proto"
"github.com/olekukonko/tablewriter"
Expand Down Expand Up @@ -103,7 +104,7 @@ func (vx *vexec) outputDryRunInfo(wr *Wrangler) error {
table.SetHeader([]string{"Tablet", "ID", "BinLogSource", "State", "DBName", "Current GTID", "MaxReplicationLag"})
for _, master := range vx.masters {
key := fmt.Sprintf("%s/%s", master.Shard, master.AliasString())
for _, stream := range rsr.Statuses[key] {
for _, stream := range rsr.ShardStatuses[key].MasterReplicationStatuses {
table.Append([]string{key, fmt.Sprintf("%d", stream.ID), stream.Bls.String(), stream.State, stream.DBName, stream.Pos, fmt.Sprintf("%d", stream.MaxReplicationLag)})
}
}
Expand Down Expand Up @@ -192,8 +193,9 @@ func (vx *vexec) getMasterForShard(shard string) (*topo.TabletInfo, error) {

// WorkflowAction can start/stop/delete or list streams in _vt.vreplication on all masters in the target keyspace of the workflow.
func (wr *Wrangler) WorkflowAction(ctx context.Context, workflow, keyspace, action string, dryRun bool) (map[*topo.TabletInfo]*sqltypes.Result, error) {
if action == "list" {
return nil, wr.listStreams(ctx, workflow, keyspace)
if action == "show" {
_, err := wr.ShowWorkflow(ctx, workflow, keyspace)
return nil, err
} else if action == "listall" {
_, err := wr.ListAllWorkflows(ctx, keyspace)
return nil, err
Expand Down Expand Up @@ -244,36 +246,70 @@ func (wr *Wrangler) execWorkflowAction(ctx context.Context, workflow, keyspace,
return results, err
}

type replicationStatusResult struct {
Workflow string
SourceKeyspace string
TargetKeyspace string
// ReplicationStatusResult represents the result of trying to get the replication status for a given workflow.
type ReplicationStatusResult struct {
// Workflow represents the name of the workflow relevant to the related replication statuses.
Workflow string
// SourceLocation represents the keyspace and shards that we are vreplicating from.
SourceLocation ReplicationLocation
// TargetLocation represents the keyspace and shards that we are vreplicating into.
TargetLocation ReplicationLocation

// Statuses is a map of <shard>/<master tablet alias> : ShardReplicationStatus (for the given shard).
ShardStatuses map[string]*ShardReplicationStatus
}

Statuses map[string][]*replicationStatus
// ReplicationLocation represents a location that data is either replicating from, or replicating into.
type ReplicationLocation struct {
Keyspace string
Shards []string
}

// ShardReplicationStatus holds relevant vreplication related info for the given shard.
type ShardReplicationStatus struct {
// MasterReplicationStatuses represents all of the replication statuses for the master tablets in the given shard.
MasterReplicationStatuses []*ReplicationStatus
// TabletControls represents the tablet controls for the tablets in the shard.
TabletControls []*topodatapb.Shard_TabletControl
// MasterIsServing indicates whether the master tablet of the given shard is currently serving write traffic.
MasterIsServing bool
}

type copyState struct {
Table string
LastPK string
}

type replicationStatus struct {
Shard string
Tablet string
ID int64
Bls binlogdatapb.BinlogSource
Pos string
StopPos string
State string
// ReplicationStatus includes data from the _vt.vreplication table, along with other useful relevant data.
type ReplicationStatus struct {
// Shard represents the relevant shard name.
Shard string
// Tablet is the tablet alias that the ReplicationStatus came from.
Tablet string
// ID represents the id column from the _vt.vreplication table.
ID int64
// Bls represents the BinlogSource.
Bls binlogdatapb.BinlogSource
// Pos represents the pos column from the _vt.vreplication table.
Pos string
// StopPos represents the stop_pos column from the _vt.vreplication table.
StopPos string
// State represents the state column from the _vt.vreplication table.
State string
// MaxReplicationLag represents the max_replication_lag column from the _vt.vreplication table.
MaxReplicationLag int64
DBName string
TimeUpdated int64
Message string

// DbName represents the db_name column from the _vt.vreplication table.
DBName string
// TimeUpdated represents the time_updated column from the _vt.vreplication table.
TimeUpdated int64
// Message represents the message column from the _vt.vreplication table.
Message string

// CopyState represents the rows from the _vt.copy_state table.
CopyState []copyState
}

func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqltypes.Value, master *topo.TabletInfo) (*replicationStatus, string, error) {
func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqltypes.Value, master *topo.TabletInfo) (*ReplicationStatus, string, error) {
var err error
var id, maxReplicationLag, timeUpdated int64
var state, dbName, pos, stopPos, message string
Expand All @@ -298,7 +334,7 @@ func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqlty
return nil, "", err
}
message = row[8].ToString()
status := &replicationStatus{
status := &ReplicationStatus{
Shard: master.Shard,
Tablet: master.AliasString(),
ID: id,
Expand All @@ -320,32 +356,57 @@ func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqlty
return status, bls.Keyspace, nil
}

func (wr *Wrangler) getStreams(ctx context.Context, workflow, keyspace string) (*replicationStatusResult, error) {
var rsr replicationStatusResult
rsr.Statuses = make(map[string][]*replicationStatus)
func (wr *Wrangler) getStreams(ctx context.Context, workflow, keyspace string) (*ReplicationStatusResult, error) {
var rsr ReplicationStatusResult
rsr.ShardStatuses = make(map[string]*ShardReplicationStatus)
rsr.Workflow = workflow
rsr.TargetKeyspace = keyspace
var results map[*topo.TabletInfo]*querypb.QueryResult
query := "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, message from _vt.vreplication"
results, err := wr.runVexec(ctx, workflow, keyspace, query, false)
if err != nil {
return nil, err
}

// We set a topo timeout since we contact topo for the shard record.
ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer cancel()
var sourceKeyspace string
sourceShards := sets.NewString()
targetShards := sets.NewString()
for master, result := range results {
var rsrStatus []*replicationStatus
var rsrStatus []*ReplicationStatus
qr := sqltypes.Proto3ToResult(result)
for _, row := range qr.Rows {
status, sourceKeyspace, err := wr.getReplicationStatusFromRow(ctx, row, master)
status, sk, err := wr.getReplicationStatusFromRow(ctx, row, master)
fmt.Printf("getReplicationStatusFromRow status for master %s is %v\n", master.AliasString(), status)
if err != nil {
return nil, err
}
rsr.SourceKeyspace = sourceKeyspace
sourceKeyspace = sk
sourceShards.Insert(status.Bls.Shard)

rsrStatus = append(rsrStatus, status)
}
rsr.Statuses[fmt.Sprintf("%s/%s", master.Shard, master.AliasString())] = rsrStatus
si, err := wr.ts.GetShard(ctx, keyspace, master.Shard)
if err != nil {
return nil, err
}
targetShards.Insert(si.ShardName())
rsr.ShardStatuses[fmt.Sprintf("%s/%s", master.Shard, master.AliasString())] = &ShardReplicationStatus{
MasterReplicationStatuses: rsrStatus,
TabletControls: si.TabletControls,
MasterIsServing: si.IsMasterServing,
}
}
rsr.SourceLocation = ReplicationLocation{
Keyspace: sourceKeyspace,
Shards: sourceShards.List(),
}
rsr.TargetLocation = ReplicationLocation{
Keyspace: keyspace,
Shards: targetShards.List(),
}

return &rsr, nil
}

Expand Down Expand Up @@ -374,20 +435,21 @@ func (wr *Wrangler) ListAllWorkflows(ctx context.Context, keyspace string) ([]st
return workflows, nil
}

func (wr *Wrangler) listStreams(ctx context.Context, workflow, keyspace string) error {
// ShowWorkflow will return all of the relevant replication related information for the given workflow.
func (wr *Wrangler) ShowWorkflow(ctx context.Context, workflow, keyspace string) (*ReplicationStatusResult, error) {
replStatus, err := wr.getStreams(ctx, workflow, keyspace)
if err != nil {
return err
return nil, err
}
if len(replStatus.Statuses) == 0 {
return fmt.Errorf("no streams found for workflow %s in keyspace %s", workflow, keyspace)
if len(replStatus.ShardStatuses) == 0 {
return nil, fmt.Errorf("no streams found for workflow %s in keyspace %s", workflow, keyspace)
}

if err := dumpStreamListAsJSON(replStatus, wr); err != nil {
return err
return nil, err
}

return nil
return replStatus, nil
}

func updateState(message, state string, cs []copyState, timeUpdated int64) string {
Expand All @@ -401,7 +463,7 @@ func updateState(message, state string, cs []copyState, timeUpdated int64) strin
return state
}

func dumpStreamListAsJSON(replStatus *replicationStatusResult, wr *Wrangler) error {
func dumpStreamListAsJSON(replStatus *ReplicationStatusResult, wr *Wrangler) error {
text, err := json.MarshalIndent(replStatus, "", "\t")
if err != nil {
return err
Expand Down
Loading