Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions go/pools/numbered.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,15 @@ func (nu *Numbered) Get(id int64, purpose string) (val interface{}, err error) {
}

// Put unlocks a resource for someone else to use.
func (nu *Numbered) Put(id int64) {
func (nu *Numbered) Put(id int64, updateTime bool) {
nu.mu.Lock()
defer nu.mu.Unlock()
if nw, ok := nu.resources[id]; ok {
nw.inUse = false
nw.purpose = ""
nw.timeUsed = time.Now()
if updateTime {
nw.timeUsed = time.Now()
}
}
}

Expand All @@ -162,7 +164,7 @@ func (nu *Numbered) GetOutdated(age time.Duration, purpose string) (vals []inter
if nw.inUse || !nw.enforceTimeout {
continue
}
if nw.timeCreated.Add(age).Sub(now) <= 0 {
if nw.timeUsed.Add(age).Sub(now) <= 0 {
nw.inUse = true
nw.purpose = purpose
vals = append(vals, nw.val)
Expand Down
6 changes: 3 additions & 3 deletions go/pools/numbered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestNumbered(t *testing.T) {
if _, err = p.Get(id, "test1"); err.Error() != "in use: test" {
t.Errorf("want 'in use: test', got '%v'", err)
}
p.Put(id)
p.Put(id, true)
if _, err = p.Get(1, "test2"); err.Error() != "not found" {
t.Errorf("want 'not found', got '%v'", err)
}
Expand Down Expand Up @@ -75,9 +75,9 @@ func TestNumbered(t *testing.T) {
t.Errorf("want 'in use: by outdated', got '%v'", err)
}
for _, v := range vals {
p.Put(v.(int64))
p.Put(v.(int64), true)
}
p.Put(2) // put to 2 to ensure it's not idle
p.Put(2, true) // put to 2 to ensure it's not idle
time.Sleep(100 * time.Millisecond)

// p has 0, 1, 2 (2 is idle)
Expand Down
110 changes: 110 additions & 0 deletions go/vt/vttablet/endtoend/connkilling/connkiller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
Copyright 2020 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

/*
All tests in this package come with a three second time out for OLTP session
*/
package connkilling

import (
"testing"
"time"

"github.com/stretchr/testify/require"
"vitess.io/vitess/go/vt/vttablet/endtoend/framework"
)

func TestTxKillerKillsTransactionsInReservedConnections(t *testing.T) {
client := framework.NewClient()
defer client.Release()

_, err := client.ReserveBeginExecute("select 42", nil, nil)
require.NoError(t, err)

assertIsKilledWithin5Seconds(t, client)
}

func TestTxKillerDoesNotKillReservedConnectionsInUse(t *testing.T) {
client := framework.NewClient()
defer client.Release()

_, err := client.ReserveExecute("select 42", nil, nil)
require.NoError(t, err)

assertIsNotKilledOver5Second(t, client)
}

func TestTxKillerCountsTimeFromTxStartedNotStatefulConnCreated(t *testing.T) {
client := framework.NewClient()
defer client.Release()

// reserve connection at 0th second
_, err := client.ReserveExecute("select 42", nil, nil)
require.NoError(t, err)

// elapsed 2 seconds
time.Sleep(2 * time.Second)

// update the timer on tx start - new tx timer starts
_, err = client.BeginExecute("select 44", nil, nil)
require.NoError(t, err)

// elapsed 1 second from tx and 3 second from reserved conn.
time.Sleep(1 * time.Second)
_, err = client.Execute("select 43", nil)
require.NoError(t, err)

// elapsed 2 second from tx and 4 second from reserved conn. It does not fail.
time.Sleep(1 * time.Second)
_, err = client.Execute("select 43", nil)
require.NoError(t, err)

assertIsKilledWithin5Seconds(t, client)
}

func TestTxKillerKillsTransactionThreeSecondsAfterCreation(t *testing.T) {
client := framework.NewClient()
defer client.Release()

_, err := client.BeginExecute("select 42", nil, nil)
require.NoError(t, err)

assertIsKilledWithin5Seconds(t, client)
}

func assertIsNotKilledOver5Second(t *testing.T, client *framework.QueryClient) {
for i := 0; i < 5; i++ {
_, err := client.Execute("select 43", nil)
require.NoError(t, err)
time.Sleep(1 * time.Second)
}
}

func assertIsKilledWithin5Seconds(t *testing.T, client *framework.QueryClient) {
var err error
// when it is used once per second
for i := 0; i < 5; i++ {
_, err = client.Execute("select 43", nil)
if err != nil {
break
}
time.Sleep(1 * time.Second)
}

// then it should still be killed. transactions are tracked per tx-creation time and not last-used time
require.Error(t, err)
require.Contains(t, err.Error(), "exceeded timeout: 3s")
}
Loading