Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 49 additions & 17 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
Expand Down Expand Up @@ -301,6 +302,7 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
return nil, err
}

m := sync.Mutex{} // guards access to the following maps during concurrent calls to scanWorkflow
workflowsMap := make(map[string]*vtctldatapb.Workflow, len(results))
sourceKeyspaceByWorkflow := make(map[string]string, len(results))
sourceShardsByWorkflow := make(map[string]sets.String, len(results))
Expand Down Expand Up @@ -386,6 +388,15 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
stream.State = "Lagging"
}

// At this point, we're going to start modifying the maps defined
// outside this function, as well as fields on the passed-in Workflow
// pointer. Since we're running concurrently, take the lock.
//
// We've already made the remote call to getCopyStates, so synchronizing
// here shouldn't hurt too badly, performance-wise.
m.Lock()
defer m.Unlock()

shardStreamKey := fmt.Sprintf("%s/%s", tablet.Shard, tablet.AliasString())
shardStream, ok := workflow.ShardStreams[shardStreamKey]
if !ok {
Expand Down Expand Up @@ -436,6 +447,11 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
return nil
}

var (
scanWorkflowWg sync.WaitGroup
scanWorkflowErrors concurrency.FirstErrorRecorder
)

for tablet, result := range results {
qr := sqltypes.Proto3ToResult(result)

Expand Down Expand Up @@ -464,22 +480,23 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
targetShardsByWorkflow[workflowName] = sets.NewString()
}

if err := scanWorkflow(ctx, workflow, row, tablet); err != nil {
return nil, err
}

// Sort shard streams by stream_id ASC, to support an optimization
// in fetchStreamLogs below.
for _, shardStreams := range workflow.ShardStreams {
sort.Slice(shardStreams.Streams, func(i, j int) bool {
return shardStreams.Streams[i].Id < shardStreams.Streams[j].Id
})
}
scanWorkflowWg.Add(1)
go func(ctx context.Context, workflow *vtctldatapb.Workflow, row []sqltypes.Value, tablet *topo.TabletInfo) {
defer scanWorkflowWg.Done()
if err := scanWorkflow(ctx, workflow, row, tablet); err != nil {
scanWorkflowErrors.RecordError(err)
}
}(ctx, workflow, row, tablet)
}
}

scanWorkflowWg.Wait()
if scanWorkflowErrors.HasErrors() {
return nil, scanWorkflowErrors.Error()
}

var (
wg sync.WaitGroup
fetchLogsWG sync.WaitGroup
vrepLogQuery = strings.TrimSpace(`
SELECT
id,
Expand All @@ -499,7 +516,11 @@ ORDER BY
)

fetchStreamLogs := func(ctx context.Context, workflow *vtctldatapb.Workflow) {
defer wg.Done()
span, ctx := trace.NewSpan(ctx, "workflow.Server.scanWorkflow")
defer span.Finish()

span.Annotate("keyspace", req.Keyspace)
span.Annotate("workflow", workflow.Name)

results, err := vx.WithWorkflow(workflow.Name).QueryContext(ctx, vrepLogQuery)
if err != nil {
Expand Down Expand Up @@ -651,15 +672,26 @@ ORDER BY

workflow.MaxVReplicationLag = int64(maxVReplicationLag)

// Fetch logs for all streams associated with this workflow in the background.
wg.Add(1)
go fetchStreamLogs(ctx, workflow)
// Sort shard streams by stream_id ASC, to support an optimization
// in fetchStreamLogs below.
for _, shardStreams := range workflow.ShardStreams {
sort.Slice(shardStreams.Streams, func(i, j int) bool {
return shardStreams.Streams[i].Id < shardStreams.Streams[j].Id
})
}

workflows = append(workflows, workflow)

// Fetch logs for all streams associated with this workflow in the background.
fetchLogsWG.Add(1)
go func(ctx context.Context, workflow *vtctldatapb.Workflow) {
defer fetchLogsWG.Done()
fetchStreamLogs(ctx, workflow)
}(ctx, workflow)
}

// Wait for all the log fetchers to finish.
wg.Wait()
fetchLogsWG.Wait()

return &vtctldatapb.GetWorkflowsResponse{
Workflows: workflows,
Expand Down