Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"os"
"testing"

"github.com/stretchr/testify/require"

"github.com/stretchr/testify/assert"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
Expand All @@ -47,45 +49,43 @@ func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()

exitcode, err := func() (int, error) {
exitCode := func() int {
clusterInstance = cluster.NewCluster(cell, hostname)
defer clusterInstance.Teardown()

// Reserve vtGate port in order to pass it to vtTablet
clusterInstance.VtgateGrpcPort = clusterInstance.GetAndReservePort()

// Start topo server
if err := clusterInstance.StartTopo(); err != nil {
return 1, err
err := clusterInstance.StartTopo()
if err != nil {
panic(err)
}

// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
SchemaSQL: sqlSchema,
}
if err := clusterInstance.StartUnshardedKeyspace(*keyspace, 1, false); err != nil {
return 1, err
err = clusterInstance.StartUnshardedKeyspace(*keyspace, 1, false)
if err != nil {
panic(err)
}

// Set a short onterm timeout so the test goes faster.
clusterInstance.VtGateExtraArgs = []string{"-onterm_timeout", "1s"}
if err := clusterInstance.StartVtgate(); err != nil {
return 1, err
err = clusterInstance.StartVtgate()
if err != nil {
panic(err)
}
vtParams = mysql.ConnParams{
Host: clusterInstance.Hostname,
Port: clusterInstance.VtgateMySQLPort,
}

return m.Run(), nil
return m.Run()
}()
if err != nil {
fmt.Printf("%v\n", err)
os.Exit(1)
} else {
os.Exit(exitcode)
}
os.Exit(exitCode)
}

func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result {
Expand All @@ -101,9 +101,7 @@ func TestTransactionRollBackWhenShutDown(t *testing.T) {
defer cluster.PanicHandler(t)
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
defer conn.Close()

exec(t, conn, "insert into buffer(id, msg) values(3,'mark')")
Expand All @@ -126,9 +124,7 @@ func TestTransactionRollBackWhenShutDown(t *testing.T) {
Port: clusterInstance.VtgateMySQLPort,
}
conn2, err := mysql.Connect(ctx, &vtParams)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
defer conn2.Close()

vtParams = mysql.ConnParams{
Expand All @@ -142,3 +138,26 @@ func TestTransactionRollBackWhenShutDown(t *testing.T) {
want = `[[INT64(3)]]`
assert.Equal(t, want, got)
}

func TestErrorInAutocommitSession(t *testing.T) {
defer cluster.PanicHandler(t)
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()

exec(t, conn, "set autocommit=true")
exec(t, conn, "insert into buffer(id, msg) values(1,'foo')")
_, err = conn.ExecuteFetch("insert into buffer(id, msg) values(1,'bar')", 1, true)
require.Error(t, err) // this should fail with duplicate error
exec(t, conn, "insert into buffer(id, msg) values(2,'baz')")

conn2, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn2.Close()
result := exec(t, conn2, "select * from buffer order by id")

// if we have properly working autocommit code, both the successful inserts should be visible to a second
// connection, even if we have not done an explicit commit
assert.Equal(t, `[[INT64(1) VARCHAR("foo")] [INT64(2) VARCHAR("baz")]]`, fmt.Sprintf("%v", result.Rows))
}
17 changes: 17 additions & 0 deletions go/vt/vtgate/executor_dml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -943,6 +943,23 @@ func TestInsertOnDupKey(t *testing.T) {
}
}

func TestAutocommitFail(t *testing.T) {
executor, sbc1, _, _ := createExecutorEnv()

query := "insert into user (id) values (1)"
sbc1.MustFailCodes[vtrpcpb.Code_ALREADY_EXISTS] = 1
masterSession.Reset()
masterSession.Autocommit = true
defer func() {
masterSession.Autocommit = false
}()
_, err := executorExec(executor, query, nil)
require.Error(t, err)

// make sure we have closed and rolled back any transactions started
assert.False(t, masterSession.InTransaction, "left with tx open")
}

func TestInsertComments(t *testing.T) {
executor, sbc1, sbc2, sbclookup := createExecutorEnv()

Expand Down
7 changes: 3 additions & 4 deletions go/vt/vtgate/plan_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ func (e *planExecute) insideTransaction(ctx context.Context, safeSession *SafeSe
if err := e.e.txConn.Begin(ctx, safeSession); err != nil {
return nil, err
}
// The defer acts as a failsafe. If commit was successful,
// the rollback will be a no-op.
defer e.e.txConn.Rollback(ctx, safeSession)
}

// The SetAutocommitable flag should be same as mustCommit.
Expand All @@ -184,10 +187,6 @@ func (e *planExecute) insideTransaction(ctx context.Context, safeSession *SafeSe
}

if mustCommit {
// The defer acts as a failsafe. If commit was successful,
// the rollback will be a no-op.
defer e.e.txConn.Rollback(ctx, safeSession)

commitStart := time.Now()
if err := e.e.txConn.Commit(ctx, safeSession); err != nil {
return nil, err
Expand Down