Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 53 additions & 2 deletions go/test/endtoend/preparestmt/stmt_methods_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,9 @@ func TestSpecializedPlan(t *testing.T) {
}, {
query: `select 1 from t1 tbl1, t1 tbl2, t1 tbl3, t1 tbl4 where tbl1.id = ? and tbl2.id = ? and tbl3.id = ? and tbl4.id = ?`,
args: []any{1, 1, 1, 1},
}, {
query: `SELECT e.id, e.name, s.age, ROW_NUMBER() OVER (PARTITION BY e.age ORDER BY s.name DESC) AS age_rank FROM t1 e, t1 s where e.id = ? and s.id = ?`,
args: []any{1, 1},
}}

for _, q := range queries {
Expand All @@ -484,12 +487,23 @@ func TestSpecializedPlan(t *testing.T) {
finalExecCount := getVarValue[float64](t, "Passthrough", func() map[string]any {
return oMap
})
require.EqualValues(t, 15, finalExecCount-initExecCount)
require.EqualValues(t, 20, finalExecCount-initExecCount)

// Validate specialized plan.
randomExec(t, dbo)

// Validate Join Query specialized plan.
p := getPlanWhenReady(t, queries[0].query, 100*time.Millisecond, clusterInstance.VtgateProcess.ReadQueryPlans)
require.NotNil(t, p, "plan not found")
validateJoinSpecializedPlan(t, p)

// Validate Window Function Query specialized plan with failing baseline plan.
p = getPlanWhenReady(t, queries[3].query, 100*time.Millisecond, clusterInstance.VtgateProcess.ReadQueryPlans)
require.NotNil(t, p, "plan not found")
validateBaselineErrSpecializedPlan(t, p)
}

func validateJoinSpecializedPlan(t *testing.T, p map[string]any) {
t.Helper()
plan, exist := p["Instructions"]
require.True(t, exist, "plan Instructions not found")

Expand All @@ -504,6 +518,43 @@ func TestSpecializedPlan(t *testing.T) {
require.Equal(t, "EqualUnique", pd.Inputs[1].Variant)
}

func validateBaselineErrSpecializedPlan(t *testing.T, p map[string]any) {
t.Helper()
plan, exist := p["Instructions"]
require.True(t, exist, "plan Instructions not found")

pm, ok := plan.(map[string]any)
require.True(t, ok, "plan is not of type map[string]any")
require.EqualValues(t, "PlanSwitcher", pm["OperatorType"])
require.EqualValues(t, "VT12001: unsupported: OVER CLAUSE with sharded keyspace", pm["BaselineErr"])

pd, err := engine.PrimitiveDescriptionFromMap(plan.(map[string]any))
require.NoError(t, err)
require.Equal(t, "PlanSwitcher", pd.OperatorType)
require.Len(t, pd.Inputs, 1, "Only Specialized plan should be available")

require.Equal(t, "Optimized", pd.Inputs[0].InputName)
require.Equal(t, "Route", pd.Inputs[0].OperatorType)
require.Equal(t, "EqualUnique", pd.Inputs[0].Variant)
}

// randomExec to make many plans so that plan cache is populated.
func randomExec(t *testing.T, dbo *sql.DB) {
t.Helper()

for i := 1; i < 101; i++ {
// generate a random query
query := fmt.Sprintf("SELECT %d", i)
stmt, err := dbo.Prepare(query)
require.NoError(t, err)

rows, err := stmt.Query()
require.NoError(t, err)
require.NoError(t, rows.Close())
time.Sleep(5 * time.Millisecond)
}
}

// getPlanWhenReady polls for the query plan until it is ready or times out.
func getPlanWhenReady(t *testing.T, sql string, timeout time.Duration, plansFunc func() (map[string]any, error)) map[string]any {
t.Helper()
Expand Down
34 changes: 32 additions & 2 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1149,7 +1149,7 @@ func (e *Executor) fetchOrCreatePlan(
}
}
if err != nil {
return nil, nil, nil, err
return nil, nil, stmt, err
}

if shouldOptimizePlan(preparedPlan, isExecutePath, plan) {
Expand Down Expand Up @@ -1591,11 +1591,20 @@ func prepareBindVars(paramsCount uint16) map[string]*querypb.BindVariable {
}

func (e *Executor) handlePrepare(ctx context.Context, safeSession *econtext.SafeSession, sql string, logStats *logstats.LogStats) ([]*querypb.Field, uint16, error) {
plan, vcursor, _, err := e.fetchOrCreatePlan(ctx, safeSession, sql, nil, false, true, logStats, false)
plan, vcursor, stmt, err := e.fetchOrCreatePlan(ctx, safeSession, sql, nil, false, true, logStats, false)
execStart := time.Now()
logStats.PlanTime = execStart.Sub(logStats.StartTime)

if err != nil {
if stmt != nil {
// Attempt to build NULL field types for the statement in case planning fails,
// allowing the client to proceed with preparing the statement even without a valid execution plan.
// Hoping that an optimized plan can be built later when parameter values are available.
flds, paramCount, success := buildNullFieldTypes(stmt)
if success {
return flds, paramCount, nil
}
}
logStats.Error = err
return nil, 0, err
}
Expand All @@ -1622,6 +1631,27 @@ func (e *Executor) handlePrepare(ctx context.Context, safeSession *econtext.Safe
return qr.Fields, plan.ParamsCount, err
}

// buildNullFieldTypes builds a list of NULL field types for the given statement.
func buildNullFieldTypes(stmt sqlparser.Statement) ([]*querypb.Field, uint16, bool) {
sel, ok := stmt.(sqlparser.SelectStatement)
if !ok {
return nil, countArguments(stmt), true
}
var fields []*querypb.Field
for _, expr := range sel.GetColumns() {
// *sqlparser.StarExpr is not supported in this context.
if ae, ok := expr.(*sqlparser.AliasedExpr); ok {
fields = append(fields, &querypb.Field{
Name: ae.ColumnName(),
Type: querypb.Type_NULL_TYPE,
})
continue
}
return nil, 0, false
}
return fields, countArguments(stmt), true
}

func parseAndValidateQuery(query string, parser *sqlparser.Parser) (sqlparser.Statement, *sqlparser.ReservedVars, error) {
stmt, reserved, err := parser.Parse2(query)
if err != nil {
Expand Down
22 changes: 22 additions & 0 deletions go/vt/vtgate/executor_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3290,6 +3290,28 @@ func TestOnlyOptimizedPlan(t *testing.T) {
require.ErrorContains(t, sp.BaselineErr, "VT12001: unsupported: subquery with aggregation in order by")
}

// TestPrepareWithUnsupportedQuery tests that the fields returned by the query on unsupported query.
func TestPrepareWithUnsupportedQuery(t *testing.T) {
executor, _, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer())

sql := "select a, b, c, row_number() over (partition by x) from user where c1 = ? and c2 = ?"
session := econtext.NewAutocommitSession(&vtgatepb.Session{})
fields, paramsCount, err := executorPrepare(ctx, executor, session.Session, sql)
require.NoError(t, err)
assert.EqualValues(t, 2, paramsCount)
wantFields := []*querypb.Field{
{Name: "a", Type: querypb.Type_NULL_TYPE},
{Name: "b", Type: querypb.Type_NULL_TYPE},
{Name: "c", Type: querypb.Type_NULL_TYPE},
{Name: "row_number() over ( partition by x)", Type: querypb.Type_NULL_TYPE},
}
require.Equal(t, wantFields, fields)

sql = "select row_number over" // this is a syntax error. It should return an error.
_, _, err = executorPrepare(ctx, executor, session.Session, sql)
require.ErrorContains(t, err, "syntax error")
}

func TestSelectDatabasePrepare(t *testing.T) {
executor, _, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer())
logChan := executor.queryLogger.Subscribe("Test")
Expand Down
Loading