diff --git a/go/mysql/constants.go b/go/mysql/constants.go index 89ef5e0afa3..a2d6ec3c0ae 100644 --- a/go/mysql/constants.go +++ b/go/mysql/constants.go @@ -438,6 +438,7 @@ const ( ERInvalidOnUpdate = 1294 ERUnknownTimeZone = 1298 ERInvalidCharacterString = 1300 + ERSavepointNotExist = 1305 ERIllegalReference = 1247 ERDerivedMustHaveAlias = 1248 ERTableNameNotAllowedHere = 1250 diff --git a/go/test/endtoend/vtgate/misc_test.go b/go/test/endtoend/vtgate/misc_test.go index 55cf98328ae..64176dda9fa 100644 --- a/go/test/endtoend/vtgate/misc_test.go +++ b/go/test/endtoend/vtgate/misc_test.go @@ -608,6 +608,117 @@ func TestUnion(t *testing.T) { assertMatches(t, conn, `(SELECT 1,'a' order by 1) union (SELECT 1,'a' ORDER BY 1)`, `[[INT64(1) VARCHAR("a")]]`) } +func TestSavepointInTx(t *testing.T) { + defer cluster.PanicHandler(t) + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.Nil(t, err) + defer conn.Close() + + exec(t, conn, "savepoint a") + exec(t, conn, "start transaction") + exec(t, conn, "savepoint b") + exec(t, conn, "rollback to b") + exec(t, conn, "release savepoint b") + exec(t, conn, "savepoint b") + exec(t, conn, "insert into t1(id1, id2) values(1,1)") // -80 + exec(t, conn, "savepoint c") + exec(t, conn, "insert into t1(id1, id2) values(4,4)") // 80- + exec(t, conn, "savepoint d") + exec(t, conn, "insert into t1(id1, id2) values(2,2)") // -80 + exec(t, conn, "savepoint e") + + // Validate all the data. + exec(t, conn, "use `ks:-80`") + assertMatches(t, conn, "select id1 from t1 order by id1", `[[INT64(1)] [INT64(2)]]`) + exec(t, conn, "use `ks:80-`") + assertMatches(t, conn, "select id1 from t1 order by id1", `[[INT64(4)]]`) + exec(t, conn, "use") + assertMatches(t, conn, "select id1 from t1 order by id1", `[[INT64(1)] [INT64(2)] [INT64(4)]]`) + + _, err = conn.ExecuteFetch("rollback work to savepoint a", 1000, true) + require.Error(t, err) + + exec(t, conn, "release savepoint d") + + _, err = conn.ExecuteFetch("rollback to d", 1000, true) + require.Error(t, err) + assertMatches(t, conn, "select id1 from t1 order by id1", `[[INT64(1)] [INT64(2)] [INT64(4)]]`) + + exec(t, conn, "rollback to c") + assertMatches(t, conn, "select id1 from t1 order by id1", `[[INT64(1)]]`) + + exec(t, conn, "insert into t1(id1, id2) values(2,2),(3,3),(4,4)") + assertMatches(t, conn, "select id1 from t1 order by id1", `[[INT64(1)] [INT64(2)] [INT64(3)] [INT64(4)]]`) + + exec(t, conn, "rollback to b") + assertMatches(t, conn, "select id1 from t1 order by id1", `[]`) + + exec(t, conn, "commit") + assertMatches(t, conn, "select id1 from t1 order by id1", `[]`) + + exec(t, conn, "start transaction") + + exec(t, conn, "insert into t1(id1, id2) values(2,2),(3,3),(4,4)") + assertMatches(t, conn, "select id1 from t1 order by id1", `[[INT64(2)] [INT64(3)] [INT64(4)]]`) + + // After previous commit all the savepoints are cleared. + _, err = conn.ExecuteFetch("rollback to b", 1000, true) + require.Error(t, err) + + exec(t, conn, "rollback") + assertMatches(t, conn, "select id1 from t1 order by id1", `[]`) +} + +func TestSavepointOutsideTx(t *testing.T) { + defer cluster.PanicHandler(t) + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.Nil(t, err) + defer conn.Close() + + exec(t, conn, "savepoint a") + exec(t, conn, "savepoint b") + + _, err = conn.ExecuteFetch("rollback to b", 1, true) + require.Error(t, err) + _, err = conn.ExecuteFetch("release savepoint a", 1, true) + require.Error(t, err) +} + +func TestSavepointAdditionalCase(t *testing.T) { + defer cluster.PanicHandler(t) + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.Nil(t, err) + defer conn.Close() + + exec(t, conn, "start transaction") + exec(t, conn, "savepoint a") + exec(t, conn, "insert into t1(id1, id2) values(1,1)") // -80 + exec(t, conn, "insert into t1(id1, id2) values(2,2),(3,3),(4,4)") // -80 & 80- + assertMatches(t, conn, "select id1 from t1 order by id1", `[[INT64(1)] [INT64(2)] [INT64(3)] [INT64(4)]]`) + + exec(t, conn, "rollback to a") + assertMatches(t, conn, "select id1 from t1 order by id1", `[]`) + + exec(t, conn, "commit") + assertMatches(t, conn, "select id1 from t1 order by id1", `[]`) + + exec(t, conn, "start transaction") + exec(t, conn, "insert into t1(id1, id2) values(1,1)") // -80 + exec(t, conn, "savepoint a") + exec(t, conn, "insert into t1(id1, id2) values(2,2),(3,3)") // -80 + exec(t, conn, "insert into t1(id1, id2) values(4,4)") // 80- + assertMatches(t, conn, "select id1 from t1 order by id1", `[[INT64(1)] [INT64(2)] [INT64(3)] [INT64(4)]]`) + + exec(t, conn, "rollback to a") + assertMatches(t, conn, "select id1 from t1 order by id1", `[[INT64(1)]]`) + + exec(t, conn, "rollback") + assertMatches(t, conn, "select id1 from t1 order by id1", `[]`) +} + func assertMatches(t *testing.T, conn *mysql.Conn, query, expected string) { t.Helper() qr := exec(t, conn, query) diff --git a/go/vt/proto/vtgate/vtgate.pb.go b/go/vt/proto/vtgate/vtgate.pb.go index c73627f3a04..4396ecbb55a 100644 --- a/go/vt/proto/vtgate/vtgate.pb.go +++ b/go/vt/proto/vtgate/vtgate.pb.go @@ -141,7 +141,9 @@ type Session struct { // TODO: systay should we keep this so we can apply it ordered? SystemVariables map[string]string `protobuf:"bytes,14,rep,name=system_variables,json=systemVariables,proto3" json:"system_variables,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` // row_count keeps track of the last seen rows affected for this session - RowCount int64 `protobuf:"varint,15,opt,name=row_count,json=rowCount,proto3" json:"row_count,omitempty"` + RowCount int64 `protobuf:"varint,15,opt,name=row_count,json=rowCount,proto3" json:"row_count,omitempty"` + // savepoints stores all the savepoints call for this session + Savepoints []string `protobuf:"bytes,16,rep,name=savepoints,proto3" json:"savepoints,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -270,6 +272,13 @@ func (m *Session) GetRowCount() int64 { return 0 } +func (m *Session) GetSavepoints() []string { + if m != nil { + return m.Savepoints + } + return nil +} + type Session_ShardSession struct { Target *query.Target `protobuf:"bytes,1,opt,name=target,proto3" json:"target,omitempty"` TransactionId int64 `protobuf:"varint,2,opt,name=transaction_id,json=transactionId,proto3" json:"transaction_id,omitempty"` @@ -979,77 +988,78 @@ func init() { func init() { proto.RegisterFile("vtgate.proto", fileDescriptor_aab96496ceaf1ebb) } var fileDescriptor_aab96496ceaf1ebb = []byte{ - // 1140 bytes of a gzipped FileDescriptorProto + // 1157 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 0xdd, 0x6e, 0x1b, 0x45, 0x14, 0xee, 0xfa, 0xdf, 0xc7, 0x7f, 0xcb, 0xd4, 0x2d, 0x5b, 0x53, 0xc0, 0x72, 0x5b, 0xd5, 0x0d, 0xc8, 0x46, 0x41, 0xa0, 0x0a, 0x81, 0x50, 0xe2, 0xb8, 0x95, 0xab, 0x24, 0x0e, 0x63, 0x27, 0x91, - 0x10, 0x68, 0xb5, 0xf1, 0x4e, 0x9c, 0x55, 0x9d, 0x1d, 0x77, 0x66, 0xec, 0xe0, 0xa7, 0xe0, 0x9e, - 0x17, 0xe0, 0x11, 0x78, 0x07, 0xee, 0xb8, 0xe5, 0x69, 0xd0, 0xfc, 0xac, 0xbd, 0x31, 0x81, 0xa6, - 0xa9, 0x72, 0xb3, 0x9a, 0xf3, 0x33, 0x67, 0xcf, 0xf9, 0xbe, 0x73, 0x66, 0x06, 0x8a, 0x73, 0x31, - 0xf6, 0x04, 0x69, 0x4d, 0x19, 0x15, 0x14, 0x65, 0xb4, 0x54, 0xb3, 0x4f, 0x82, 0x70, 0x42, 0xc7, - 0xbe, 0x27, 0x3c, 0x6d, 0xa9, 0x15, 0xde, 0xcc, 0x08, 0x5b, 0x18, 0xa1, 0x2c, 0xe8, 0x94, 0xc6, - 0x8d, 0x73, 0xc1, 0xa6, 0x23, 0x2d, 0x34, 0xfe, 0xce, 0x41, 0x76, 0x40, 0x38, 0x0f, 0x68, 0x88, - 0x9e, 0x40, 0x39, 0x08, 0x5d, 0xc1, 0xbc, 0x90, 0x7b, 0x23, 0x11, 0xd0, 0xd0, 0xb1, 0xea, 0x56, - 0x33, 0x87, 0x4b, 0x41, 0x38, 0x5c, 0x29, 0x51, 0x07, 0xca, 0xfc, 0xcc, 0x63, 0xbe, 0xcb, 0xf5, - 0x3e, 0xee, 0x24, 0xea, 0xc9, 0x66, 0x61, 0xf3, 0x61, 0xcb, 0x64, 0x67, 0xe2, 0xb5, 0x06, 0xd2, - 0xcb, 0x08, 0xb8, 0xc4, 0x63, 0x12, 0x47, 0x9f, 0x00, 0x78, 0x33, 0x41, 0x47, 0xf4, 0xfc, 0x3c, - 0x10, 0x4e, 0x4a, 0xfd, 0x27, 0xa6, 0x41, 0x8f, 0xa0, 0x24, 0x3c, 0x36, 0x26, 0xc2, 0xe5, 0x82, - 0x05, 0xe1, 0xd8, 0x49, 0xd7, 0xad, 0x66, 0x1e, 0x17, 0xb5, 0x72, 0xa0, 0x74, 0xa8, 0x0d, 0x59, - 0x3a, 0x15, 0x2a, 0x85, 0x4c, 0xdd, 0x6a, 0x16, 0x36, 0xef, 0xb5, 0x74, 0xe1, 0xdd, 0x5f, 0xc8, - 0x68, 0x26, 0x48, 0x5f, 0x1b, 0x71, 0xe4, 0x85, 0xb6, 0xc1, 0x8e, 0x95, 0xe7, 0x9e, 0x53, 0x9f, - 0x38, 0xd9, 0xba, 0xd5, 0x2c, 0x6f, 0x7e, 0x18, 0x25, 0x1f, 0xab, 0x74, 0x8f, 0xfa, 0x04, 0x57, - 0xc4, 0x65, 0x05, 0x6a, 0x43, 0xee, 0xc2, 0x63, 0x61, 0x10, 0x8e, 0xb9, 0x93, 0x53, 0x85, 0xdf, - 0x35, 0x7f, 0xfd, 0x41, 0x7e, 0x8f, 0xb5, 0x0d, 0x2f, 0x9d, 0xd0, 0xf7, 0x50, 0x9c, 0x32, 0xb2, - 0x42, 0x2b, 0x7f, 0x0d, 0xb4, 0x0a, 0x53, 0x46, 0x96, 0x58, 0x6d, 0x41, 0x69, 0x4a, 0xb9, 0x58, - 0x45, 0x80, 0x6b, 0x44, 0x28, 0xca, 0x2d, 0xcb, 0x10, 0x8f, 0xa1, 0x3c, 0xf1, 0xb8, 0x70, 0x83, - 0x90, 0x13, 0x26, 0xdc, 0xc0, 0x77, 0x0a, 0x75, 0xab, 0x99, 0xc2, 0x45, 0xa9, 0xed, 0x29, 0x65, - 0xcf, 0x47, 0x1f, 0x03, 0x9c, 0xd2, 0x59, 0xe8, 0xbb, 0x8c, 0x5e, 0x70, 0xa7, 0xa8, 0x3c, 0xf2, - 0x4a, 0x83, 0xe9, 0x05, 0x47, 0x2e, 0xdc, 0x9f, 0x71, 0xc2, 0x5c, 0x9f, 0x9c, 0x06, 0x21, 0xf1, - 0xdd, 0xb9, 0xc7, 0x02, 0xef, 0x64, 0x42, 0xb8, 0x53, 0x52, 0x09, 0x3d, 0x5b, 0x4f, 0xe8, 0x90, - 0x13, 0xb6, 0xa3, 0x9d, 0x8f, 0x22, 0xdf, 0x6e, 0x28, 0xd8, 0x02, 0x57, 0x67, 0x57, 0x98, 0x50, - 0x1f, 0x6c, 0xbe, 0xe0, 0x82, 0x9c, 0xc7, 0x42, 0x97, 0x55, 0xe8, 0xc7, 0xff, 0xaa, 0x55, 0xf9, - 0xad, 0x45, 0xad, 0xf0, 0xcb, 0x5a, 0xf4, 0x11, 0xe4, 0x19, 0xbd, 0x70, 0x47, 0x74, 0x16, 0x0a, - 0xa7, 0x52, 0xb7, 0x9a, 0x49, 0x9c, 0x63, 0xf4, 0xa2, 0x23, 0xe5, 0xda, 0x1f, 0x16, 0x14, 0xe3, - 0x90, 0xa1, 0x27, 0x90, 0xd1, 0xed, 0xa5, 0xfa, 0xbe, 0xb0, 0x59, 0x32, 0xbc, 0x0e, 0x95, 0x12, - 0x1b, 0xa3, 0x1c, 0x93, 0x78, 0x13, 0x05, 0xbe, 0x93, 0x50, 0x91, 0x4b, 0x31, 0x6d, 0xcf, 0x47, - 0xcf, 0xa1, 0x28, 0x64, 0x16, 0xc2, 0xf5, 0x26, 0x81, 0xc7, 0x9d, 0xa4, 0xe9, 0xd0, 0xe5, 0x34, - 0x0e, 0x95, 0x75, 0x4b, 0x1a, 0x71, 0x41, 0xac, 0x04, 0xf4, 0x29, 0x14, 0x18, 0xe1, 0x84, 0xcd, - 0x89, 0x2f, 0xa3, 0xa7, 0x54, 0x74, 0x88, 0x54, 0x3d, 0xbf, 0xf6, 0x13, 0x3c, 0xf8, 0x4f, 0x68, - 0x91, 0x0d, 0xc9, 0xd7, 0x64, 0xa1, 0x4a, 0xc8, 0x63, 0xb9, 0x44, 0xcf, 0x20, 0x3d, 0xf7, 0x26, - 0x33, 0xa2, 0xf2, 0x5c, 0xb5, 0xeb, 0x76, 0x10, 0x2e, 0xf7, 0x62, 0xed, 0xf1, 0x4d, 0xe2, 0xb9, - 0x55, 0xdb, 0x86, 0xea, 0x55, 0xe8, 0x5e, 0x11, 0xb8, 0x1a, 0x0f, 0x9c, 0x8f, 0xc5, 0x78, 0x95, - 0xca, 0x25, 0xed, 0x54, 0xe3, 0xf7, 0x04, 0x94, 0xcd, 0x28, 0x62, 0xf2, 0x66, 0x46, 0xb8, 0x40, - 0x9f, 0x43, 0x7e, 0xe4, 0x4d, 0x26, 0x84, 0xc9, 0xca, 0x34, 0xcc, 0x95, 0x96, 0x3e, 0x90, 0x3a, - 0x4a, 0xdf, 0xdb, 0xc1, 0x39, 0xed, 0xd1, 0xf3, 0xd1, 0x33, 0xc8, 0x9a, 0xa6, 0x37, 0xb9, 0x57, - 0xd6, 0xfa, 0x00, 0x47, 0x76, 0xf4, 0x14, 0xd2, 0xaa, 0x2c, 0x83, 0xf3, 0x07, 0x51, 0x91, 0xb2, - 0x7b, 0xd5, 0x60, 0x62, 0x6d, 0x47, 0x5f, 0x81, 0x01, 0xdb, 0x15, 0x8b, 0x29, 0x51, 0xe8, 0x96, - 0x37, 0xab, 0xeb, 0xb4, 0x0c, 0x17, 0x53, 0x82, 0x41, 0x2c, 0xd7, 0x92, 0xf5, 0xd7, 0x64, 0xc1, - 0xa7, 0xde, 0x88, 0xb8, 0xea, 0x28, 0x53, 0x47, 0x4e, 0x1e, 0x97, 0x22, 0xad, 0x6a, 0xa5, 0xf8, - 0x91, 0x94, 0xbd, 0xce, 0x91, 0xf4, 0x2a, 0x95, 0x4b, 0xdb, 0x99, 0xc6, 0xaf, 0x16, 0x54, 0x96, - 0x48, 0xf1, 0x29, 0x0d, 0xb9, 0xfc, 0x63, 0x9a, 0x30, 0x46, 0xd9, 0x1a, 0x4c, 0xf8, 0xa0, 0xd3, - 0x95, 0x6a, 0xac, 0xad, 0xef, 0x82, 0xd1, 0x06, 0x64, 0x18, 0xe1, 0xb3, 0x89, 0x30, 0x20, 0xa1, - 0xf8, 0xc1, 0x85, 0x95, 0x05, 0x1b, 0x8f, 0xc6, 0x5f, 0x09, 0xb8, 0x6b, 0x32, 0xda, 0xf6, 0xc4, - 0xe8, 0xec, 0xd6, 0x09, 0xfc, 0x0c, 0xb2, 0x32, 0x9b, 0x80, 0xc8, 0x51, 0x49, 0x5e, 0x4d, 0x61, - 0xe4, 0xf1, 0x1e, 0x24, 0x7a, 0xfc, 0xd2, 0x0d, 0x97, 0xd6, 0x37, 0x9c, 0xc7, 0xe3, 0x37, 0xdc, - 0x2d, 0x71, 0xdd, 0xf8, 0xcd, 0x82, 0xea, 0x65, 0x4c, 0x6f, 0x8d, 0xea, 0x2f, 0x20, 0xab, 0x89, - 0x8c, 0xd0, 0xbc, 0x6f, 0x72, 0xd3, 0x34, 0x1f, 0x07, 0xe2, 0x4c, 0x87, 0x8e, 0xdc, 0xe4, 0xb0, - 0x56, 0x07, 0x82, 0x11, 0xef, 0xfc, 0xbd, 0x46, 0x76, 0x39, 0x87, 0x89, 0x77, 0x9b, 0xc3, 0xe4, - 0x8d, 0xe7, 0x30, 0xf5, 0x16, 0x6e, 0xd2, 0xd7, 0x7a, 0x1a, 0xc4, 0xb0, 0xcd, 0xfc, 0x3f, 0xb6, - 0x8d, 0x0e, 0xdc, 0x5b, 0x03, 0xca, 0xd0, 0xb8, 0x9a, 0x2f, 0xeb, 0xad, 0xf3, 0xf5, 0x33, 0x3c, - 0xc0, 0x84, 0xd3, 0xc9, 0x9c, 0xc4, 0x3a, 0xef, 0x66, 0x90, 0x23, 0x48, 0xf9, 0xc2, 0x5c, 0x43, - 0x79, 0xac, 0xd6, 0x8d, 0x87, 0x50, 0xbb, 0x2a, 0xbc, 0x4e, 0xb4, 0xf1, 0xa7, 0x05, 0xe5, 0x23, - 0x5d, 0xc3, 0xcd, 0x7e, 0xb9, 0x46, 0x5e, 0xe2, 0x9a, 0xe4, 0x3d, 0x85, 0xf4, 0x7c, 0x2c, 0x53, - 0x8d, 0x0e, 0xe9, 0xd8, 0xcb, 0xf5, 0xe8, 0xa5, 0x08, 0x7c, 0xac, 0xed, 0x12, 0xc9, 0xd3, 0x60, - 0x22, 0x08, 0x53, 0xec, 0x4a, 0x24, 0x63, 0x9e, 0x2f, 0x94, 0x05, 0x1b, 0x8f, 0xc6, 0x77, 0x50, - 0x59, 0xd6, 0xb2, 0x22, 0x82, 0xcc, 0x49, 0x28, 0xb8, 0x63, 0xa9, 0xe6, 0xbf, 0xb4, 0xfd, 0xa8, - 0x2b, 0x4d, 0xd8, 0x78, 0x6c, 0xec, 0x40, 0x65, 0xed, 0xcd, 0x87, 0x2a, 0x50, 0x38, 0xdc, 0x1f, - 0x1c, 0x74, 0x3b, 0xbd, 0x17, 0xbd, 0xee, 0x8e, 0x7d, 0x07, 0x01, 0x64, 0x06, 0xbd, 0xfd, 0x97, - 0xbb, 0x5d, 0xdb, 0x42, 0x79, 0x48, 0xef, 0x1d, 0xee, 0x0e, 0x7b, 0x76, 0x42, 0x2e, 0x87, 0xc7, - 0xfd, 0x83, 0x8e, 0x9d, 0xdc, 0xf8, 0x16, 0x0a, 0x1d, 0xf5, 0x72, 0xed, 0x33, 0x9f, 0x30, 0xb9, - 0x61, 0xbf, 0x8f, 0xf7, 0xb6, 0x76, 0xed, 0x3b, 0x28, 0x0b, 0xc9, 0x03, 0x2c, 0x77, 0xe6, 0x20, - 0x75, 0xd0, 0x1f, 0x0c, 0xed, 0x04, 0x2a, 0x03, 0x6c, 0x1d, 0x0e, 0xfb, 0x9d, 0xfe, 0xde, 0x5e, - 0x6f, 0x68, 0x27, 0xb7, 0xbf, 0x86, 0x4a, 0x40, 0x5b, 0xf3, 0x40, 0x10, 0xce, 0xf5, 0xc3, 0xfc, - 0xc7, 0x47, 0x46, 0x0a, 0x68, 0x5b, 0xaf, 0xda, 0x63, 0xda, 0x9e, 0x8b, 0xb6, 0xb2, 0xb6, 0x75, - 0x6b, 0x9e, 0x64, 0x94, 0xf4, 0xe5, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x57, 0xac, 0xe8, 0xa9, - 0x18, 0x0c, 0x00, 0x00, + 0x10, 0x68, 0xb5, 0xf1, 0x4e, 0x9c, 0x51, 0x9d, 0x9d, 0xed, 0xcc, 0xd8, 0xc1, 0x4f, 0xc1, 0x3d, + 0x2f, 0xc0, 0x23, 0xf0, 0x0e, 0xdc, 0xf1, 0x3a, 0x5c, 0xa1, 0x99, 0x59, 0xdb, 0x1b, 0x13, 0x68, + 0x9a, 0x2a, 0x37, 0xab, 0x3d, 0x3f, 0x73, 0xe6, 0x9c, 0xef, 0x3b, 0x67, 0x66, 0xa0, 0x38, 0x93, + 0x63, 0x4f, 0x92, 0x56, 0xc8, 0x99, 0x64, 0x28, 0x63, 0xa4, 0x9a, 0x7d, 0x42, 0x83, 0x09, 0x1b, + 0xfb, 0x9e, 0xf4, 0x8c, 0xa5, 0x56, 0x78, 0x33, 0x25, 0x7c, 0x1e, 0x09, 0x65, 0xc9, 0x42, 0x16, + 0x37, 0xce, 0x24, 0x0f, 0x47, 0x46, 0x68, 0xfc, 0x9d, 0x83, 0xec, 0x80, 0x08, 0x41, 0x59, 0x80, + 0x9e, 0x40, 0x99, 0x06, 0xae, 0xe4, 0x5e, 0x20, 0xbc, 0x91, 0xa4, 0x2c, 0x70, 0xac, 0xba, 0xd5, + 0xcc, 0xe1, 0x12, 0x0d, 0x86, 0x2b, 0x25, 0xea, 0x40, 0x59, 0x9c, 0x79, 0xdc, 0x77, 0x85, 0x59, + 0x27, 0x9c, 0x44, 0x3d, 0xd9, 0x2c, 0x6c, 0x3e, 0x6c, 0x45, 0xd9, 0x45, 0xf1, 0x5a, 0x03, 0xe5, + 0x15, 0x09, 0xb8, 0x24, 0x62, 0x92, 0x40, 0x9f, 0x00, 0x78, 0x53, 0xc9, 0x46, 0xec, 0xfc, 0x9c, + 0x4a, 0x27, 0xa5, 0xf7, 0x89, 0x69, 0xd0, 0x23, 0x28, 0x49, 0x8f, 0x8f, 0x89, 0x74, 0x85, 0xe4, + 0x34, 0x18, 0x3b, 0xe9, 0xba, 0xd5, 0xcc, 0xe3, 0xa2, 0x51, 0x0e, 0xb4, 0x0e, 0xb5, 0x21, 0xcb, + 0x42, 0xa9, 0x53, 0xc8, 0xd4, 0xad, 0x66, 0x61, 0xf3, 0x5e, 0xcb, 0x14, 0xde, 0xfd, 0x85, 0x8c, + 0xa6, 0x92, 0xf4, 0x8d, 0x11, 0x2f, 0xbc, 0xd0, 0x36, 0xd8, 0xb1, 0xf2, 0xdc, 0x73, 0xe6, 0x13, + 0x27, 0x5b, 0xb7, 0x9a, 0xe5, 0xcd, 0x0f, 0x17, 0xc9, 0xc7, 0x2a, 0xdd, 0x63, 0x3e, 0xc1, 0x15, + 0x79, 0x59, 0x81, 0xda, 0x90, 0xbb, 0xf0, 0x78, 0x40, 0x83, 0xb1, 0x70, 0x72, 0xba, 0xf0, 0xbb, + 0xd1, 0xae, 0x3f, 0xa8, 0xef, 0xb1, 0xb1, 0xe1, 0xa5, 0x13, 0xfa, 0x1e, 0x8a, 0x21, 0x27, 0x2b, + 0xb4, 0xf2, 0xd7, 0x40, 0xab, 0x10, 0x72, 0xb2, 0xc4, 0x6a, 0x0b, 0x4a, 0x21, 0x13, 0x72, 0x15, + 0x01, 0xae, 0x11, 0xa1, 0xa8, 0x96, 0x2c, 0x43, 0x3c, 0x86, 0xf2, 0xc4, 0x13, 0xd2, 0xa5, 0x81, + 0x20, 0x5c, 0xba, 0xd4, 0x77, 0x0a, 0x75, 0xab, 0x99, 0xc2, 0x45, 0xa5, 0xed, 0x69, 0x65, 0xcf, + 0x47, 0x1f, 0x03, 0x9c, 0xb2, 0x69, 0xe0, 0xbb, 0x9c, 0x5d, 0x08, 0xa7, 0xa8, 0x3d, 0xf2, 0x5a, + 0x83, 0xd9, 0x85, 0x40, 0x2e, 0xdc, 0x9f, 0x0a, 0xc2, 0x5d, 0x9f, 0x9c, 0xd2, 0x80, 0xf8, 0xee, + 0xcc, 0xe3, 0xd4, 0x3b, 0x99, 0x10, 0xe1, 0x94, 0x74, 0x42, 0xcf, 0xd6, 0x13, 0x3a, 0x14, 0x84, + 0xef, 0x18, 0xe7, 0xa3, 0x85, 0x6f, 0x37, 0x90, 0x7c, 0x8e, 0xab, 0xd3, 0x2b, 0x4c, 0xa8, 0x0f, + 0xb6, 0x98, 0x0b, 0x49, 0xce, 0x63, 0xa1, 0xcb, 0x3a, 0xf4, 0xe3, 0x7f, 0xd5, 0xaa, 0xfd, 0xd6, + 0xa2, 0x56, 0xc4, 0x65, 0x2d, 0xfa, 0x08, 0xf2, 0x9c, 0x5d, 0xb8, 0x23, 0x36, 0x0d, 0xa4, 0x53, + 0xa9, 0x5b, 0xcd, 0x24, 0xce, 0x71, 0x76, 0xd1, 0x51, 0xb2, 0x6a, 0x41, 0xe1, 0xcd, 0x48, 0xc8, + 0x68, 0x20, 0x85, 0x63, 0xd7, 0x93, 0xcd, 0x3c, 0x8e, 0x69, 0x6a, 0x7f, 0x58, 0x50, 0x8c, 0x43, + 0x8a, 0x9e, 0x40, 0xc6, 0xb4, 0x9f, 0x9e, 0x8b, 0xc2, 0x66, 0x29, 0xe2, 0x7d, 0xa8, 0x95, 0x38, + 0x32, 0xaa, 0x31, 0x8a, 0x37, 0x19, 0xf5, 0x9d, 0x84, 0xde, 0xb9, 0x14, 0xd3, 0xf6, 0x7c, 0xf4, + 0x1c, 0x8a, 0x52, 0x65, 0x29, 0x5d, 0x6f, 0x42, 0x3d, 0xe1, 0x24, 0xa3, 0x0e, 0x5e, 0x4e, 0xeb, + 0x50, 0x5b, 0xb7, 0x94, 0x11, 0x17, 0xe4, 0x4a, 0x40, 0x9f, 0x42, 0x81, 0x13, 0x41, 0xf8, 0x8c, + 0xf8, 0x2a, 0x7a, 0x4a, 0x47, 0x87, 0x85, 0xaa, 0xe7, 0xd7, 0x7e, 0x82, 0x07, 0xff, 0x09, 0x3d, + 0xb2, 0x21, 0xf9, 0x9a, 0xcc, 0x75, 0x09, 0x79, 0xac, 0x7e, 0xd1, 0x33, 0x48, 0xcf, 0xbc, 0xc9, + 0x94, 0xe8, 0x3c, 0x57, 0xed, 0xbc, 0x4d, 0x83, 0xe5, 0x5a, 0x6c, 0x3c, 0xbe, 0x49, 0x3c, 0xb7, + 0x6a, 0xdb, 0x50, 0xbd, 0x0a, 0xfd, 0x2b, 0x02, 0x57, 0xe3, 0x81, 0xf3, 0xb1, 0x18, 0xaf, 0x52, + 0xb9, 0xa4, 0x9d, 0x6a, 0xfc, 0x9e, 0x80, 0x72, 0x34, 0xaa, 0x98, 0xbc, 0x99, 0x12, 0x21, 0xd1, + 0xe7, 0x90, 0x1f, 0x79, 0x93, 0x09, 0xe1, 0xaa, 0x32, 0x03, 0x73, 0xa5, 0x65, 0x0e, 0xac, 0x8e, + 0xd6, 0xf7, 0x76, 0x70, 0xce, 0x78, 0xf4, 0x7c, 0xf4, 0x0c, 0xb2, 0xd1, 0x50, 0x44, 0xb9, 0x57, + 0xd6, 0xfa, 0x04, 0x2f, 0xec, 0xe8, 0x29, 0xa4, 0x75, 0x59, 0x11, 0xce, 0x1f, 0x2c, 0x8a, 0x54, + 0xdd, 0xad, 0x07, 0x17, 0x1b, 0x3b, 0xfa, 0x0a, 0x22, 0xb0, 0x5d, 0x39, 0x0f, 0x89, 0x46, 0xb7, + 0xbc, 0x59, 0x5d, 0xa7, 0x65, 0x38, 0x0f, 0x09, 0x06, 0xb9, 0xfc, 0x57, 0xac, 0xbf, 0x26, 0x73, + 0x11, 0x7a, 0x23, 0xe2, 0xea, 0xa3, 0x4e, 0x1f, 0x49, 0x79, 0x5c, 0x5a, 0x68, 0x75, 0x2b, 0xc5, + 0x8f, 0xac, 0xec, 0x75, 0x8e, 0xac, 0x57, 0xa9, 0x5c, 0xda, 0xce, 0x34, 0x7e, 0xb5, 0xa0, 0xb2, + 0x44, 0x4a, 0x84, 0x2c, 0x10, 0x6a, 0xc7, 0x34, 0xe1, 0x9c, 0xf1, 0x35, 0x98, 0xf0, 0x41, 0xa7, + 0xab, 0xd4, 0xd8, 0x58, 0xdf, 0x05, 0xa3, 0x0d, 0xc8, 0x70, 0x22, 0xa6, 0x13, 0x19, 0x81, 0x84, + 0xe2, 0x07, 0x1b, 0xd6, 0x16, 0x1c, 0x79, 0x34, 0xfe, 0x4a, 0xc0, 0xdd, 0x28, 0xa3, 0x6d, 0x4f, + 0x8e, 0xce, 0x6e, 0x9d, 0xc0, 0xcf, 0x20, 0xab, 0xb2, 0xa1, 0x44, 0x8d, 0x4a, 0xf2, 0x6a, 0x0a, + 0x17, 0x1e, 0xef, 0x41, 0xa2, 0x27, 0x2e, 0xdd, 0x80, 0x69, 0x73, 0x03, 0x7a, 0x22, 0x7e, 0x03, + 0xde, 0x12, 0xd7, 0x8d, 0xdf, 0x2c, 0xa8, 0x5e, 0xc6, 0xf4, 0xd6, 0xa8, 0xfe, 0x02, 0xb2, 0x86, + 0xc8, 0x05, 0x9a, 0xf7, 0xa3, 0xdc, 0x0c, 0xcd, 0xc7, 0x54, 0x9e, 0x99, 0xd0, 0x0b, 0x37, 0x35, + 0xac, 0xd5, 0x81, 0xe4, 0xc4, 0x3b, 0x7f, 0xaf, 0x91, 0x5d, 0xce, 0x61, 0xe2, 0xdd, 0xe6, 0x30, + 0x79, 0xe3, 0x39, 0x4c, 0xbd, 0x85, 0x9b, 0xf4, 0xb5, 0x9e, 0x0e, 0x31, 0x6c, 0x33, 0xff, 0x8f, + 0x6d, 0xa3, 0x03, 0xf7, 0xd6, 0x80, 0x8a, 0x68, 0x5c, 0xcd, 0x97, 0xf5, 0xd6, 0xf9, 0xfa, 0x19, + 0x1e, 0x60, 0x22, 0xd8, 0x64, 0x46, 0x62, 0x9d, 0x77, 0x33, 0xc8, 0x11, 0xa4, 0x7c, 0x19, 0x5d, + 0x43, 0x79, 0xac, 0xff, 0x1b, 0x0f, 0xa1, 0x76, 0x55, 0x78, 0x93, 0x68, 0xe3, 0x4f, 0x0b, 0xca, + 0x47, 0xa6, 0x86, 0x9b, 0x6d, 0xb9, 0x46, 0x5e, 0xe2, 0x9a, 0xe4, 0x3d, 0x85, 0xf4, 0x6c, 0xac, + 0x52, 0x5d, 0x1c, 0xd2, 0xb1, 0x97, 0xed, 0xd1, 0x4b, 0x49, 0x7d, 0x6c, 0xec, 0x0a, 0xc9, 0x53, + 0x3a, 0x91, 0x84, 0x6b, 0x76, 0x15, 0x92, 0x31, 0xcf, 0x17, 0xda, 0x82, 0x23, 0x8f, 0xc6, 0x77, + 0x50, 0x59, 0xd6, 0xb2, 0x22, 0x82, 0xcc, 0x88, 0xba, 0xf6, 0x2d, 0xdd, 0xfc, 0x97, 0x96, 0x1f, + 0x75, 0x95, 0x09, 0x47, 0x1e, 0x1b, 0x3b, 0x50, 0x59, 0x7b, 0x13, 0xa2, 0x0a, 0x14, 0x0e, 0xf7, + 0x07, 0x07, 0xdd, 0x4e, 0xef, 0x45, 0xaf, 0xbb, 0x63, 0xdf, 0x41, 0x00, 0x99, 0x41, 0x6f, 0xff, + 0xe5, 0x6e, 0xd7, 0xb6, 0x50, 0x1e, 0xd2, 0x7b, 0x87, 0xbb, 0xc3, 0x9e, 0x9d, 0x50, 0xbf, 0xc3, + 0xe3, 0xfe, 0x41, 0xc7, 0x4e, 0x6e, 0x7c, 0x0b, 0x85, 0x8e, 0x7e, 0xd9, 0xf6, 0xb9, 0x4f, 0xb8, + 0x5a, 0xb0, 0xdf, 0xc7, 0x7b, 0x5b, 0xbb, 0xf6, 0x1d, 0x94, 0x85, 0xe4, 0x01, 0x56, 0x2b, 0x73, + 0x90, 0x3a, 0xe8, 0x0f, 0x86, 0x76, 0x02, 0x95, 0x01, 0xb6, 0x0e, 0x87, 0xfd, 0x4e, 0x7f, 0x6f, + 0xaf, 0x37, 0xb4, 0x93, 0xdb, 0x5f, 0x43, 0x85, 0xb2, 0xd6, 0x8c, 0x4a, 0x22, 0x84, 0x79, 0xb8, + 0xff, 0xf8, 0x28, 0x92, 0x28, 0x6b, 0x9b, 0xbf, 0xf6, 0x98, 0xb5, 0x67, 0xb2, 0xad, 0xad, 0x6d, + 0xd3, 0x9a, 0x27, 0x19, 0x2d, 0x7d, 0xf9, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x5f, 0xb1, 0x56, + 0x1a, 0x38, 0x0c, 0x00, 0x00, } diff --git a/go/vt/sqlparser/analyzer.go b/go/vt/sqlparser/analyzer.go index c66619c0738..a582b00485e 100644 --- a/go/vt/sqlparser/analyzer.go +++ b/go/vt/sqlparser/analyzer.go @@ -53,6 +53,9 @@ const ( StmtComment StmtPriv StmtExplain + StmtSavepoint + StmtSRollback + StmtRelease ) //ASTToStatementType returns a StatementType from an AST stmt @@ -84,6 +87,12 @@ func ASTToStatementType(stmt Statement) StatementType { return StmtCommit case *Rollback: return StmtRollback + case *Savepoint: + return StmtSavepoint + case *SRollback: + return StmtSRollback + case *Release: + return StmtRelease default: return StmtUnknown } @@ -167,6 +176,12 @@ func Preview(sql string) StatementType { return StmtOther case "grant", "revoke": return StmtPriv + case "savepoint": + return StmtSavepoint + case "release": + return StmtRelease + case "rollback": + return StmtSRollback } return StmtUnknown } @@ -205,6 +220,12 @@ func (s StatementType) String() string { return "PRIV" case StmtExplain: return "EXPLAIN" + case StmtSavepoint: + return "SAVEPOINT" + case StmtSRollback: + return "SAVEPOINT_ROLLBACK" + case StmtRelease: + return "RELEASE" default: return "UNKNOWN" } diff --git a/go/vt/srvtopo/resolver.go b/go/vt/srvtopo/resolver.go index 1d25975cf16..7c0efc7d5a2 100644 --- a/go/vt/srvtopo/resolver.go +++ b/go/vt/srvtopo/resolver.go @@ -247,3 +247,8 @@ func ValuesEqual(vss1, vss2 [][]*querypb.Value) bool { } return true } + +// GetGateway returns the used gateway +func (r *Resolver) GetGateway() Gateway { + return r.gateway +} diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index bd037086539..2e2cc92ff10 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -304,6 +304,44 @@ func (e *Executor) handleRollback(ctx context.Context, safeSession *SafeSession, return &sqltypes.Result{}, err } +func (e *Executor) handleSavepoint(ctx context.Context, safeSession *SafeSession, sql string, planType string, logStats *LogStats, nonTxResponse func(query string) (*sqltypes.Result, error)) (*sqltypes.Result, error) { + execStart := time.Now() + logStats.PlanTime = execStart.Sub(logStats.StartTime) + logStats.ShardQueries = uint32(len(safeSession.ShardSessions)) + e.updateQueryCounts(planType, "", "", int64(logStats.ShardQueries)) + defer func() { + logStats.ExecuteTime = time.Since(execStart) + }() + + if len(safeSession.ShardSessions) == 0 { + if safeSession.InTransaction() { + // Storing, as this needs to be executed just after starting transaction on the shard. + safeSession.StoreSavepoint(sql) + return &sqltypes.Result{}, nil + } + return nonTxResponse(sql) + } + var rss []*srvtopo.ResolvedShard + for _, shardSession := range safeSession.ShardSessions { + rss = append(rss, &srvtopo.ResolvedShard{ + Target: shardSession.Target, + Gateway: e.resolver.resolver.GetGateway(), + }) + } + queries := make([]*querypb.BoundQuery, len(rss)) + for i := range rss { + queries[i] = &querypb.BoundQuery{Sql: sql} + } + + qr, errs := e.ExecuteMultiShard(ctx, rss, queries, safeSession, false, false) + err := vterrors.Aggregate(errs) + if err != nil { + return nil, err + } + safeSession.StoreSavepoint(sql) + return qr, nil +} + // CloseSession closes the current transaction, if any. It is called both for explicit "rollback" // statements and implicitly when the mysql server closes the connection. func (e *Executor) CloseSession(ctx context.Context, safeSession *SafeSession) error { diff --git a/go/vt/vtgate/executor_framework_test.go b/go/vt/vtgate/executor_framework_test.go index 344c6688885..d8e87ddf41a 100644 --- a/go/vt/vtgate/executor_framework_test.go +++ b/go/vt/vtgate/executor_framework_test.go @@ -532,7 +532,9 @@ func testQueryLog(t *testing.T, logChan chan interface{}, method, stmtType, sql // fields[9] is ExecuteTime which is not set for certain statements SET, // BEGIN, COMMIT, ROLLBACK, etc - if stmtType != "BEGIN" && stmtType != "COMMIT" && stmtType != "ROLLBACK" && stmtType != "SET" { + switch stmtType { + case "BEGIN", "COMMIT", "ROLLBACK", "SET", "SAVEPOINT", "SAVEPOINT_ROLLBACK", "RELEASE": + default: testNonZeroDuration(t, "ExecuteTime", fields[9]) } diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index afffc780639..3624ff18127 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -2148,6 +2148,135 @@ func TestExecutorOtherAdmin(t *testing.T) { } } +func TestExecutorSavepointInTx(t *testing.T) { + executor, sbc1, sbc2, _ := createExecutorEnv() + logChan := QueryLogger.Subscribe("TestExecutorSavepoint") + defer QueryLogger.Unsubscribe(logChan) + + session := NewSafeSession(&vtgatepb.Session{Autocommit: false, TargetString: "@master"}) + _, err := exec(executor, session, "savepoint a") + require.NoError(t, err) + _, err = exec(executor, session, "rollback to a") + require.NoError(t, err) + _, err = exec(executor, session, "release savepoint a") + require.NoError(t, err) + _, err = exec(executor, session, "select id from user where id = 1") + require.NoError(t, err) + _, err = exec(executor, session, "savepoint b") + require.NoError(t, err) + _, err = exec(executor, session, "rollback to b") + require.NoError(t, err) + _, err = exec(executor, session, "release savepoint b") + require.NoError(t, err) + _, err = exec(executor, session, "select id from user where id = 3") + require.NoError(t, err) + _, err = exec(executor, session, "rollback") + require.NoError(t, err) + sbc1WantQueries := []*querypb.BoundQuery{{ + Sql: "savepoint a", + BindVariables: map[string]*querypb.BindVariable{}, + }, { + Sql: "rollback to a", + BindVariables: map[string]*querypb.BindVariable{}, + }, { + Sql: "release savepoint a", + BindVariables: map[string]*querypb.BindVariable{}, + }, { + Sql: "select id from user where id = 1", + BindVariables: map[string]*querypb.BindVariable{}, + }, { + Sql: "savepoint b", + BindVariables: map[string]*querypb.BindVariable{}, + }, { + Sql: "rollback to b", + BindVariables: map[string]*querypb.BindVariable{}, + }, { + Sql: "release savepoint b", + BindVariables: map[string]*querypb.BindVariable{}, + }} + + sbc2WantQueries := []*querypb.BoundQuery{{ + Sql: "savepoint a", + BindVariables: map[string]*querypb.BindVariable{}, + }, { + Sql: "rollback to a", + BindVariables: map[string]*querypb.BindVariable{}, + }, { + Sql: "release savepoint a", + BindVariables: map[string]*querypb.BindVariable{}, + }, { + Sql: "savepoint b", + BindVariables: map[string]*querypb.BindVariable{}, + }, { + Sql: "rollback to b", + BindVariables: map[string]*querypb.BindVariable{}, + }, { + Sql: "release savepoint b", + BindVariables: map[string]*querypb.BindVariable{}, + }, { + Sql: "select id from user where id = 3", + BindVariables: map[string]*querypb.BindVariable{}, + }} + utils.MustMatch(t, sbc1WantQueries, sbc1.Queries, "") + utils.MustMatch(t, sbc2WantQueries, sbc2.Queries, "") + testQueryLog(t, logChan, "TestExecute", "SAVEPOINT", "savepoint a", 0) + testQueryLog(t, logChan, "TestExecute", "SAVEPOINT_ROLLBACK", "rollback to a", 0) + testQueryLog(t, logChan, "TestExecute", "RELEASE", "release savepoint a", 0) + testQueryLog(t, logChan, "TestExecute", "SELECT", "select id from user where id = 1", 1) + testQueryLog(t, logChan, "TestExecute", "SAVEPOINT", "savepoint b", 1) + testQueryLog(t, logChan, "TestExecute", "SAVEPOINT_ROLLBACK", "rollback to b", 1) + testQueryLog(t, logChan, "TestExecute", "RELEASE", "release savepoint b", 1) + testQueryLog(t, logChan, "TestExecute", "SELECT", "select id from user where id = 3", 1) + testQueryLog(t, logChan, "TestExecute", "ROLLBACK", "rollback", 2) +} + +func TestExecutorSavepointWithoutTx(t *testing.T) { + executor, sbc1, sbc2, _ := createExecutorEnv() + logChan := QueryLogger.Subscribe("TestExecutorSavepoint") + defer QueryLogger.Unsubscribe(logChan) + + session := NewSafeSession(&vtgatepb.Session{Autocommit: true, TargetString: "@master", InTransaction: false}) + _, err := exec(executor, session, "savepoint a") + require.NoError(t, err) + _, err = exec(executor, session, "rollback to a") + require.Error(t, err) + _, err = exec(executor, session, "release savepoint a") + require.Error(t, err) + _, err = exec(executor, session, "select id from user where id = 1") + require.NoError(t, err) + _, err = exec(executor, session, "savepoint b") + require.NoError(t, err) + _, err = exec(executor, session, "rollback to b") + require.Error(t, err) + _, err = exec(executor, session, "release savepoint b") + require.Error(t, err) + _, err = exec(executor, session, "select id from user where id = 3") + require.NoError(t, err) + sbc1WantQueries := []*querypb.BoundQuery{{ + Sql: "select id from user where id = 1", + BindVariables: map[string]*querypb.BindVariable{}, + }} + + sbc2WantQueries := []*querypb.BoundQuery{{ + Sql: "select id from user where id = 3", + BindVariables: map[string]*querypb.BindVariable{}, + }} + utils.MustMatch(t, sbc1WantQueries, sbc1.Queries, "") + utils.MustMatch(t, sbc2WantQueries, sbc2.Queries, "") + testQueryLog(t, logChan, "TestExecute", "SAVEPOINT", "savepoint a", 0) + testQueryLog(t, logChan, "TestExecute", "SAVEPOINT_ROLLBACK", "rollback to a", 0) + testQueryLog(t, logChan, "TestExecute", "RELEASE", "release savepoint a", 0) + testQueryLog(t, logChan, "TestExecute", "SELECT", "select id from user where id = 1", 1) + testQueryLog(t, logChan, "TestExecute", "SAVEPOINT", "savepoint b", 0) + testQueryLog(t, logChan, "TestExecute", "SAVEPOINT_ROLLBACK", "rollback to b", 0) + testQueryLog(t, logChan, "TestExecute", "RELEASE", "release savepoint b", 0) + testQueryLog(t, logChan, "TestExecute", "SELECT", "select id from user where id = 3", 1) +} + +func exec(executor *Executor, session *SafeSession, sql string) (*sqltypes.Result, error) { + return executor.Execute(context.Background(), "TestExecute", session, sql, nil) +} + func makeComments(text string) sqlparser.MarginComments { return sqlparser.MarginComments{Trailing: text} } diff --git a/go/vt/vtgate/plan_execute.go b/go/vt/vtgate/plan_execute.go index 7bdb9fdd759..e6f49d297da 100644 --- a/go/vt/vtgate/plan_execute.go +++ b/go/vt/vtgate/plan_execute.go @@ -20,6 +20,8 @@ import ( "context" "time" + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" querypb "vitess.io/vitess/go/vt/proto/query" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" @@ -83,6 +85,24 @@ func (e *Executor) newExecute(ctx context.Context, safeSession *SafeSession, sql case sqlparser.StmtRollback: qr, err := e.handleRollback(ctx, safeSession, logStats) return sqlparser.StmtRollback, qr, err + case sqlparser.StmtSavepoint: + qr, err := e.handleSavepoint(ctx, safeSession, plan.Original, "Savepoint", logStats, func(_ string) (*sqltypes.Result, error) { + // Safely to ignore as there is no transaction. + return &sqltypes.Result{}, nil + }) + return sqlparser.StmtSavepoint, qr, err + case sqlparser.StmtSRollback: + qr, err := e.handleSavepoint(ctx, safeSession, plan.Original, "Rollback Savepoint", logStats, func(query string) (*sqltypes.Result, error) { + // Error as there is no transaction, so there is no savepoint that exists. + return nil, mysql.NewSQLError(mysql.ERSavepointNotExist, "42000", "SAVEPOINT does not exist: %s", query) + }) + return sqlparser.StmtSRollback, qr, err + case sqlparser.StmtRelease: + qr, err := e.handleSavepoint(ctx, safeSession, plan.Original, "Release Savepoint", logStats, func(query string) (*sqltypes.Result, error) { + // Error as there is no transaction, so there is no savepoint that exists. + return nil, mysql.NewSQLError(mysql.ERSavepointNotExist, "42000", "SAVEPOINT does not exist: %s", query) + }) + return sqlparser.StmtRelease, qr, err } // 3: Prepare for execution diff --git a/go/vt/vtgate/planbuilder/builder.go b/go/vt/vtgate/planbuilder/builder.go index ae348a23290..6adfffdd970 100644 --- a/go/vt/vtgate/planbuilder/builder.go +++ b/go/vt/vtgate/planbuilder/builder.go @@ -336,11 +336,9 @@ func createInstructionFor(query string, stmt sqlparser.Statement, vschema Contex return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: Database DDL %v", sqlparser.String(stmt)) case *sqlparser.Show, *sqlparser.SetTransaction: return nil, ErrPlanNotSupported - case *sqlparser.Begin, *sqlparser.Commit, *sqlparser.Rollback: + case *sqlparser.Begin, *sqlparser.Commit, *sqlparser.Rollback, *sqlparser.Savepoint, *sqlparser.SRollback, *sqlparser.Release: // Empty by design. Not executed by a plan return nil, nil - case *sqlparser.Savepoint, *sqlparser.SRollback, *sqlparser.Release: - return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: Savepoint construct %v", sqlparser.String(stmt)) } return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "BUG: unexpected statement type: %T", stmt) diff --git a/go/vt/vtgate/planbuilder/plan_test.go b/go/vt/vtgate/planbuilder/plan_test.go index 53f6a5d9d3f..35e64234b74 100644 --- a/go/vt/vtgate/planbuilder/plan_test.go +++ b/go/vt/vtgate/planbuilder/plan_test.go @@ -174,6 +174,7 @@ func TestPlan(t *testing.T) { testFile(t, "set_cases.txt", testOutputTempDir, vschemaWrapper) testFile(t, "set_sysvar_cases.txt", testOutputTempDir, vschemaWrapper) testFile(t, "union_cases.txt", testOutputTempDir, vschemaWrapper) + testFile(t, "transaction_cases.txt", testOutputTempDir, vschemaWrapper) } func TestOne(t *testing.T) { diff --git a/go/vt/vtgate/planbuilder/testdata/transaction_cases.txt b/go/vt/vtgate/planbuilder/testdata/transaction_cases.txt new file mode 100644 index 00000000000..b29317c7a26 --- /dev/null +++ b/go/vt/vtgate/planbuilder/testdata/transaction_cases.txt @@ -0,0 +1,48 @@ +# Begin +"begin" +{ + "QueryType": "BEGIN", + "Original": "begin" +} + +# Start Transaction +"start transaction" +{ + "QueryType": "BEGIN", + "Original": "start transaction" +} + +# Commit +"commit" +{ + "QueryType": "COMMIT", + "Original": "commit" +} + +# Rollback +"rollback" +{ + "QueryType": "ROLLBACK", + "Original": "rollback" +} + +# Savepoint +"savepoint a" +{ + "QueryType": "SAVEPOINT", + "Original": "savepoint a" +} + +# Savepoint rollback +"rollback work to savepoint a" +{ + "QueryType": "SAVEPOINT_ROLLBACK", + "Original": "rollback work to savepoint a" +} + +# Savepoint release +"release savepoint a" +{ + "QueryType": "RELEASE", + "Original": "release savepoint a" +} diff --git a/go/vt/vtgate/planbuilder/testdata/unsupported_cases.txt b/go/vt/vtgate/planbuilder/testdata/unsupported_cases.txt index ca4723ae58b..2d3cabdded6 100644 --- a/go/vt/vtgate/planbuilder/testdata/unsupported_cases.txt +++ b/go/vt/vtgate/planbuilder/testdata/unsupported_cases.txt @@ -419,15 +419,3 @@ # ambiguous LIMIT "select id from user limit 1 union all select id from music limit 1" "Incorrect usage of UNION and LIMIT - add parens to disambiguate your query (errno 1221) (sqlstate 21000)" - -# Savepoint -"savepoint a" -"unsupported: Savepoint construct savepoint a" - -# Savepoint rollback -"rollback work to savepoint a" -"unsupported: Savepoint construct rollback to a" - -# Savepoint release -"release savepoint a" -"unsupported: Savepoint construct release savepoint a" diff --git a/go/vt/vtgate/safe_session.go b/go/vt/vtgate/safe_session.go index ea4cb83ee15..0d58e38d70f 100644 --- a/go/vt/vtgate/safe_session.go +++ b/go/vt/vtgate/safe_session.go @@ -96,6 +96,7 @@ func (session *SafeSession) Reset() { session.PreSessions = nil session.PostSessions = nil session.commitOrder = vtgatepb.CommitOrder_NORMAL + session.Savepoints = nil } // SetAutocommittable sets the state to autocommitable if true. @@ -262,3 +263,10 @@ func (session *SafeSession) SetSystemVariable(name string, expr string) { } session.SystemVariables[name] = expr } + +//StoreSavepoint stores the savepoint and release savepoint queries in the session +func (session *SafeSession) StoreSavepoint(sql string) { + session.mu.Lock() + defer session.mu.Unlock() + session.Savepoints = append(session.Savepoints, sql) +} diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index cdefed7ceb5..09ce648d8cb 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -170,7 +170,7 @@ func (stc *ScatterConn) Execute( case autocommit: innerqr, err = stc.executeAutocommit(ctx, rs, query, bindVars, opts) case shouldBegin: - innerqr, transactionID, alias, err = rs.Gateway.BeginExecute(ctx, rs.Target, nil, query, bindVars, 0, options) + innerqr, transactionID, alias, err = rs.Gateway.BeginExecute(ctx, rs.Target, session.Savepoints, query, bindVars, 0, options) default: var qs queryservice.QueryService _, usingLegacy := rs.Gateway.(*DiscoveryGateway) @@ -248,7 +248,7 @@ func (stc *ScatterConn) ExecuteMultiShard( // tansactionID and alias are not used by this call, it is one round trip innerqr, err = stc.executeAutocommit(ctx, rs, queries[i].Sql, queries[i].BindVariables, opts) case shouldBegin: - innerqr, transactionID, alias, err = rs.Gateway.BeginExecute(ctx, rs.Target, nil, queries[i].Sql, queries[i].BindVariables, 0, opts) + innerqr, transactionID, alias, err = rs.Gateway.BeginExecute(ctx, rs.Target, session.Savepoints, queries[i].Sql, queries[i].BindVariables, 0, opts) default: var qs queryservice.QueryService _, usingLegacy := rs.Gateway.(*DiscoveryGateway) diff --git a/go/vt/vttablet/sandboxconn/sandboxconn.go b/go/vt/vttablet/sandboxconn/sandboxconn.go index 7b197480741..6da51987dce 100644 --- a/go/vt/vttablet/sandboxconn/sandboxconn.go +++ b/go/vt/vttablet/sandboxconn/sandboxconn.go @@ -189,12 +189,27 @@ func (sbc *SandboxConn) StreamExecute(ctx context.Context, target *querypb.Targe // Begin is part of the QueryService interface. func (sbc *SandboxConn) Begin(ctx context.Context, target *querypb.Target, options *querypb.ExecuteOptions) (int64, *topodatapb.TabletAlias, error) { + return sbc.begin(ctx, target, nil, 0, options) +} + +func (sbc *SandboxConn) begin(ctx context.Context, target *querypb.Target, preQueries []string, reservedID int64, options *querypb.ExecuteOptions) (int64, *topodatapb.TabletAlias, error) { sbc.BeginCount.Add(1) err := sbc.getError() if err != nil { return 0, nil, err } - return sbc.TransactionID.Add(1), sbc.tablet.Alias, nil + + transactionID := reservedID + if transactionID == 0 { + transactionID = sbc.TransactionID.Add(1) + } + for _, preQuery := range preQueries { + _, err := sbc.Execute(ctx, target, preQuery, nil, transactionID, reservedID, options) + if err != nil { + return 0, nil, err + } + } + return transactionID, sbc.tablet.Alias, nil } // Commit is part of the QueryService interface. @@ -297,8 +312,8 @@ func (sbc *SandboxConn) ReadTransaction(ctx context.Context, target *querypb.Tar } // BeginExecute is part of the QueryService interface. -func (sbc *SandboxConn) BeginExecute(ctx context.Context, target *querypb.Target, _ []string, query string, bindVars map[string]*querypb.BindVariable, reservedID int64, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, *topodatapb.TabletAlias, error) { - transactionID, alias, err := sbc.Begin(ctx, target, options) +func (sbc *SandboxConn) BeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, query string, bindVars map[string]*querypb.BindVariable, reservedID int64, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, *topodatapb.TabletAlias, error) { + transactionID, alias, err := sbc.begin(ctx, target, preQueries, reservedID, options) if err != nil { return nil, 0, nil, err } diff --git a/proto/vtgate.proto b/proto/vtgate.proto index 21ba0651573..719b3e696d2 100644 --- a/proto/vtgate.proto +++ b/proto/vtgate.proto @@ -121,6 +121,10 @@ message Session { // row_count keeps track of the last seen rows affected for this session int64 row_count = 15; + + // Stores savepoint and release savepoint calls inside a transaction + // and is reset once transaction is committed or rolled back. + repeated string savepoints = 16; } // ExecuteRequest is the payload to Execute.