From 81c78b70ae869fbe46f21c28ab5b002818639a51 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 3 Oct 2022 16:54:59 +0530 Subject: [PATCH 1/4] test: added failing e2e test Signed-off-by: Harshit Gangal --- .../cluster_endtoend_vttablet_prscomplex.yml | 118 +++++++++++++++ go/test/endtoend/cluster/mysqlctl_process.go | 21 ++- .../endtoend/reparent/prscomplex/main_test.go | 143 ++++++++++++++++++ .../endtoend/reparent/prscomplex/schema.sql | 5 + test/ci_workflow_gen.go | 1 + test/config.json | 9 ++ 6 files changed, 294 insertions(+), 3 deletions(-) create mode 100644 .github/workflows/cluster_endtoend_vttablet_prscomplex.yml create mode 100644 go/test/endtoend/reparent/prscomplex/main_test.go create mode 100644 go/test/endtoend/reparent/prscomplex/schema.sql diff --git a/.github/workflows/cluster_endtoend_vttablet_prscomplex.yml b/.github/workflows/cluster_endtoend_vttablet_prscomplex.yml new file mode 100644 index 00000000000..692c8afd7e4 --- /dev/null +++ b/.github/workflows/cluster_endtoend_vttablet_prscomplex.yml @@ -0,0 +1,118 @@ +# DO NOT MODIFY: THIS FILE IS GENERATED USING "make generate_ci_workflows" + +name: Cluster (vttablet_prscomplex) +on: [push, pull_request] +concurrency: + group: format('{0}-{1}', ${{ github.ref }}, 'Cluster (vttablet_prscomplex)') + cancel-in-progress: true + +env: + LAUNCHABLE_ORGANIZATION: "vitess" + LAUNCHABLE_WORKSPACE: "vitess-app" + GITHUB_PR_HEAD_SHA: "${{ github.event.pull_request.head.sha }}" + +jobs: + build: + name: Run endtoend tests on Cluster (vttablet_prscomplex) + runs-on: ubuntu-18.04 + + steps: + - name: Check if workflow needs to be skipped + id: skip-workflow + run: | + skip='false' + if [[ "${{github.event.pull_request}}" == "" ]] && [[ "${{github.ref}}" != "refs/heads/main" ]] && [[ ! "${{github.ref}}" =~ ^refs/heads/release-[0-9]+\.[0-9]$ ]] && [[ ! "${{github.ref}}" =~ "refs/tags/.*" ]]; then + skip='true' + fi + echo Skip ${skip} + echo "::set-output name=skip-workflow::${skip}" + + - name: Check out code + if: steps.skip-workflow.outputs.skip-workflow == 'false' + uses: actions/checkout@v2 + + - name: Check for changes in relevant files + if: steps.skip-workflow.outputs.skip-workflow == 'false' + uses: frouioui/paths-filter@main + id: changes + with: + token: '' + filters: | + end_to_end: + - 'go/**/*.go' + - 'test.go' + - 'Makefile' + - 'build.env' + - 'go.[sumod]' + - 'proto/*.proto' + - 'tools/**' + - 'config/**' + - 'bootstrap.sh' + - '.github/workflows/cluster_endtoend_vttablet_prscomplex.yml' + + - name: Set up Go + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' + uses: actions/setup-go@v2 + with: + go-version: 1.18.5 + + - name: Set up python + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' + uses: actions/setup-python@v2 + + - name: Tune the OS + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' + run: | + echo '1024 65535' | sudo tee -a /proc/sys/net/ipv4/ip_local_port_range + # Increase the asynchronous non-blocking I/O. More information at https://dev.mysql.com/doc/refman/5.7/en/innodb-parameters.html#sysvar_innodb_use_native_aio + echo "fs.aio-max-nr = 1048576" | sudo tee -a /etc/sysctl.conf + sudo sysctl -p /etc/sysctl.conf + + - name: Get dependencies + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' + run: | + sudo apt-get update + sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget eatmydata + sudo service mysql stop + sudo service etcd stop + sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/ + sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld + go mod download + + # install JUnit report formatter + go install github.com/vitessio/go-junit-report@HEAD + + - name: Setup launchable dependencies + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' + run: | + # Get Launchable CLI installed. If you can, make it a part of the builder image to speed things up + pip3 install --user launchable~=1.0 > /dev/null + + # verify that launchable setup is all correct. + launchable verify || true + + # Tell Launchable about the build you are producing and testing + launchable record build --name "$GITHUB_RUN_ID" --source . + + - name: Run cluster endtoend test + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' + timeout-minutes: 30 + run: | + # We set the VTDATAROOT to the /tmp folder to reduce the file path of mysql.sock file + # which musn't be more than 107 characters long. + export VTDATAROOT="/tmp/" + source build.env + + set -x + + # run the tests however you normally do, then produce a JUnit XML file + eatmydata -- go run test.go -docker=false -follow -shard vttablet_prscomplex | tee -a output.txt | go-junit-report -set-exit-code > report.xml + + - name: Print test output and Record test result in launchable + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' && always() + run: | + # send recorded tests to launchable + launchable record tests --build "$GITHUB_RUN_ID" go-test . || true + + # print test output + cat output.txt diff --git a/go/test/endtoend/cluster/mysqlctl_process.go b/go/test/endtoend/cluster/mysqlctl_process.go index 97ccaf80f86..3478d5dc806 100644 --- a/go/test/endtoend/cluster/mysqlctl_process.go +++ b/go/test/endtoend/cluster/mysqlctl_process.go @@ -65,7 +65,16 @@ func (mysqlctl *MysqlctlProcess) InitDb() (err error) { // Start executes mysqlctl command to start mysql instance func (mysqlctl *MysqlctlProcess) Start() (err error) { - tmpProcess, err := mysqlctl.StartProcess() + tmpProcess, err := mysqlctl.startProcess(true) + if err != nil { + return err + } + return tmpProcess.Wait() +} + +// StartProvideInit executes mysqlctl command to start mysql instance +func (mysqlctl *MysqlctlProcess) StartProvideInit(init bool) (err error) { + tmpProcess, err := mysqlctl.startProcess(init) if err != nil { return err } @@ -74,6 +83,10 @@ func (mysqlctl *MysqlctlProcess) Start() (err error) { // StartProcess starts the mysqlctl and returns the process reference func (mysqlctl *MysqlctlProcess) StartProcess() (*exec.Cmd, error) { + return mysqlctl.startProcess(true) +} + +func (mysqlctl *MysqlctlProcess) startProcess(init bool) (*exec.Cmd, error) { tmpProcess := exec.Command( mysqlctl.Binary, "--log_dir", mysqlctl.LogDirectory, @@ -120,8 +133,10 @@ ssl_key={{.Dir}}/server-001-key.pem tmpProcess.Env = append(tmpProcess.Env, "VTDATAROOT="+os.Getenv("VTDATAROOT")) } - tmpProcess.Args = append(tmpProcess.Args, "init", "--", - "--init_db_sql_file", mysqlctl.InitDBFile) + if init { + tmpProcess.Args = append(tmpProcess.Args, "init", "--", + "--init_db_sql_file", mysqlctl.InitDBFile) + } } tmpProcess.Args = append(tmpProcess.Args, "start") log.Infof("Starting mysqlctl with command: %v", tmpProcess.Args) diff --git a/go/test/endtoend/reparent/prscomplex/main_test.go b/go/test/endtoend/reparent/prscomplex/main_test.go new file mode 100644 index 00000000000..7d17b6dcc65 --- /dev/null +++ b/go/test/endtoend/reparent/prscomplex/main_test.go @@ -0,0 +1,143 @@ +/* +Copyright 2022 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package misc + +import ( + "context" + _ "embed" + "flag" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/test/endtoend/cluster" + rutils "vitess.io/vitess/go/test/endtoend/reparent/utils" + "vitess.io/vitess/go/test/endtoend/utils" +) + +var ( + clusterInstance *cluster.LocalProcessCluster + vtParams mysql.ConnParams + keyspaceName = "ks" + cell = "test" + + //go:embed schema.sql + schemaSQL string +) + +func TestMain(m *testing.M) { + defer cluster.PanicHandler(nil) + flag.Parse() + + exitCode := func() int { + clusterInstance = cluster.NewCluster(cell, "localhost") + defer clusterInstance.Teardown() + + // Start topo server + err := clusterInstance.StartTopo() + if err != nil { + return 1 + } + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: keyspaceName, + SchemaSQL: schemaSQL, + } + clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, + "--queryserver-config-query-timeout=9000", + "--queryserver-config-pool-size=3", + "--queryserver-config-stream-pool-size=3", + "--queryserver-config-transaction-cap=2", + "--queryserver-config-transaction-timeout=20", + "--shutdown_grace_period=3", + "--queryserver-config-schema-change-signal=false") + err = clusterInstance.StartUnshardedKeyspace(*keyspace, 1, false) + if err != nil { + return 1 + } + + // Start vtgate + clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, + "--planner-version=gen4", + "--mysql_default_workload=olap", + "--schema_change_signal=false") + err = clusterInstance.StartVtgate() + if err != nil { + return 1 + } + + vtParams = mysql.ConnParams{ + Host: clusterInstance.Hostname, + Port: clusterInstance.VtgateMySQLPort, + } + return m.Run() + }() + os.Exit(exitCode) +} + +func TestAcquireSameConnID(t *testing.T) { + defer func() { + err := recover() + if err != nil { + require.Equal(t, "Fail in goroutine after TestAcquireSameConnID has completed", err) + } + }() + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + defer conn.Close() + + utils.Exec(t, conn, "set sql_mode=''") + _ = utils.Exec(t, conn, "select connection_id()") + //connID, err := qr.Rows[0][0].ToInt64() + //require.NoError(t, err) + + primTablet := clusterInstance.Keyspaces[0].Shards[0].PrimaryTablet() + err = primTablet.MysqlctlProcess.Stop() + require.NoError(t, err) + + err = primTablet.MysqlctlProcess.StartProvideInit(false) + require.NoError(t, err) + + go func() { + // this will trigger reconnect with a new connection id, which will be lower than the origin connection id. + _, _ = utils.ExecAllowError(t, conn, "select connection_id(), sleep(4000)") + }() + time.Sleep(5 * time.Second) + + // run through 100 times to acquire new connection, this might override the original connection id. + var conn2 *mysql.Conn + for i := 0; i < 100; i++ { + conn2, err = mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + + utils.Exec(t, conn2, "set sql_mode=''") + // ReserveExecute + _ = utils.Exec(t, conn2, "select connection_id()") + + // Execute + _ = utils.Exec(t, conn2, "select connection_id()") + + } + + text, err := rutils.Prs(t, clusterInstance, clusterInstance.Keyspaces[0].Shards[0].Replica()) + require.NoError(t, err, text) +} diff --git a/go/test/endtoend/reparent/prscomplex/schema.sql b/go/test/endtoend/reparent/prscomplex/schema.sql new file mode 100644 index 00000000000..3e78cab09d6 --- /dev/null +++ b/go/test/endtoend/reparent/prscomplex/schema.sql @@ -0,0 +1,5 @@ +create table t1( + id1 bigint, + id2 bigint, + primary key(id1) +) Engine=InnoDB; \ No newline at end of file diff --git a/test/ci_workflow_gen.go b/test/ci_workflow_gen.go index 4c8869b72e9..b37150b42f2 100644 --- a/test/ci_workflow_gen.go +++ b/test/ci_workflow_gen.go @@ -126,6 +126,7 @@ var ( "schemadiff_vrepl", "topo_connection_cache", "vtgate_partial_keyspace", + "vttablet_prscomplex", } clusterSelfHostedList = []string{ diff --git a/test/config.json b/test/config.json index feca79dc511..061a6d1db48 100644 --- a/test/config.json +++ b/test/config.json @@ -1214,6 +1214,15 @@ "Shard": "topo_connection_cache", "RetryMax": 1, "Tags": [] + }, + "prscomplex": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/reparent/prscomplex"], + "Command": [], + "Manual": false, + "Shard": "vttablet_prscomplex", + "RetryMax": 1, + "Tags": [""] } } } From 6b770917d79b06d77044d0e364da5d306f0c9dad Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 3 Oct 2022 17:01:54 +0530 Subject: [PATCH 2/4] log txID and reserveID in stream execute Signed-off-by: Harshit Gangal --- go/vt/vttablet/tabletserver/tabletserver.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index c00054157c2..bf1bf878d0a 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -863,6 +863,8 @@ func (tsv *TabletServer) streamExecute(ctx context.Context, target *querypb.Targ if transactionID != 0 { connID = transactionID } + logStats.ReservedID = reservedID + logStats.TransactionID = transactionID var connSetting *pools.Setting if len(settings) > 0 { From cb5e463b1afb0b98c1b8a784cab23f7ac4f0ffa5 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 3 Oct 2022 20:19:40 +0530 Subject: [PATCH 3/4] fix: maintain list of qd per key on the map and check for current connection id while removing Signed-off-by: Harshit Gangal --- go/vt/vttablet/tabletserver/query_list.go | 72 ++++++++++++------- .../vttablet/tabletserver/query_list_test.go | 28 +++++++- 2 files changed, 74 insertions(+), 26 deletions(-) diff --git a/go/vt/vttablet/tabletserver/query_list.go b/go/vt/vttablet/tabletserver/query_list.go index dfad235e721..d16680dff75 100644 --- a/go/vt/vttablet/tabletserver/query_list.go +++ b/go/vt/vttablet/tabletserver/query_list.go @@ -17,13 +17,12 @@ limitations under the License. package tabletserver import ( + "context" "html/template" "sort" "sync" "time" - "context" - "vitess.io/vitess/go/streamlog" "vitess.io/vitess/go/vt/callinfo" "vitess.io/vitess/go/vt/sqlparser" @@ -53,14 +52,14 @@ type QueryList struct { name string mu sync.Mutex - queryDetails map[int64]*QueryDetail + queryDetails map[int64][]*QueryDetail } // NewQueryList creates a new QueryList func NewQueryList(name string) *QueryList { return &QueryList{ name: name, - queryDetails: make(map[int64]*QueryDetail), + queryDetails: make(map[int64][]*QueryDetail), } } @@ -68,25 +67,46 @@ func NewQueryList(name string) *QueryList { func (ql *QueryList) Add(qd *QueryDetail) { ql.mu.Lock() defer ql.mu.Unlock() - ql.queryDetails[qd.connID] = qd + qds, exists := ql.queryDetails[qd.connID] + if exists { + ql.queryDetails[qd.connID] = append(qds, qd) + } else { + ql.queryDetails[qd.connID] = []*QueryDetail{qd} + } } // Remove removes a QueryDetail from QueryList func (ql *QueryList) Remove(qd *QueryDetail) { ql.mu.Lock() defer ql.mu.Unlock() - delete(ql.queryDetails, qd.connID) + qds, exists := ql.queryDetails[qd.connID] + if !exists { + return + } + if len(qds) == 1 { + delete(ql.queryDetails, qd.connID) + return + } + for i, q := range qds { + // match with the actual connection ID. + if q.conn.ID() == qd.conn.ID() { + ql.queryDetails[qd.connID] = append(qds[:i], qds[i+1:]...) + return + } + } } // Terminate updates the query status and kills the connection func (ql *QueryList) Terminate(connID int64) bool { ql.mu.Lock() defer ql.mu.Unlock() - qd := ql.queryDetails[connID] - if qd == nil { + qds, exists := ql.queryDetails[connID] + if !exists { return false } - qd.conn.Kill("QueryList.Terminate()", time.Since(qd.start)) + for _, qd := range qds { + _ = qd.conn.Kill("QueryList.Terminate()", time.Since(qd.start)) + } return true } @@ -94,8 +114,10 @@ func (ql *QueryList) Terminate(connID int64) bool { func (ql *QueryList) TerminateAll() { ql.mu.Lock() defer ql.mu.Unlock() - for _, qd := range ql.queryDetails { - qd.conn.Kill("QueryList.TerminateAll()", time.Since(qd.start)) + for _, qds := range ql.queryDetails { + for _, qd := range qds { + _ = qd.conn.Kill("QueryList.TerminateAll()", time.Since(qd.start)) + } } } @@ -120,20 +142,22 @@ func (a byStartTime) Less(i, j int) bool { return a[i].Start.Before(a[j].Start) // AppendQueryzRows returns a list of QueryDetailzRow sorted by start time func (ql *QueryList) AppendQueryzRows(rows []QueryDetailzRow) []QueryDetailzRow { ql.mu.Lock() - for _, qd := range ql.queryDetails { - query := qd.conn.Current() - if streamlog.GetRedactDebugUIQueries() { - query, _ = sqlparser.RedactSQLQuery(query) - } - row := QueryDetailzRow{ - Type: ql.name, - Query: query, - ContextHTML: callinfo.HTMLFromContext(qd.ctx), - Start: qd.start, - Duration: time.Since(qd.start), - ConnID: qd.connID, + for _, qds := range ql.queryDetails { + for _, qd := range qds { + query := qd.conn.Current() + if streamlog.GetRedactDebugUIQueries() { + query, _ = sqlparser.RedactSQLQuery(query) + } + row := QueryDetailzRow{ + Type: ql.name, + Query: query, + ContextHTML: callinfo.HTMLFromContext(qd.ctx), + Start: qd.start, + Duration: time.Since(qd.start), + ConnID: qd.connID, + } + rows = append(rows, row) } - rows = append(rows, row) } ql.mu.Unlock() sort.Sort(byStartTime(rows)) diff --git a/go/vt/vttablet/tabletserver/query_list_test.go b/go/vt/vttablet/tabletserver/query_list_test.go index 40c546ef8ca..02b24d86cda 100644 --- a/go/vt/vttablet/tabletserver/query_list_test.go +++ b/go/vt/vttablet/tabletserver/query_list_test.go @@ -17,10 +17,11 @@ limitations under the License. package tabletserver import ( + "context" "testing" "time" - "context" + "github.com/stretchr/testify/require" ) type testConn struct { @@ -48,7 +49,7 @@ func TestQueryList(t *testing.T) { qd := NewQueryDetail(context.Background(), &testConn{id: connID}) ql.Add(qd) - if qd1, ok := ql.queryDetails[connID]; !ok || qd1.connID != connID { + if qd1, ok := ql.queryDetails[connID]; !ok || qd1[0].connID != connID { t.Errorf("failed to add to QueryList") } @@ -66,3 +67,26 @@ func TestQueryList(t *testing.T) { t.Errorf("failed to remove from QueryList") } } + +func TestQueryListChangeConnIDInMiddle(t *testing.T) { + ql := NewQueryList("test") + connID := int64(1) + qd1 := NewQueryDetail(context.Background(), &testConn{id: connID}) + ql.Add(qd1) + + conn := &testConn{id: connID} + qd2 := NewQueryDetail(context.Background(), conn) + ql.Add(qd2) + + require.Len(t, ql.queryDetails[1], 2) + + // change the connID in the middle + conn.id = 2 + + // remove the same object. + ql.Remove(qd2) + + require.Len(t, ql.queryDetails[1], 1) + require.Equal(t, qd1, ql.queryDetails[1][0]) + require.NotEqual(t, qd2, ql.queryDetails[1][0]) +} From 1a923a0f19ea9b527972262422bfece2691778ca Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 3 Oct 2022 22:01:48 +0530 Subject: [PATCH 4/4] added additional comments Signed-off-by: Harshit Gangal --- go/test/endtoend/reparent/prscomplex/main_test.go | 11 ++++++++--- go/vt/vttablet/tabletserver/query_list.go | 5 ++++- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/go/test/endtoend/reparent/prscomplex/main_test.go b/go/test/endtoend/reparent/prscomplex/main_test.go index 7d17b6dcc65..9b4a7b86b8c 100644 --- a/go/test/endtoend/reparent/prscomplex/main_test.go +++ b/go/test/endtoend/reparent/prscomplex/main_test.go @@ -93,6 +93,11 @@ func TestMain(m *testing.M) { os.Exit(exitCode) } +/* +TestAcquireSameConnID tests that a query started on a connection gets reconnected with a new connection. +Another query acquires the old connection ID and does not override the query list maintained by the vttablet process. +PRS should not fail as the query list is maintained appropriately. +*/ func TestAcquireSameConnID(t *testing.T) { defer func() { err := recover() @@ -105,15 +110,14 @@ func TestAcquireSameConnID(t *testing.T) { require.NoError(t, err) defer conn.Close() + // start a reserved connection utils.Exec(t, conn, "set sql_mode=''") _ = utils.Exec(t, conn, "select connection_id()") - //connID, err := qr.Rows[0][0].ToInt64() - //require.NoError(t, err) + // restart the mysql to trigger reconnect on next query. primTablet := clusterInstance.Keyspaces[0].Shards[0].PrimaryTablet() err = primTablet.MysqlctlProcess.Stop() require.NoError(t, err) - err = primTablet.MysqlctlProcess.StartProvideInit(false) require.NoError(t, err) @@ -138,6 +142,7 @@ func TestAcquireSameConnID(t *testing.T) { } + // prs should happen without any error. text, err := rutils.Prs(t, clusterInstance, clusterInstance.Keyspaces[0].Shards[0].Replica()) require.NoError(t, err, text) } diff --git a/go/vt/vttablet/tabletserver/query_list.go b/go/vt/vttablet/tabletserver/query_list.go index d16680dff75..e78199c50ad 100644 --- a/go/vt/vttablet/tabletserver/query_list.go +++ b/go/vt/vttablet/tabletserver/query_list.go @@ -51,7 +51,10 @@ func NewQueryDetail(ctx context.Context, conn killable) *QueryDetail { type QueryList struct { name string - mu sync.Mutex + mu sync.Mutex + // on reconnect connection id will get reused by a different connection. + // so have to maintain a list to compare with the actual connection. + // and remove appropriately. queryDetails map[int64][]*QueryDetail }