Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1595,11 +1595,11 @@ func (e *Executor) handlePrepare(ctx context.Context, safeSession *SafeSession,
}

// ExecuteMultiShard implements the IExecutor interface
func (e *Executor) ExecuteMultiShard(ctx context.Context, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, tabletType topodatapb.TabletType, session *SafeSession, notInTransaction bool, autocommit bool) (qr *sqltypes.Result, errs []error) {
return e.scatterConn.ExecuteMultiShard(ctx, rss, queries, tabletType, session, notInTransaction, autocommit)
func (e *Executor) ExecuteMultiShard(ctx context.Context, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, session *SafeSession, notInTransaction bool, autocommit bool) (qr *sqltypes.Result, errs []error) {
return e.scatterConn.ExecuteMultiShard(ctx, rss, queries, session, notInTransaction, autocommit)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like you can drop the tabletType from the outer function also. Is there a reason to keep it?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

// StreamExecuteMulti implements the IExecutor interface
func (e *Executor) StreamExecuteMulti(ctx context.Context, query string, rss []*srvtopo.ResolvedShard, vars []map[string]*querypb.BindVariable, tabletType topodatapb.TabletType, options *querypb.ExecuteOptions, callback func(reply *sqltypes.Result) error) error {
return e.scatterConn.StreamExecuteMulti(ctx, query, rss, vars, tabletType, options, callback)
func (e *Executor) StreamExecuteMulti(ctx context.Context, query string, rss []*srvtopo.ResolvedShard, vars []map[string]*querypb.BindVariable, options *querypb.ExecuteOptions, callback func(reply *sqltypes.Result) error) error {
return e.scatterConn.StreamExecuteMulti(ctx, query, rss, vars, options, callback)
}
2 changes: 0 additions & 2 deletions go/vt/vtgate/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ func (res *Resolver) Execute(
sql,
bindVars,
rss,
tabletType,
session,
notInTransaction,
options,
Expand Down Expand Up @@ -134,7 +133,6 @@ func (res *Resolver) StreamExecute(
sql,
bindVars,
rss,
tabletType,
options,
callback)
return err
Expand Down
38 changes: 13 additions & 25 deletions go/vt/vtgate/scatter_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/discovery"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/srvtopo"
Expand Down Expand Up @@ -141,7 +140,6 @@ func (stc *ScatterConn) Execute(
query string,
bindVars map[string]*querypb.BindVariable,
rss []*srvtopo.ResolvedShard,
tabletType topodatapb.TabletType,
session *SafeSession,
notInTransaction bool,
options *querypb.ExecuteOptions,
Expand All @@ -156,7 +154,6 @@ func (stc *ScatterConn) Execute(
ctx,
"Execute",
rss,
tabletType,
session,
notInTransaction,
func(rs *srvtopo.ResolvedShard, i int, shouldBegin bool, transactionID int64) (int64, error) {
Expand Down Expand Up @@ -204,7 +201,6 @@ func (stc *ScatterConn) ExecuteMultiShard(
ctx context.Context,
rss []*srvtopo.ResolvedShard,
queries []*querypb.BoundQuery,
tabletType topodatapb.TabletType,
session *SafeSession,
notInTransaction bool,
autocommit bool,
Expand All @@ -218,7 +214,6 @@ func (stc *ScatterConn) ExecuteMultiShard(
ctx,
"Execute",
rss,
tabletType,
session,
notInTransaction,
func(rs *srvtopo.ResolvedShard, i int, shouldBegin bool, transactionID int64) (int64, error) {
Expand Down Expand Up @@ -301,7 +296,6 @@ func (stc *ScatterConn) StreamExecute(
query string,
bindVars map[string]*querypb.BindVariable,
rss []*srvtopo.ResolvedShard,
tabletType topodatapb.TabletType,
options *querypb.ExecuteOptions,
callback func(reply *sqltypes.Result) error,
) error {
Expand All @@ -310,7 +304,7 @@ func (stc *ScatterConn) StreamExecute(
var mu sync.Mutex
fieldSent := false

allErrors := stc.multiGo(ctx, "StreamExecute", rss, tabletType, func(rs *srvtopo.ResolvedShard, i int) error {
allErrors := stc.multiGo("StreamExecute", rss, func(rs *srvtopo.ResolvedShard, i int) error {
return rs.QueryService.StreamExecute(ctx, rs.Target, query, bindVars, 0, options, func(qr *sqltypes.Result) error {
return stc.processOneStreamingResult(&mu, &fieldSent, qr, callback)
})
Expand All @@ -328,15 +322,14 @@ func (stc *ScatterConn) StreamExecuteMulti(
query string,
rss []*srvtopo.ResolvedShard,
bindVars []map[string]*querypb.BindVariable,
tabletType topodatapb.TabletType,
options *querypb.ExecuteOptions,
callback func(reply *sqltypes.Result) error,
) error {
// mu protects fieldSent, callback and replyErr
var mu sync.Mutex
fieldSent := false

allErrors := stc.multiGo(ctx, "StreamExecute", rss, tabletType, func(rs *srvtopo.ResolvedShard, i int) error {
allErrors := stc.multiGo("StreamExecute", rss, func(rs *srvtopo.ResolvedShard, i int) error {
return rs.QueryService.StreamExecute(ctx, rs.Target, query, bindVars[i], 0, options, func(qr *sqltypes.Result) error {
return stc.processOneStreamingResult(&mu, &fieldSent, qr, callback)
})
Expand Down Expand Up @@ -390,7 +383,7 @@ func (stc *ScatterConn) MessageStream(ctx context.Context, rss []*srvtopo.Resolv
var mu sync.Mutex
fieldSent := false
lastErrors := newTimeTracker()
allErrors := stc.multiGo(ctx, "MessageStream", rss, topodatapb.TabletType_MASTER, func(rs *srvtopo.ResolvedShard, i int) error {
allErrors := stc.multiGo("MessageStream", rss, func(rs *srvtopo.ResolvedShard, i int) error {
// This loop handles the case where a reparent happens, which can cause
// an individual stream to end. If we don't succeed on the retries for
// messageStreamGracePeriod, we abort and return an error.
Expand Down Expand Up @@ -463,10 +456,8 @@ func (stc *ScatterConn) GetHealthCheckCacheStatus() discovery.TabletsCacheStatus
// shards in parallel. This does not handle any transaction state.
// The action function must match the shardActionFunc2 signature.
func (stc *ScatterConn) multiGo(
ctx context.Context,
name string,
rss []*srvtopo.ResolvedShard,
tabletType topodatapb.TabletType,
action shardActionFunc,
) (allErrors *concurrency.AllErrorRecorder) {
allErrors = new(concurrency.AllErrorRecorder)
Expand Down Expand Up @@ -515,7 +506,6 @@ func (stc *ScatterConn) multiGoTransaction(
ctx context.Context,
name string,
rss []*srvtopo.ResolvedShard,
tabletType topodatapb.TabletType,
session *SafeSession,
notInTransaction bool,
action shardActionTransactionFunc,
Expand Down Expand Up @@ -544,25 +534,23 @@ func (stc *ScatterConn) multiGoTransaction(
}
}

var wg sync.WaitGroup
if numShards == 1 {
// only one shard, do it synchronously.
for i, rs := range rss {
oneShard(rs, i)
goto end
}
} else {
var wg sync.WaitGroup
for i, rs := range rss {
wg.Add(1)
go func(rs *srvtopo.ResolvedShard, i int) {
defer wg.Done()
oneShard(rs, i)
}(rs, i)
}
wg.Wait()
}

for i, rs := range rss {
wg.Add(1)
go func(rs *srvtopo.ResolvedShard, i int) {
defer wg.Done()
oneShard(rs, i)
}(rs, i)
}
wg.Wait()

end:
if session.MustRollback() {
stc.txConn.Rollback(ctx, session)
}
Expand Down
Loading