Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions .github/workflows/cluster_endtoend_vttablet_prscomplex.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ jobs:
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
uses: actions/setup-go@v2
with:
go-version: 1.18.5
go-version: 1.18.7

- name: Set up python
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
Expand All @@ -71,16 +71,17 @@ jobs:
- name: Get dependencies
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
run: |
# Setup Percona Server for MySQL 8.0

# Get key to latest MySQL repo
sudo apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 467B942D3A79BD29
# Setup MySQL 8.0
wget -c https://dev.mysql.com/get/mysql-apt-config_0.8.20-1_all.deb
echo mysql-apt-config mysql-apt-config/select-server select mysql-8.0 | sudo debconf-set-selections
sudo DEBIAN_FRONTEND="noninteractive" dpkg -i mysql-apt-config*
sudo apt-get update
sudo apt-get install -y lsb-release gnupg2 curl
wget https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.deb
sudo DEBIAN_FRONTEND="noninteractive" dpkg -i percona-release_latest.$(lsb_release -sc)_all.deb
sudo percona-release setup ps80
sudo apt-get update

# Install everything else we need, and configure
sudo apt-get install -y percona-server-server percona-server-client make unzip g++ etcd git wget eatmydata xz-utils
sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget eatmydata xz-utils

sudo service mysql stop
sudo service etcd stop
sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/
Expand Down
114 changes: 114 additions & 0 deletions go/test/endtoend/vtgate/transaction/restart/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
Copyright 2022 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package misc

import (
"context"
_ "embed"
"flag"
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
)

var (
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams
keyspaceName = "ks"
cell = "test"

//go:embed schema.sql
schemaSQL string
)

func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()

exitCode := func() int {
clusterInstance = cluster.NewCluster(cell, "localhost")
defer clusterInstance.Teardown()

// Start topo server
err := clusterInstance.StartTopo()
if err != nil {
return 1
}

// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
SchemaSQL: schemaSQL,
}
err = clusterInstance.StartUnshardedKeyspace(*keyspace, 1, false)
if err != nil {
return 1
}

// Start vtgate
clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs,
"--planner-version=gen4",
"--mysql_default_workload=olap")
err = clusterInstance.StartVtgate()
if err != nil {
return 1
}

vtParams = mysql.ConnParams{
Host: clusterInstance.Hostname,
Port: clusterInstance.VtgateMySQLPort,
}
return m.Run()
}()
os.Exit(exitCode)
}

/*
TestStreamTxRestart tests that when a connection is killed my mysql (may be due to restart),
then the transaction should not continue to serve the query via reconnect.
*/
func TestStreamTxRestart(t *testing.T) {
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()

utils.Exec(t, conn, "begin")
// BeginStreamExecute
_ = utils.Exec(t, conn, "select connection_id()")

// StreamExecute
_ = utils.Exec(t, conn, "select connection_id()")

// restart the mysql to terminate all the existing connections.
primTablet := clusterInstance.Keyspaces[0].Shards[0].PrimaryTablet()
err = primTablet.MysqlctlProcess.Stop()
require.NoError(t, err)
err = primTablet.MysqlctlProcess.StartProvideInit(false)
require.NoError(t, err)

// query should return connection error
_, err = utils.ExecAllowError(t, conn, "select connection_id()")
require.Error(t, err)
assert.Contains(t, err.Error(), "broken pipe (errno 2006) (sqlstate HY000)")
}
5 changes: 5 additions & 0 deletions go/test/endtoend/vtgate/transaction/restart/schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
create table t1(
id1 bigint,
id2 bigint,
primary key(id1)
) Engine=InnoDB;
18 changes: 18 additions & 0 deletions go/vt/vttablet/tabletserver/connpool/dbconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,24 @@ func (dbc *DBConn) streamOnce(ctx context.Context, query string, callback func(*
return err
}

// StreamOnce executes the query and streams the results. But, does not retry on connection errors.
func (dbc *DBConn) StreamOnce(ctx context.Context, query string, callback func(*sqltypes.Result) error, alloc func() *sqltypes.Result, streamBufferSize int, includedFields querypb.ExecuteOptions_IncludedFields) error {
resultSent := false
return dbc.streamOnce(
ctx,
query,
func(r *sqltypes.Result) error {
if !resultSent {
resultSent = true
r = r.StripMetadata(includedFields)
}
return callback(r)
},
alloc,
streamBufferSize,
)
}

var (
getModeSQL = "select @@global.sql_mode"
getAutocommit = "select @@autocommit"
Expand Down
22 changes: 8 additions & 14 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func (qre *QueryExecutor) Stream(callback StreamCallback) error {
}

var replaceKeyspace string
if sqltypes.IncludeFieldsOrDefault(qre.options) == querypb.ExecuteOptions_ALL {
if sqltypes.IncludeFieldsOrDefault(qre.options) == querypb.ExecuteOptions_ALL && qre.tsv.sm.target.Keyspace != qre.tsv.config.DB.DBName {
replaceKeyspace = qre.tsv.sm.target.Keyspace
}

Expand Down Expand Up @@ -1006,29 +1006,23 @@ func (qre *QueryExecutor) execStreamSQL(conn *connpool.DBConn, isTransaction boo
return callback(result)
}

qd := NewQueryDetail(qre.logStats.Ctx, conn)
start := time.Now()
defer qre.logStats.AddRewrittenSQL(sql, start)

// Add query detail object into QueryExecutor TableServer list w.r.t if it is a transactional or not. Previously we were adding it
// to olapql list regardless but that resulted in problems, where long-running stream queries which can be stateful (or transactional)
// weren't getting cleaned up during unserveCommon>handleShutdownGracePeriod in state_manager.go.
// This change will ensure that long-running streaming stateful queries get gracefully shutdown during ServingTypeChange
// once their grace period is over.
qd := NewQueryDetail(qre.logStats.Ctx, conn)
if isTransaction {
qre.tsv.statefulql.Add(qd)
defer qre.tsv.statefulql.Remove(qd)
} else {
qre.tsv.olapql.Add(qd)
defer qre.tsv.olapql.Remove(qd)
}

start := time.Now()
err := conn.Stream(ctx, sql, callBackClosingSpan, allocStreamResult, int(qre.tsv.qe.streamBufferSize.Get()), sqltypes.IncludeFieldsOrDefault(qre.options))
qre.logStats.AddRewrittenSQL(sql, start)
if err != nil {
// MySQL error that isn't due to a connection issue
return err
return conn.StreamOnce(ctx, sql, callBackClosingSpan, allocStreamResult, int(qre.tsv.qe.streamBufferSize.Get()), sqltypes.IncludeFieldsOrDefault(qre.options))
}
return nil
qre.tsv.olapql.Add(qd)
defer qre.tsv.olapql.Remove(qd)
return conn.Stream(ctx, sql, callBackClosingSpan, allocStreamResult, int(qre.tsv.qe.streamBufferSize.Get()), sqltypes.IncludeFieldsOrDefault(qre.options))
}

func (qre *QueryExecutor) recordUserQuery(queryType string, duration int64) {
Expand Down
9 changes: 9 additions & 0 deletions test/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,15 @@
"RetryMax": 1,
"Tags": []
},
"vtgate_transaction_restart": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vtgate/transaction/restart"],
"Command": [],
"Manual": false,
"Shard": "vtgate_transaction",
"RetryMax": 1,
"Tags": []
},
"vtgate_transaction_rollback": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vtgate/transaction/rollback"],
Expand Down