Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
a0bdfe5
Add reserve connection APIs to protobuf
systay Apr 24, 2020
052412e
Added reserve connection method in grpc queryservice server
harshit-gangal Apr 24, 2020
e2268f0
Added Reserve to tx-pool
systay May 6, 2020
1c61bbc
added execWithRetry in TxConnection
harshit-gangal May 6, 2020
aacf55b
Removed unused Stop() from tx_engine
systay May 8, 2020
21df047
Introduce interfaces to break the dependencies between TabletServer a…
systay May 11, 2020
de799eb
Extracted connection handling from tx_pool
systay May 12, 2020
c8583d7
extracted transaction properties from txConnection
harshit-gangal May 12, 2020
ae8c88c
More clean up around TxConnection
systay May 12, 2020
0a9b679
Moved BeginAgain, and changed to ConnID insted of int64
systay May 12, 2020
da4294f
Unit tests for ActivePool
systay May 13, 2020
b300d19
Fix endtoend tests
systay May 13, 2020
9cdea4f
Renamed Get/Recycle to GetAndBlock/Unblock
systay May 13, 2020
8f21486
changed activepool->statefulconnectionpool, dedicatedconn->statefulco…
harshit-gangal May 14, 2020
cadc951
Merged txpool Begin and LocalBegin
systay May 14, 2020
ab33d33
Removed func from tx_engine.Begin
systay May 14, 2020
c8768c6
test cleanups
systay May 14, 2020
e8010cc
Merged Commit/LocalCommit, and stopped TxPool from releasing connections
systay May 15, 2020
e0e5c8d
Removed and renamed methods on tx_pool.go
systay May 15, 2020
657b480
Small touch-ups
systay May 15, 2020
6030136
Refactor tx_executor
systay May 15, 2020
bbb2bd5
Protect txPool from using without checking state
systay May 15, 2020
9cb9f39
Fixed test failures
systay May 19, 2020
79385ec
Changed tx_pool to use interface instead of struct
systay May 19, 2020
68075cd
Renamed field
systay May 19, 2020
456275c
Renamed conclude to Releasef
systay May 19, 2020
405adfb
Make String() on StatefulConnection nil safe
systay May 21, 2020
ce8d72d
Uncommented tests
systay May 21, 2020
f4d3209
Clean up of tests
systay May 21, 2020
b45e7b6
More testing
systay May 21, 2020
18e0dc2
rename trustedconnection to IStatefulConnection and added test for tx…
harshit-gangal May 22, 2020
1a735f4
remove t.cleanup call from failing test
harshit-gangal May 22, 2020
a446396
tabletserver-refactor: review comments
harshit-gangal May 25, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions go/mysql/fakesqldb/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ type DB struct {
patternData []exprResult
// queryCalled keeps track of how many times a query was called.
queryCalled map[string]int
// querylog keeps track of all called queries
querylog []string

// This next set of fields is used when ordering of the queries matters.

Expand Down Expand Up @@ -340,6 +342,7 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R
db.mu.Lock()
defer db.mu.Unlock()
db.queryCalled[key]++
db.querylog = append(db.querylog, key)

// Check if we should close the connection and provoke errno 2013.
if db.shouldClose {
Expand Down Expand Up @@ -523,6 +526,11 @@ func (db *DB) GetQueryCalledNum(query string) int {
return num
}

//QueryLog returns the query log in a semicomma separated string
func (db *DB) QueryLog() string {
return strings.Join(db.querylog, ";")
}

// EnableConnFail makes connection to this fake DB fail.
func (db *DB) EnableConnFail() {
db.mu.Lock()
Expand Down
9 changes: 6 additions & 3 deletions go/pools/numbered.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func (u *unregistered) Size() int {
return 1
}

//NewNumbered creates a new numbered
func NewNumbered() *Numbered {
n := &Numbered{
resources: make(map[int64]*numberedWrapper),
Expand Down Expand Up @@ -86,7 +87,7 @@ func (nu *Numbered) Register(id int64, val interface{}, enforceTimeout bool) err

// Unregister forgets the specified resource. If the resource is not present, it's ignored.
func (nu *Numbered) Unregister(id int64, reason string) {
success := nu.unregister(id, reason)
success := nu.unregister(id)
if success {
nu.recentlyUnregistered.Set(
fmt.Sprintf("%v", id), &unregistered{reason: reason, timeUnregistered: time.Now()})
Expand All @@ -95,7 +96,7 @@ func (nu *Numbered) Unregister(id int64, reason string) {

// unregister forgets the resource, if it exists. Returns whether or not the resource existed at
// time of Unregister.
func (nu *Numbered) unregister(id int64, reason string) bool {
func (nu *Numbered) unregister(id int64) bool {
nu.mu.Lock()
defer nu.mu.Unlock()

Expand Down Expand Up @@ -199,11 +200,13 @@ func (nu *Numbered) WaitForEmpty() {
}
}

//StatsJSON returns stats in JSON format
func (nu *Numbered) StatsJSON() string {
return fmt.Sprintf("{\"Size\": %v}", nu.Size())
}

func (nu *Numbered) Size() (size int64) {
//Size returns the current size
func (nu *Numbered) Size() int64 {
nu.mu.Lock()
defer nu.mu.Unlock()
return int64(len(nu.resources))
Expand Down
36 changes: 9 additions & 27 deletions go/vt/vttablet/endtoend/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"reflect"
"testing"

"github.com/stretchr/testify/require"
"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/vttablet/endtoend/framework"
Expand Down Expand Up @@ -118,16 +119,14 @@ func TestBatchRead(t *testing.T) {
want := []sqltypes.Result{qr1, qr2}

qrl, err := client.ExecuteBatch(queries, false)
if err != nil {
t.Error(err)
return
}
require.NoError(t, err)
if !reflect.DeepEqual(qrl, want) {
t.Errorf("ExecueBatch: \n%#v, want \n%#v", prettyPrintArr(qrl), prettyPrintArr(want))
}
}

func TestBatchTransaction(t *testing.T) {

client := framework.NewClient()
queries := []*querypb.BoundQuery{{
Sql: "insert into vitess_test values(4, null, null, null)",
Expand All @@ -148,37 +147,25 @@ func TestBatchTransaction(t *testing.T) {

// Not in transaction, AsTransaction false
qrl, err := client.ExecuteBatch(queries, false)
if err != nil {
t.Error(err)
return
}
require.NoError(t, err)
if !reflect.DeepEqual(qrl[1].Rows, wantRows) {
t.Errorf("Rows: \n%#v, want \n%#v", qrl[1].Rows, wantRows)
}

// Not in transaction, AsTransaction true
qrl, err = client.ExecuteBatch(queries, true)
if err != nil {
t.Error(err)
return
}
require.NoError(t, err)
if !reflect.DeepEqual(qrl[1].Rows, wantRows) {
t.Errorf("Rows: \n%#v, want \n%#v", qrl[1].Rows, wantRows)
}

// In transaction, AsTransaction false
func() {
err = client.Begin(false)
if err != nil {
t.Error(err)
return
}
require.NoError(t, err)
defer client.Commit()
qrl, err = client.ExecuteBatch(queries, false)
if err != nil {
t.Error(err)
return
}
require.NoError(t, err)
if !reflect.DeepEqual(qrl[1].Rows, wantRows) {
t.Errorf("Rows: \n%#v, want \n%#v", qrl[1].Rows, wantRows)
}
Expand All @@ -187,15 +174,10 @@ func TestBatchTransaction(t *testing.T) {
// In transaction, AsTransaction true
func() {
err = client.Begin(false)
if err != nil {
t.Error(err)
return
}
require.NoError(t, err)
defer client.Rollback()
qrl, err = client.ExecuteBatch(queries, true)
want := "cannot start a new transaction in the scope of an existing one"
if err == nil || err.Error() != want {
t.Errorf("Error: %v, want %s", err, want)
}
require.EqualError(t, err, want)
}()
}
139 changes: 20 additions & 119 deletions go/vt/vttablet/endtoend/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ limitations under the License.
package endtoend

import (
"fmt"
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand All @@ -33,98 +34,23 @@ import (
)

// compareIntDiff returns an error if end[tag] != start[tag]+diff.
func compareIntDiff(end map[string]interface{}, tag string, start map[string]interface{}, diff int) error {
return verifyIntValue(end, tag, framework.FetchInt(start, tag)+diff)
func compareIntDiff(t *testing.T, end map[string]interface{}, tag string, start map[string]interface{}, diff int) {
t.Helper()
verifyIntValue(t, end, tag, framework.FetchInt(start, tag)+diff)
}

// verifyIntValue returns an error if values[tag] != want.
func verifyIntValue(values map[string]interface{}, tag string, want int) error {
got := framework.FetchInt(values, tag)
if got != want {
return fmt.Errorf("%s: %d, want %d", tag, got, want)
}
return nil
}

func TestConfigVars(t *testing.T) {
currentConfig := tabletenv.NewCurrentConfig()
vars := framework.DebugVars()
cases := []struct {
tag string
val int
}{{
tag: "ConnPoolAvailable",
val: currentConfig.OltpReadPool.Size,
}, {
tag: "ConnPoolCapacity",
val: currentConfig.OltpReadPool.Size,
}, {
tag: "ConnPoolIdleTimeout",
val: currentConfig.OltpReadPool.IdleTimeoutSeconds * 1e9,
}, {
tag: "ConnPoolMaxCap",
val: currentConfig.OltpReadPool.Size,
}, {
tag: "MaxResultSize",
val: currentConfig.Oltp.MaxRows,
}, {
tag: "WarnResultSize",
val: currentConfig.Oltp.WarnRows,
}, {
tag: "QueryCacheCapacity",
val: currentConfig.QueryCacheSize,
}, {
tag: "QueryTimeout",
val: int(currentConfig.Oltp.QueryTimeoutSeconds * 1e9),
}, {
tag: "SchemaReloadTime",
val: int(currentConfig.SchemaReloadIntervalSeconds * 1e9),
}, {
tag: "StreamBufferSize",
val: currentConfig.StreamBufferSize,
}, {
tag: "StreamConnPoolAvailable",
val: currentConfig.OlapReadPool.Size,
}, {
tag: "StreamConnPoolCapacity",
val: currentConfig.OlapReadPool.Size,
}, {
tag: "StreamConnPoolIdleTimeout",
val: currentConfig.OlapReadPool.IdleTimeoutSeconds * 1e9,
}, {
tag: "StreamConnPoolMaxCap",
val: currentConfig.OlapReadPool.Size,
}, {
tag: "TransactionPoolAvailable",
val: currentConfig.TxPool.Size,
}, {
tag: "TransactionPoolCapacity",
val: currentConfig.TxPool.Size,
}, {
tag: "TransactionPoolIdleTimeout",
val: currentConfig.TxPool.IdleTimeoutSeconds * 1e9,
}, {
tag: "TransactionPoolMaxCap",
val: currentConfig.TxPool.Size,
}, {
tag: "TransactionTimeout",
val: int(currentConfig.Oltp.TxTimeoutSeconds * 1e9),
}}
for _, tcase := range cases {
if err := verifyIntValue(vars, tcase.tag, int(tcase.val)); err != nil {
t.Error(err)
}
}
func verifyIntValue(t *testing.T, values map[string]interface{}, tag string, want int) {
t.Helper()
require.Equal(t, want, framework.FetchInt(values, tag), tag)
}

func TestPoolSize(t *testing.T) {
defer framework.Server.SetPoolSize(framework.Server.PoolSize())
framework.Server.SetPoolSize(1)

vstart := framework.DebugVars()
if err := verifyIntValue(vstart, "ConnPoolCapacity", 1); err != nil {
t.Error(err)
}
verifyIntValue(t, vstart, "ConnPoolCapacity", 1)

var wg sync.WaitGroup
wg.Add(2)
Expand Down Expand Up @@ -270,34 +196,19 @@ func TestQueryPlanCache(t *testing.T) {
_, _ = client.Execute("select * from vitess_test where intval=:ival1", bindVars)
_, _ = client.Execute("select * from vitess_test where intval=:ival2", bindVars)
vend := framework.DebugVars()
if err := verifyIntValue(vend, "QueryCacheLength", 1); err != nil {
t.Error(err)
}
if err := verifyIntValue(vend, "QueryCacheSize", 1); err != nil {
t.Error(err)
}
if err := verifyIntValue(vend, "QueryCacheCapacity", 1); err != nil {
t.Error(err)
}
verifyIntValue(t, vend, "QueryCacheLength", 1)
verifyIntValue(t, vend, "QueryCacheSize", 1)
verifyIntValue(t, vend, "QueryCacheCapacity", 1)

framework.Server.SetQueryPlanCacheCap(10)
_, _ = client.Execute("select * from vitess_test where intval=:ival1", bindVars)
vend = framework.DebugVars()
if err := verifyIntValue(vend, "QueryCacheLength", 2); err != nil {
t.Error(err)
}
if err := verifyIntValue(vend, "QueryCacheSize", 2); err != nil {
t.Error(err)
}

verifyIntValue(t, vend, "QueryCacheLength", 2)
verifyIntValue(t, vend, "QueryCacheSize", 2)
_, _ = client.Execute("select * from vitess_test where intval=1", bindVars)
vend = framework.DebugVars()
if err := verifyIntValue(vend, "QueryCacheLength", 3); err != nil {
t.Error(err)
}
if err := verifyIntValue(vend, "QueryCacheSize", 3); err != nil {
t.Error(err)
}
verifyIntValue(t, vend, "QueryCacheLength", 3)
verifyIntValue(t, vend, "QueryCacheSize", 3)
}

func TestMaxResultSize(t *testing.T) {
Expand All @@ -311,10 +222,7 @@ func TestMaxResultSize(t *testing.T) {
if err == nil || !strings.HasPrefix(err.Error(), want) {
t.Errorf("Error: %v, must start with %s", err, want)
}
if err := verifyIntValue(framework.DebugVars(), "MaxResultSize", 2); err != nil {
t.Error(err)
}

verifyIntValue(t, framework.DebugVars(), "MaxResultSize", 2)
framework.Server.SetMaxResultSize(10)
_, err = client.Execute(query, nil)
if err != nil {
Expand All @@ -337,10 +245,7 @@ func TestWarnResultSize(t *testing.T) {
t.Errorf("Warnings.ResultsExceeded counter should have increased by 1, instead got %v", exceededCountDiff)
}

if err := verifyIntValue(framework.DebugVars(), "WarnResultSize", 2); err != nil {
t.Error(err)
}

verifyIntValue(t, framework.DebugVars(), "WarnResultSize", 2)
framework.Server.SetWarnResultSize(10)
_, _ = client.Execute(query, nil)
newerWarningsResultsExceededCount := framework.FetchInt(framework.DebugVars(), "Warnings/ResultsExceeded")
Expand Down Expand Up @@ -370,10 +275,6 @@ func TestQueryTimeout(t *testing.T) {
t.Errorf("Error code: %v, want %v", code, vtrpcpb.Code_ABORTED)
}
vend := framework.DebugVars()
if err := verifyIntValue(vend, "QueryTimeout", int(100*time.Millisecond)); err != nil {
t.Error(err)
}
if err := compareIntDiff(vend, "Kills/Queries", vstart, 1); err != nil {
t.Error(err)
}
verifyIntValue(t, vend, "QueryTimeout", int(100*time.Millisecond))
compareIntDiff(t, vend, "Kills/Queries", vstart, 1)
}
4 changes: 2 additions & 2 deletions go/vt/vttablet/endtoend/framework/eventcatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ func (tc *TxCatcher) Close() {

// Next fetches the next captured transaction.
// If the wait is longer than one second, it returns an error.
func (tc *TxCatcher) Next() (*tabletserver.TxConnection, error) {
func (tc *TxCatcher) Next() (*tabletserver.StatefulConnection, error) {
event, err := tc.catcher.next()
if err != nil {
return nil, err
}
return event.(*tabletserver.TxConnection), nil
return event.(*tabletserver.StatefulConnection), nil
}

// QueryCatcher allows you to capture and fetch queries that are being
Expand Down
Loading