diff --git a/go/vt/vtctl/grpcvtctldserver/server.go b/go/vt/vtctl/grpcvtctldserver/server.go index 883e2fc4892..ec7f05c6576 100644 --- a/go/vt/vtctl/grpcvtctldserver/server.go +++ b/go/vt/vtctl/grpcvtctldserver/server.go @@ -32,6 +32,7 @@ import ( "vitess.io/vitess/go/event" "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/sqlescape" + "vitess.io/vitess/go/trace" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" @@ -921,6 +922,12 @@ func (s *VtctldServer) GetVSchema(ctx context.Context, req *vtctldatapb.GetVSche // GetWorkflows is part of the vtctlservicepb.VtctldServer interface. func (s *VtctldServer) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflowsRequest) (*vtctldatapb.GetWorkflowsResponse, error) { + span, ctx := trace.NewSpan(ctx, "VtctldServer.GetWorkflows") + defer span.Finish() + + span.Annotate("keyspace", req.Keyspace) + span.Annotate("active_only", req.ActiveOnly) + return s.ws.GetWorkflows(ctx, req) } diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index bd50ac581a3..397f9d30e75 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/trace" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo" @@ -262,6 +263,12 @@ func (s *Server) GetCellsWithTableReadsSwitched( // It has the same signature as the vtctlservicepb.VtctldServer's GetWorkflows // rpc, and grpcvtctldserver delegates to this function. func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflowsRequest) (*vtctldatapb.GetWorkflowsResponse, error) { + span, ctx := trace.NewSpan(ctx, "workflow.Server.GetWorkflows") + defer span.Finish() + + span.Annotate("keyspace", req.Keyspace) + span.Annotate("active_only", req.ActiveOnly) + where := "" if req.ActiveOnly { where = "WHERE state <> 'Stopped'" @@ -307,6 +314,15 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows // - targetShardsByWorkflow[workflow.Name] != nil // - workflow.ShardStatuses != nil scanWorkflow := func(ctx context.Context, workflow *vtctldatapb.Workflow, row []sqltypes.Value, tablet *topo.TabletInfo) error { + span, ctx := trace.NewSpan(ctx, "workflow.Server.scanWorkflow") + defer span.Finish() + + span.Annotate("keyspace", req.Keyspace) + span.Annotate("shard", tablet.Shard) + span.Annotate("active_only", req.ActiveOnly) + span.Annotate("workflow", workflow.Name) + span.Annotate("tablet_alias", tablet.AliasString()) + id, err := evalengine.ToInt64(row[0]) if err != nil { return err @@ -357,6 +373,8 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows return err } + span.Annotate("num_copy_states", len(stream.CopyStates)) + switch { case strings.Contains(strings.ToLower(stream.Message), "error"): stream.State = "Error" @@ -499,6 +517,14 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows } func (s *Server) getWorkflowCopyStates(ctx context.Context, tablet *topo.TabletInfo, id int64) ([]*vtctldatapb.Workflow_Stream_CopyState, error) { + span, ctx := trace.NewSpan(ctx, "workflow.Server.getWorkflowCopyStates") + defer span.Finish() + + span.Annotate("keyspace", tablet.Keyspace) + span.Annotate("shard", tablet.Shard) + span.Annotate("tablet_alias", tablet.AliasString()) + span.Annotate("vrepl_id", id) + query := fmt.Sprintf("select table_name, lastpk from _vt.copy_state where vrepl_id = %d", id) qr, err := s.tmc.VReplicationExec(ctx, tablet.Tablet, query) if err != nil {