diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 2f94cbcaca3..c6b90478f19 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -19,6 +19,7 @@ package workflow import ( "context" "errors" + "flag" "fmt" "sort" "strings" @@ -29,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/trace" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/key" @@ -39,11 +41,17 @@ import ( "vitess.io/vitess/go/vt/vttablet/tmclient" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" "vitess.io/vitess/go/vt/proto/vttime" ) +var ( + vExecPoolSize = flag.Uint("workflow_server_vexec_pool_size", 1000, "maximum number of concurrent vexec queries to allow") + vExecPoolTimeout = flag.Duration("workflow_server_vexec_pool_default_timeout", time.Millisecond*50, "default timeout to wait acquiring a connection from the vexec pool. zero implies no timeout") +) + var ( // ErrInvalidWorkflow is a catchall error type for conditions that should be // impossible when operating on a workflow. @@ -56,6 +64,10 @@ var ( // target keyspaces across different shard primaries. This should be // impossible. ErrMultipleTargetKeyspaces = errors.New("multiple target keyspaces for a single workflow") + + // ErrVExecConnTimeout is returned when acquiring a slot from a server's + // vexecPool fails (AcquireContext reports false), e.g. on an acquire timeout. + ErrVExecConnTimeout = errors.New("timeout exceeded getting vexec conn") ) // Server provides an API to work with Vitess workflows, like vreplication @@ -66,16 +78,18 @@ var ( // actions on vreplication workflows, and schema migration workflows entirely, // are not yet supported, but planned. 
type Server struct { - ts *topo.Server - tmc tmclient.TabletManagerClient + ts *topo.Server + tmc tmclient.TabletManagerClient + vexecPool *sync2.Semaphore } // NewServer returns a new server instance with the given topo.Server and // TabletManagerClient. func NewServer(ts *topo.Server, tmc tmclient.TabletManagerClient) *Server { return &Server{ - ts: ts, - tmc: tmc, + ts: ts, + tmc: tmc, + vexecPool: sync2.NewSemaphore(int(*vExecPoolSize), *vExecPoolTimeout), } } @@ -92,6 +106,8 @@ func (s *Server) CheckReshardingJournalExistsOnTablet(ctx context.Context, table exists bool ) + // Note: the VReplicationExec call below intentionally bypasses s.vexecPool, to + // keep behavior consistent with the wrangler APIs this function was migrated from. query := fmt.Sprintf("select val from _vt.resharding_journal where id=%v", migrationID) p3qr, err := s.tmc.VReplicationExec(ctx, tablet, query) if err != nil { @@ -297,7 +313,13 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows ) vx := vexec.NewVExec(req.Keyspace, "", s.ts, s.tmc) + + if !s.vexecPool.AcquireContext(ctx) { + return nil, ErrVExecConnTimeout + } results, err := vx.QueryContext(ctx, query) + s.vexecPool.Release() + if err != nil { return nil, err } @@ -522,7 +544,18 @@ ORDER BY span.Annotate("keyspace", req.Keyspace) span.Annotate("workflow", workflow.Name) - results, err := vx.WithWorkflow(workflow.Name).QueryContext(ctx, vrepLogQuery) + var ( + results map[*topo.TabletInfo]*querypb.QueryResult + err error + ) + + if !s.vexecPool.AcquireContext(ctx) { + err = ErrVExecConnTimeout + } else { + results, err = vx.WithWorkflow(workflow.Name).QueryContext(ctx, vrepLogQuery) + s.vexecPool.Release() + } + if err != nil { // Note that we do not return here. If there are any query results // in the map (i.e. 
some tablets returned successfully), we will @@ -708,7 +741,14 @@ func (s *Server) getWorkflowCopyStates(ctx context.Context, tablet *topo.TabletI span.Annotate("vrepl_id", id) query := fmt.Sprintf("select table_name, lastpk from _vt.copy_state where vrepl_id = %d", id) + + if !s.vexecPool.AcquireContext(ctx) { + return nil, ErrVExecConnTimeout + } + qr, err := s.tmc.VReplicationExec(ctx, tablet.Tablet, query) + s.vexecPool.Release() + if err != nil { return nil, err }