diff --git a/go/vt/vttablet/tabletserver/bench_test.go b/go/vt/vttablet/tabletserver/bench_test.go index 43d3faea4df..8ecb0f26b00 100644 --- a/go/vt/vttablet/tabletserver/bench_test.go +++ b/go/vt/vttablet/tabletserver/bench_test.go @@ -55,7 +55,7 @@ func init() { } func BenchmarkExecuteVarBinary(b *testing.B) { - db, tsv := setupTabletServerTest(nil) + db, tsv := setupTabletServerTest(nil, "") defer db.Close() defer tsv.StopService() @@ -77,7 +77,7 @@ func BenchmarkExecuteVarBinary(b *testing.B) { } func BenchmarkExecuteExpression(b *testing.B) { - db, tsv := setupTabletServerTest(nil) + db, tsv := setupTabletServerTest(nil, "") defer db.Close() defer tsv.StopService() diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 6ef9abd7358..6288532cdc1 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -643,6 +643,15 @@ func (tsv *TabletServer) Execute(ctx context.Context, target *querypb.Target, sq return err } result = result.StripMetadata(sqltypes.IncludeFieldsOrDefault(options)) + + // Change database name in mysql output to the keyspace name + if sqltypes.IncludeFieldsOrDefault(options) == querypb.ExecuteOptions_ALL { + for _, f := range result.Fields { + if f.Database != "" { + f.Database = tsv.sm.target.Keyspace + } + } + } return nil }, ) @@ -678,7 +687,18 @@ func (tsv *TabletServer) StreamExecute(ctx context.Context, target *querypb.Targ logStats: logStats, tsv: tsv, } - return qre.Stream(callback) + newCallback := func(result *sqltypes.Result) error { + if sqltypes.IncludeFieldsOrDefault(options) == querypb.ExecuteOptions_ALL { + // Change database name in mysql output to the keyspace name + for _, f := range result.Fields { + if f.Database != "" { + f.Database = tsv.sm.target.Keyspace + } + } + } + return callback(result) + } + return qre.Stream(newCallback) }, ) } diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go index 05900a06902..01eaf7e6291 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -56,7 +56,7 @@ import ( ) func TestBeginOnReplica(t *testing.T) { - db, tsv := setupTabletServerTest(t) + db, tsv := setupTabletServerTest(t, "") defer tsv.StopService() defer db.Close() @@ -364,7 +364,7 @@ func TestTabletServerConcludeTransaction(t *testing.T) { func TestTabletServerBeginFail(t *testing.T) { config := tabletenv.NewDefaultConfig() config.TxPool.Size = 1 - db, tsv := setupTabletServerTestCustom(t, config) + db, tsv := setupTabletServerTestCustom(t, config, "") defer tsv.StopService() defer db.Close() @@ -377,7 +377,7 @@ func TestTabletServerBeginFail(t *testing.T) { } func TestTabletServerCommitTransaction(t *testing.T) { - db, tsv := setupTabletServerTest(t) + db, tsv := setupTabletServerTest(t, "") defer tsv.StopService() defer db.Close() @@ -402,7 +402,7 @@ func TestTabletServerCommitTransaction(t *testing.T) { } func TestTabletServerCommiRollbacktFail(t *testing.T) { - db, tsv := setupTabletServerTest(t) + db, tsv := setupTabletServerTest(t, "") defer tsv.StopService() defer db.Close() @@ -415,7 +415,7 @@ func TestTabletServerCommiRollbacktFail(t *testing.T) { } func TestTabletServerRollback(t *testing.T) { - db, tsv := setupTabletServerTest(t) + db, tsv := setupTabletServerTest(t, "") defer tsv.StopService() defer db.Close() @@ -475,7 +475,7 @@ func TestTabletServerCommitPrepared(t *testing.T) { } func TestTabletServerReserveConnection(t *testing.T) { - db, tsv := setupTabletServerTest(t) + db, tsv := setupTabletServerTest(t, "") defer tsv.StopService() defer db.Close() @@ -497,7 +497,7 @@ func TestTabletServerReserveConnection(t *testing.T) { } func TestTabletServerExecNonExistentConnection(t *testing.T) { - db, tsv := setupTabletServerTest(t) + db, tsv := setupTabletServerTest(t, "") defer tsv.StopService() defer db.Close() @@ -511,7 +511,7 @@ func TestTabletServerExecNonExistentConnection(t *testing.T) { } func TestTabletServerReleaseNonExistentConnection(t *testing.T) { - db, tsv := setupTabletServerTest(t) + db, tsv := setupTabletServerTest(t, "") defer tsv.StopService() defer db.Close() @@ -524,7 +524,7 @@ func TestTabletServerReleaseNonExistentConnection(t *testing.T) { } func TestMakeSureToCloseDbConnWhenBeginQueryFails(t *testing.T) { - db, tsv := setupTabletServerTest(t) + db, tsv := setupTabletServerTest(t, "") defer tsv.StopService() defer db.Close() @@ -538,7 +538,7 @@ func TestMakeSureToCloseDbConnWhenBeginQueryFails(t *testing.T) { } func TestTabletServerReserveAndBeginCommit(t *testing.T) { - db, tsv := setupTabletServerTest(t) + db, tsv := setupTabletServerTest(t, "") defer tsv.StopService() defer db.Close() @@ -607,7 +607,7 @@ func TestTabletServerRollbackPrepared(t *testing.T) { } func TestTabletServerStreamExecute(t *testing.T) { - db, tsv := setupTabletServerTest(t) + db, tsv := setupTabletServerTest(t, "") defer tsv.StopService() defer db.Close() @@ -631,7 +631,7 @@ func TestTabletServerStreamExecute(t *testing.T) { } func TestTabletServerStreamExecuteComments(t *testing.T) { - db, tsv := setupTabletServerTest(t) + db, tsv := setupTabletServerTest(t, "") defer tsv.StopService() defer db.Close() @@ -673,7 +673,7 @@ func TestTabletServerStreamExecuteComments(t *testing.T) { } } func TestTabletServerExecuteBatch(t *testing.T) { - db, tsv := setupTabletServerTest(t) + db, tsv := setupTabletServerTest(t, "") defer tsv.StopService() defer db.Close() @@ -695,7 +695,7 @@ func TestTabletServerExecuteBatch(t *testing.T) { } func TestTabletServerExecuteBatchFailEmptyQueryList(t *testing.T) { - db, tsv := setupTabletServerTest(t) + db, tsv := setupTabletServerTest(t, "") defer tsv.StopService() defer db.Close() @@ -706,7 +706,7 @@ func TestTabletServerExecuteBatchFailEmptyQueryList(t *testing.T) { } func TestTabletServerExecuteBatchFailAsTransaction(t *testing.T) { - db, tsv := setupTabletServerTest(t) + db, tsv := setupTabletServerTest(t, "") defer tsv.StopService() defer db.Close() @@ -722,7 +722,7 @@ func TestTabletServerExecuteBatchFailAsTransaction(t *testing.T) { } func TestTabletServerExecuteBatchBeginFail(t *testing.T) { - db, tsv := setupTabletServerTest(t) + db, tsv := setupTabletServerTest(t, "") defer tsv.StopService() defer db.Close() @@ -739,7 +739,7 @@ func TestTabletServerExecuteBatchBeginFail(t *testing.T) { } func TestTabletServerExecuteBatchCommitFail(t *testing.T) { - db, tsv := setupTabletServerTest(t) + db, tsv := setupTabletServerTest(t, "") defer tsv.StopService() defer db.Close() @@ -760,7 +760,7 @@ func TestTabletServerExecuteBatchCommitFail(t *testing.T) { } func TestTabletServerExecuteBatchSqlExecFailInTransaction(t *testing.T) { - db, tsv := setupTabletServerTest(t) + db, tsv := setupTabletServerTest(t, "") defer tsv.StopService() defer db.Close() @@ -795,7 +795,7 @@ func TestTabletServerExecuteBatchSqlExecFailInTransaction(t *testing.T) { } func TestTabletServerExecuteBatchCallCommitWithoutABegin(t *testing.T) { - db, tsv := setupTabletServerTest(t) + db, tsv := setupTabletServerTest(t, "") defer tsv.StopService() defer db.Close() @@ -810,7 +810,7 @@ func TestTabletServerExecuteBatchCallCommitWithoutABegin(t *testing.T) { } func TestExecuteBatchNestedTransaction(t *testing.T) { - db, tsv := setupTabletServerTest(t) + db, tsv := setupTabletServerTest(t, "") defer tsv.StopService() defer db.Close() @@ -860,7 +860,7 @@ func TestSerializeTransactionsSameRow(t *testing.T) { config.HotRowProtection.MaxConcurrency = 1 // Reduce the txpool to 2 because we should never consume more than two slots. config.TxPool.Size = 2 - db, tsv := setupTabletServerTestCustom(t, config) + db, tsv := setupTabletServerTestCustom(t, config, "") defer tsv.StopService() defer db.Close() @@ -965,7 +965,7 @@ func TestDMLQueryWithoutWhereClause(t *testing.T) { config.HotRowProtection.Mode = tabletenv.Enable config.HotRowProtection.MaxConcurrency = 1 config.TxPool.Size = 2 - db, tsv := setupTabletServerTestCustom(t, config) + db, tsv := setupTabletServerTestCustom(t, config, "") defer tsv.StopService() defer db.Close() @@ -1001,7 +1001,7 @@ func TestSerializeTransactionsSameRow_ExecuteBatchAsTransaction(t *testing.T) { config.HotRowProtection.MaxConcurrency = 1 // Reduce the txpool to 2 because we should never consume more than two slots. config.TxPool.Size = 2 - db, tsv := setupTabletServerTestCustom(t, config) + db, tsv := setupTabletServerTestCustom(t, config, "") defer tsv.StopService() defer db.Close() @@ -1113,7 +1113,7 @@ func TestSerializeTransactionsSameRow_ConcurrentTransactions(t *testing.T) { config.HotRowProtection.MaxConcurrency = 2 // Reduce the txpool to 2 because we should never consume more than two slots. config.TxPool.Size = 2 - db, tsv := setupTabletServerTestCustom(t, config) + db, tsv := setupTabletServerTestCustom(t, config, "") defer tsv.StopService() defer db.Close() @@ -1247,7 +1247,7 @@ func TestSerializeTransactionsSameRow_TooManyPendingRequests(t *testing.T) { config.HotRowProtection.Mode = tabletenv.Enable config.HotRowProtection.MaxQueueSize = 1 config.HotRowProtection.MaxConcurrency = 1 - db, tsv := setupTabletServerTestCustom(t, config) + db, tsv := setupTabletServerTestCustom(t, config, "") defer tsv.StopService() defer db.Close() @@ -1330,7 +1330,7 @@ func TestSerializeTransactionsSameRow_TooManyPendingRequests_ExecuteBatchAsTrans config.HotRowProtection.Mode = tabletenv.Enable config.HotRowProtection.MaxQueueSize = 1 config.HotRowProtection.MaxConcurrency = 1 - db, tsv := setupTabletServerTestCustom(t, config) + db, tsv := setupTabletServerTestCustom(t, config, "") defer tsv.StopService() defer db.Close() @@ -1416,7 +1416,7 @@ func TestSerializeTransactionsSameRow_RequestCanceled(t *testing.T) { config := tabletenv.NewDefaultConfig() config.HotRowProtection.Mode = tabletenv.Enable config.HotRowProtection.MaxConcurrency = 1 - db, tsv := setupTabletServerTestCustom(t, config) + db, tsv := setupTabletServerTestCustom(t, config, "") defer tsv.StopService() defer db.Close() @@ -1900,7 +1900,7 @@ func TestACLHUP(t *testing.T) { } func TestConfigChanges(t *testing.T) { - db, tsv := setupTabletServerTest(t) + db, tsv := setupTabletServerTest(t, "") defer tsv.StopService() defer db.Close() @@ -1965,7 +1965,7 @@ func TestConfigChanges(t *testing.T) { } func TestReserveBeginExecute(t *testing.T) { - db, tsv := setupTabletServerTest(t) + db, tsv := setupTabletServerTest(t, "") defer tsv.StopService() defer db.Close() target := querypb.Target{TabletType: topodatapb.TabletType_MASTER} @@ -1987,7 +1987,7 @@ func TestReserveBeginExecute(t *testing.T) { } func TestReserveExecute_WithoutTx(t *testing.T) { - db, tsv := setupTabletServerTest(t) + db, tsv := setupTabletServerTest(t, "") defer tsv.StopService() defer db.Close() @@ -2007,7 +2007,7 @@ func TestReserveExecute_WithoutTx(t *testing.T) { } func TestReserveExecute_WithTx(t *testing.T) { - db, tsv := setupTabletServerTest(t) + db, tsv := setupTabletServerTest(t, "") defer tsv.StopService() defer db.Close() target := querypb.Target{TabletType: topodatapb.TabletType_MASTER} @@ -2064,7 +2064,7 @@ func TestRelease(t *testing.T) { name += " reserve" } t.Run(name, func(t *testing.T) { - db, tsv := setupTabletServerTest(t) + db, tsv := setupTabletServerTest(t, "") defer tsv.StopService() defer db.Close() db.AddQueryPattern(".*", &sqltypes.Result{}) @@ -2101,7 +2101,7 @@ func TestRelease(t *testing.T) { } func TestReserveStats(t *testing.T) { - db, tsv := setupTabletServerTest(t) + db, tsv := setupTabletServerTest(t, "") defer tsv.StopService() defer db.Close() @@ -2154,17 +2154,269 @@ func TestReserveStats(t *testing.T) { assert.NotEmpty(t, tsv.te.txPool.env.Stats().UserReservedTimesNs.Counts()["test"]) } -func setupTabletServerTest(t *testing.T) (*fakesqldb.DB, *TabletServer) { +func TestDatabaseNameReplaceByKeyspaceNameExecuteMethod(t *testing.T) { + db, tsv := setupTabletServerTest(t, "keyspaceName") + db.SetName("databaseInMysql") + defer tsv.StopService() + defer db.Close() + + executeSQL := "select * from test_table limit 1000" + executeSQLResult := &sqltypes.Result{ + Fields: []*querypb.Field{ + { + Type: sqltypes.VarBinary, + Database: "databaseInMysql", + }, + }, + RowsAffected: 1, + Rows: [][]sqltypes.Value{ + {sqltypes.NewVarBinary("row01")}, + }, + } + db.AddQuery(executeSQL, executeSQLResult) + target := tsv.sm.target + + // Testing Execute Method + transactionID, _, err := tsv.Begin(ctx, &target, nil) + require.NoError(t, err) + res, err := tsv.Execute(ctx, &target, executeSQL, nil, transactionID, 0, &querypb.ExecuteOptions{IncludedFields: querypb.ExecuteOptions_ALL}) + require.NoError(t, err) + for _, field := range res.Fields { + require.Equal(t, "keyspaceName", field.Database) + } + _, err = tsv.Commit(ctx, &target, transactionID) + require.NoError(t, err) +} + +func TestDatabaseNameReplaceByKeyspaceNameStreamExecuteMethod(t *testing.T) { + db, tsv := setupTabletServerTest(t, "keyspaceName") + db.SetName("databaseInMysql") + defer tsv.StopService() + defer db.Close() + + executeSQL := "select * from test_table limit 1000" + executeSQLResult := &sqltypes.Result{ + Fields: []*querypb.Field{ + { + Type: sqltypes.VarBinary, + Database: "databaseInMysql", + }, + }, + RowsAffected: 1, + Rows: [][]sqltypes.Value{ + {sqltypes.NewVarBinary("row01")}, + }, + } + db.AddQuery(executeSQL, executeSQLResult) + target := tsv.sm.target + + // Testing StreamExecute Method + callback := func(res *sqltypes.Result) error { + for _, field := range res.Fields { + if field.Database != "" { + require.Equal(t, "keyspaceName", field.Database) + } + } + return nil + } + err := tsv.StreamExecute(ctx, &target, executeSQL, nil, 0, &querypb.ExecuteOptions{IncludedFields: querypb.ExecuteOptions_ALL}, callback) + require.NoError(t, err) +} + +func TestDatabaseNameReplaceByKeyspaceNameExecuteBatchMethod(t *testing.T) { + db, tsv := setupTabletServerTest(t, "keyspaceName") + db.SetName("databaseInMysql") + defer tsv.StopService() + defer db.Close() + + executeSQL := "select * from test_table limit 1000" + executeSQLResult := &sqltypes.Result{ + Fields: []*querypb.Field{ + { + Type: sqltypes.VarBinary, + Database: "databaseInMysql", + }, + }, + RowsAffected: 1, + Rows: [][]sqltypes.Value{ + {sqltypes.NewVarBinary("row01")}, + }, + } + db.AddQuery(executeSQL, executeSQLResult) + target := tsv.sm.target + + // Testing ExecuteBatch Method + results, err := tsv.ExecuteBatch(ctx, &target, []*querypb.BoundQuery{ + { + Sql: executeSQL, + BindVariables: nil, + }, + { + Sql: executeSQL, + BindVariables: nil, + }, + }, true, 0, &querypb.ExecuteOptions{IncludedFields: querypb.ExecuteOptions_ALL}) + require.NoError(t, err) + for _, res := range results { + for _, field := range res.Fields { + require.Equal(t, "keyspaceName", field.Database) + } + } +} + +func TestDatabaseNameReplaceByKeyspaceNameBeginExecuteMethod(t *testing.T) { + db, tsv := setupTabletServerTest(t, "keyspaceName") + db.SetName("databaseInMysql") + defer tsv.StopService() + defer db.Close() + + executeSQL := "select * from test_table limit 1000" + executeSQLResult := &sqltypes.Result{ + Fields: []*querypb.Field{ + { + Type: sqltypes.VarBinary, + Database: "databaseInMysql", + }, + }, + RowsAffected: 1, + Rows: [][]sqltypes.Value{ + {sqltypes.NewVarBinary("row01")}, + }, + } + db.AddQuery(executeSQL, executeSQLResult) + target := tsv.sm.target + + // Test BeginExecute Method + res, transactionID, _, err := tsv.BeginExecute(ctx, &target, nil, executeSQL, nil, 0, &querypb.ExecuteOptions{IncludedFields: querypb.ExecuteOptions_ALL}) + require.NoError(t, err) + for _, field := range res.Fields { + require.Equal(t, "keyspaceName", field.Database) + } + _, err = tsv.Commit(ctx, &target, transactionID) + require.NoError(t, err) +} + +func TestDatabaseNameReplaceByKeyspaceNameBeginExecuteBatchMethod(t *testing.T) { + db, tsv := setupTabletServerTest(t, "keyspaceName") + db.SetName("databaseInMysql") + defer tsv.StopService() + defer db.Close() + + executeSQL := "select * from test_table limit 1000" + executeSQLResult := &sqltypes.Result{ + Fields: []*querypb.Field{ + { + Type: sqltypes.VarBinary, + Database: "databaseInMysql", + }, + }, + RowsAffected: 1, + Rows: [][]sqltypes.Value{ + {sqltypes.NewVarBinary("row01")}, + }, + } + db.AddQuery(executeSQL, executeSQLResult) + target := tsv.sm.target + + // Test BeginExecuteBatch Method + results, transactionID, _, err := tsv.BeginExecuteBatch(ctx, &target, []*querypb.BoundQuery{ + { + Sql: executeSQL, + BindVariables: nil, + }, + { + Sql: executeSQL, + BindVariables: nil, + }, + }, false, &querypb.ExecuteOptions{IncludedFields: querypb.ExecuteOptions_ALL}) + require.NoError(t, err) + for _, res := range results { + for _, field := range res.Fields { + require.Equal(t, "keyspaceName", field.Database) + } + } + _, err = tsv.Commit(ctx, &target, transactionID) + require.NoError(t, err) +} + +func TestDatabaseNameReplaceByKeyspaceNameReserveExecuteMethod(t *testing.T) { + db, tsv := setupTabletServerTest(t, "keyspaceName") + db.SetName("databaseInMysql") + defer tsv.StopService() + defer db.Close() + + executeSQL := "select * from test_table limit 1000" + executeSQLResult := &sqltypes.Result{ + Fields: []*querypb.Field{ + { + Type: sqltypes.VarBinary, + Database: "databaseInMysql", + }, + }, + RowsAffected: 1, + Rows: [][]sqltypes.Value{ + {sqltypes.NewVarBinary("row01")}, + }, + } + db.AddQuery(executeSQL, executeSQLResult) + target := tsv.sm.target + + // Test ReserveExecute + res, rID, _, err := tsv.ReserveExecute(ctx, &target, nil, executeSQL, nil, 0, &querypb.ExecuteOptions{IncludedFields: querypb.ExecuteOptions_ALL}) + require.NoError(t, err) + for _, field := range res.Fields { + require.Equal(t, "keyspaceName", field.Database) + } + err = tsv.Release(ctx, &target, 0, rID) + require.NoError(t, err) +} + +func TestDatabaseNameReplaceByKeyspaceNameReserveBeginExecuteMethod(t *testing.T) { + db, tsv := setupTabletServerTest(t, "keyspaceName") + db.SetName("databaseInMysql") + defer tsv.StopService() + defer db.Close() + + executeSQL := "select * from test_table limit 1000" + executeSQLResult := &sqltypes.Result{ + Fields: []*querypb.Field{ + { + Type: sqltypes.VarBinary, + Database: "databaseInMysql", + }, + }, + RowsAffected: 1, + Rows: [][]sqltypes.Value{ + {sqltypes.NewVarBinary("row01")}, + }, + } + db.AddQuery(executeSQL, executeSQLResult) + target := tsv.sm.target + + // Test for ReserveBeginExecute + res, transactionID, reservedID, _, err := tsv.ReserveBeginExecute(ctx, &target, nil, executeSQL, nil, &querypb.ExecuteOptions{IncludedFields: querypb.ExecuteOptions_ALL}) + require.NoError(t, err) + for _, field := range res.Fields { + require.Equal(t, "keyspaceName", field.Database) + } + err = tsv.Release(ctx, &target, transactionID, reservedID) + require.NoError(t, err) +} + +func setupTabletServerTest(t *testing.T, keyspaceName string) (*fakesqldb.DB, *TabletServer) { config := tabletenv.NewDefaultConfig() - return setupTabletServerTestCustom(t, config) + return setupTabletServerTestCustom(t, config, keyspaceName) } -func setupTabletServerTestCustom(t *testing.T, config *tabletenv.TabletConfig) (*fakesqldb.DB, *TabletServer) { +func setupTabletServerTestCustom(t *testing.T, config *tabletenv.TabletConfig, keyspaceName string) (*fakesqldb.DB, *TabletServer) { db := setupFakeDB(t) tsv := NewTabletServer("TabletServerTest", config, memorytopo.NewServer(""), topodatapb.TabletAlias{}) require.Equal(t, StateNotConnected, tsv.sm.State()) dbcfgs := newDBConfigs(db) - target := querypb.Target{TabletType: topodatapb.TabletType_MASTER} + target := querypb.Target{ + Keyspace: keyspaceName, + TabletType: topodatapb.TabletType_MASTER, + } err := tsv.StartService(target, dbcfgs, nil /* mysqld */) require.NoError(t, err) return db, tsv