Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go/test/endtoend/vtgate/get_lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,9 @@ func TestLocksWithTxFailure(t *testing.T) {
_, err = conn1.ExecuteFetch("commit", 1, true)
require.Error(t, err)

// in the second connection, lock acquisition succeeds.
assertMatches(t, conn2, `select get_lock('lock', 2)`, `[[INT64(1)]]`)
assertMatches(t, conn2, `select release_lock('lock')`, `[[INT64(1)]]`)
// in the second connection, lock acquisition should fail as first connection still hold the lock though the transaction has failed.
assertMatches(t, conn2, `select get_lock('lock', 2)`, `[[INT64(0)]]`)
assertMatches(t, conn2, `select release_lock('lock')`, `[[INT64(0)]]`)
}

func TestLocksWithTxOngoingAndReleaseLock(t *testing.T) {
Expand Down
168 changes: 89 additions & 79 deletions go/vt/proto/vtgate/vtgate.pb.go

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ type noopVCursor struct {
ctx context.Context
}

func (t noopVCursor) ExecuteLock(rs *srvtopo.ResolvedShard, query *querypb.BoundQuery) (*sqltypes.Result, error) {
panic("implement me")
}

func (t noopVCursor) NeedsReservedConn() {
}

Expand Down Expand Up @@ -141,7 +145,6 @@ func (t noopVCursor) ResolveDestinations(keyspace string, ids []*querypb.Value,
}

var _ VCursor = (*loggingVCursor)(nil)

var _ SessionActions = (*loggingVCursor)(nil)

// loggingVCursor logs requests and allows you to verify
Expand Down
102 changes: 102 additions & 0 deletions go/vt/vtgate/engine/lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
Copyright 2020 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package engine

import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/key"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
)

var _ Primitive = (*Lock)(nil)

//Lock primitive will execute sql containing lock functions
type Lock struct {
// Keyspace specifies the keyspace to send the query to.
Keyspace *vindexes.Keyspace

// TargetDestination specifies an explicit target destination to send the query to.
TargetDestination key.Destination

// Query specifies the query to be executed.
Query string

noInputs

noTxNeeded
}

// RouteType is part of the Primitive interface
func (l *Lock) RouteType() string {
return "lock"
}

// GetKeyspaceName is part of the Primitive interface
func (l *Lock) GetKeyspaceName() string {
return l.Keyspace.Name
}

// GetTableName is part of the Primitive interface
func (l *Lock) GetTableName() string {
return "dual"
}

// Execute is part of the Primitive interface
func (l *Lock) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, _ bool) (*sqltypes.Result, error) {
rss, _, err := vcursor.ResolveDestinations(l.Keyspace.Name, nil, []key.Destination{l.TargetDestination})
if err != nil {
return nil, err
}
if len(rss) != 1 {
return nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "lock query cannot be routed to vttablet: %v", rss)
}

query := &querypb.BoundQuery{
Sql: l.Query,
BindVariables: bindVars,
}
return vcursor.ExecuteLock(rss[0], query)
}

// StreamExecute is part of the Primitive interface
func (l *Lock) StreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
qr, err := l.Execute(vcursor, bindVars, wantfields)
if err != nil {
return err
}
return callback(qr)
}

// GetFields is part of the Primitive interface
func (l *Lock) GetFields(vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
return nil, vterrors.New(vtrpc.Code_UNIMPLEMENTED, "not implements in lock primitive")
}

func (l *Lock) description() PrimitiveDescription {
other := map[string]interface{}{
"Query": l.Query,
}
return PrimitiveDescription{
OperatorType: "Lock",
Keyspace: l.Keyspace,
TargetDestination: l.TargetDestination,
Other: other,
}
}
2 changes: 2 additions & 0 deletions go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ type (
ExecuteVSchema(keyspace string, vschemaDDL *sqlparser.DDL) error

Session() SessionActions

ExecuteLock(rs *srvtopo.ResolvedShard, query *querypb.BoundQuery) (*sqltypes.Result, error)
}

//SessionActions gives primitives ability to interact with the session state
Expand Down
78 changes: 0 additions & 78 deletions go/vt/vtgate/engine/reserve.go

This file was deleted.

8 changes: 7 additions & 1 deletion go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ type Executor struct {
var executorOnce sync.Once

const pathQueryPlans = "/debug/query_plans"

const pathScatterStats = "/debug/scatter_stats"
const pathVSchema = "/debug/vschema"

Expand Down Expand Up @@ -343,7 +344,7 @@ func (e *Executor) handleSavepoint(ctx context.Context, safeSession *SafeSession
// CloseSession releases the current connection, which rollbacks open transactions and closes reserved connections.
// It is called then the MySQL servers closes the connection to its client.
func (e *Executor) CloseSession(ctx context.Context, safeSession *SafeSession) error {
return e.txConn.Release(ctx, safeSession)
return e.txConn.ReleaseAll(ctx, safeSession)
}

func (e *Executor) handleSet(ctx context.Context, safeSession *SafeSession, sql string, logStats *LogStats) (*sqltypes.Result, error) {
Expand Down Expand Up @@ -1618,3 +1619,8 @@ func (e *Executor) ExecuteMultiShard(ctx context.Context, rss []*srvtopo.Resolve
func (e *Executor) StreamExecuteMulti(ctx context.Context, query string, rss []*srvtopo.ResolvedShard, vars []map[string]*querypb.BindVariable, options *querypb.ExecuteOptions, callback func(reply *sqltypes.Result) error) error {
return e.scatterConn.StreamExecuteMulti(ctx, query, rss, vars, options, callback)
}

//ExecuteLock implments the IExecutor interface
func (e *Executor) ExecuteLock(ctx context.Context, rs *srvtopo.ResolvedShard, query *querypb.BoundQuery, session *SafeSession) (*sqltypes.Result, error) {
return e.scatterConn.ExecuteLock(ctx, rs, query, session)
}
22 changes: 14 additions & 8 deletions go/vt/vtgate/executor_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2253,15 +2253,21 @@ func TestSelectLock(t *testing.T) {
BindVariables: map[string]*querypb.BindVariable{},
}}
wantSession := &vtgatepb.Session{
InTransaction: true,
InReservedConn: true,
ShardSessions: []*vtgatepb.Session_ShardSession{
{
Target: &querypb.Target{Keyspace: "TestExecutor", Shard: "-20", TabletType: topodatapb.TabletType_MASTER},
TabletAlias: sbc1.Tablet().Alias,
TransactionId: 12345,
ReservedId: 12345,
InTransaction: true,
ShardSessions: []*vtgatepb.Session_ShardSession{{
Target: &querypb.Target{
Keyspace: "TestExecutor",
Shard: "-20",
TabletType: topodatapb.TabletType_MASTER,
},
TransactionId: 12345,
TabletAlias: sbc1.Tablet().Alias,
}},
LockSession: &vtgatepb.Session_ShardSession{

Target: &querypb.Target{Keyspace: "TestExecutor", Shard: "-20", TabletType: topodatapb.TabletType_MASTER},
TabletAlias: sbc1.Tablet().Alias,
ReservedId: 1,
},
FoundRows: 1,
RowCount: -1,
Expand Down
12 changes: 4 additions & 8 deletions go/vt/vtgate/planbuilder/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,14 +186,10 @@ func buildLockingPrimitive(sel *sqlparser.Select, vschema ContextVSchema) (engin
if err != nil {
return nil, err
}
return &engine.Reserve{
Input: &engine.Send{
Keyspace: ks,
TargetDestination: key.DestinationKeyspaceID{0},
Query: sqlparser.String(sel),
IsDML: false,
SingleShardOnly: true,
},
return &engine.Lock{
Keyspace: ks,
TargetDestination: key.DestinationKeyspaceID{0},
Query: sqlparser.String(sel),
}, nil
}

Expand Down
16 changes: 14 additions & 2 deletions go/vt/vtgate/planbuilder/testdata/lock_cases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@
"QueryType": "SELECT",
"Original": "select get_lock('xyz', 10) from dual",
"Instructions": {
"OperatorType": "Reserve"
"OperatorType": "Lock",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"TargetDestination": "KeyspaceID(00)",
"Query": "select get_lock('xyz', 10) from dual"
}
}

Expand All @@ -14,6 +20,12 @@
"QueryType": "SELECT",
"Original": "select is_free_lock('xyz') from dual",
"Instructions": {
"OperatorType": "Reserve"
"OperatorType": "Lock",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"TargetDestination": "KeyspaceID(00)",
"Query": "select is_free_lock('xyz') from dual"
}
}
36 changes: 36 additions & 0 deletions go/vt/vtgate/safe_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,3 +360,39 @@ func (session *SafeSession) SetPreQueries() []string {
}
return result
}

//SetLockSession sets the lock session.
func (session *SafeSession) SetLockSession(lockSession *vtgatepb.Session_ShardSession) {
session.mu.Lock()
defer session.mu.Unlock()
session.LockSession = lockSession
}

//InLockSession returns whether locking is used on this session.
func (session *SafeSession) InLockSession() bool {
session.mu.Lock()
defer session.mu.Unlock()
return session.LockSession != nil
}

//ResetLock resets the lock session
func (session *SafeSession) ResetLock() {
session.mu.Lock()
defer session.mu.Unlock()
session.LockSession = nil
}

//ResetAll resets the shard sessions and lock session.
func (session *SafeSession) ResetAll() {
session.mu.Lock()
defer session.mu.Unlock()
session.mustRollback = false
session.autocommitState = notAutocommittable
session.Session.InTransaction = false
session.commitOrder = vtgatepb.CommitOrder_NORMAL
session.Savepoints = nil
session.ShardSessions = nil
session.PreSessions = nil
session.PostSessions = nil
session.LockSession = nil
}
Loading