Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 45 additions & 5 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package workflow
import (
"context"
"errors"
"flag"
"fmt"
"sort"
"strings"
Expand All @@ -29,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/key"
Expand All @@ -39,11 +41,17 @@ import (
"vitess.io/vitess/go/vt/vttablet/tmclient"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
"vitess.io/vitess/go/vt/proto/vttime"
)

var (
vExecPoolSize = flag.Uint("workflow_server_vexec_pool_size", 1000, "maximum number of concurrent vexec queries to allow")
vExecPoolTimeout = flag.Duration("workflow_server_vexec_pool_default_timeout", time.Millisecond*50, "default timeout to wait acquiring a connection from the vexec pool. zero implies no timeout")
Comment on lines +51 to +52

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should update the release notes with this. Also, if we expect to roll out limits per call, then (imo) we should have a better approach than a flag for concurrency-per-call.

)

var (
// ErrInvalidWorkflow is a catchall error type for conditions that should be
// impossible when operating on a workflow.
Expand All @@ -56,6 +64,10 @@ var (
// target keyspaces across different shard primaries. This should be
// impossible.
ErrMultipleTargetKeyspaces = errors.New("multiple target keyspaces for a single workflow")

// ErrVExecConnTimeout is returned when an attempt to acquire a server's
// vexecPool exceeds its deadline.
ErrVExecConnTimeout = errors.New("timeout exceeded getting vexec conn")
)

// Server provides an API to work with Vitess workflows, like vreplication
Expand All @@ -66,16 +78,18 @@ var (
// actions on vreplication workflows, and schema migration workflows entirely,
// are not yet supported, but planned.
type Server struct {
ts *topo.Server
tmc tmclient.TabletManagerClient
ts *topo.Server
tmc tmclient.TabletManagerClient
// vexecPool is the semaphore that GetWorkflows and getWorkflowCopyStates
// acquire before issuing a query and release once the query returns.
// CheckReshardingJournalExistsOnTablet deliberately does not use it.
vexecPool *sync2.Semaphore
}

// NewServer returns a new server instance with the given topo.Server and
// TabletManagerClient.
//
// The server's vexecPool semaphore is built from the
// workflow_server_vexec_pool_size and
// workflow_server_vexec_pool_default_timeout flags.
func NewServer(ts *topo.Server, tmc tmclient.TabletManagerClient) *Server {
return &Server{
ts: ts,
tmc: tmc,
ts: ts,
tmc: tmc,
// The flag pointers are dereferenced here, so the pool is configured from
// whatever values the flags hold at the moment NewServer is called.
vexecPool: sync2.NewSemaphore(int(*vExecPoolSize), *vExecPoolTimeout),
}
}

Expand All @@ -92,6 +106,8 @@ func (s *Server) CheckReshardingJournalExistsOnTablet(ctx context.Context, table
exists bool
)

// Note we do not use the s.vexecPool semaphore here, to maintain consistent
// behavior with the existing wrangler APIs this function was migrated from.
query := fmt.Sprintf("select val from _vt.resharding_journal where id=%v", migrationID)
p3qr, err := s.tmc.VReplicationExec(ctx, tablet, query)
if err != nil {
Expand Down Expand Up @@ -297,7 +313,13 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
)

vx := vexec.NewVExec(req.Keyspace, "", s.ts, s.tmc)

if !s.vexecPool.AcquireContext(ctx) {
return nil, ErrVExecConnTimeout
}
Comment on lines 315 to +319

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tioli:

func NewVExecThrottled(..., pool *sync2.Semaphore) *VExec

and then moving other vexec endpoints over to it is a tiny bit less effort; also, you don't have to remember to manage locking around each query.

results, err := vx.QueryContext(ctx, query)
s.vexecPool.Release()

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -522,7 +544,18 @@ ORDER BY
span.Annotate("keyspace", req.Keyspace)
span.Annotate("workflow", workflow.Name)

results, err := vx.WithWorkflow(workflow.Name).QueryContext(ctx, vrepLogQuery)
var (
results map[*topo.TabletInfo]*querypb.QueryResult
err error
)

if !s.vexecPool.AcquireContext(ctx) {
err = ErrVExecConnTimeout
} else {
results, err = vx.WithWorkflow(workflow.Name).QueryContext(ctx, vrepLogQuery)
s.vexecPool.Release()
}

if err != nil {
// Note that we do not return here. If there are any query results
// in the map (i.e. some tablets returned successfully), we will
Expand Down Expand Up @@ -708,7 +741,14 @@ func (s *Server) getWorkflowCopyStates(ctx context.Context, tablet *topo.TabletI
span.Annotate("vrepl_id", id)

query := fmt.Sprintf("select table_name, lastpk from _vt.copy_state where vrepl_id = %d", id)

if !s.vexecPool.AcquireContext(ctx) {
return nil, ErrVExecConnTimeout
}

qr, err := s.tmc.VReplicationExec(ctx, tablet.Tablet, query)
s.vexecPool.Release()

if err != nil {
return nil, err
}
Expand Down