Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1181,7 +1181,7 @@ func testScheduler(t *testing.T) {
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusCancelled)
})

// now, we submit the exact same migratoin again: same UUID, same migration context.
// now, we submit the exact same migration again: same UUID, same migration context.
t.Run("resubmit migration", func(t *testing.T) {
executedUUID := testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy, "vtctl", "", "", true)) // skip wait
require.Equal(t, uuid, executedUUID)
Expand Down
60 changes: 60 additions & 0 deletions go/test/endtoend/vtgate/grpc_api/prepare_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
Copyright 2025 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package grpc_api

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/endtoend/cluster"
)

// TestTransactionsWithGRPCAPI test the transaction queries through vtgate grpc apis.
// It is done through both streaming api and non-streaming api.
func TestPrepareWithGRPCAPI(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "user_with_access", "test_password")
require.NoError(t, err)
defer vtgateConn.Close()

query := `SELECT DISTINCT
BINARY table_info.table_name AS table_name,
table_info.create_options AS create_options,
table_info.table_comment AS table_comment
FROM information_schema.tables AS table_info
JOIN information_schema.columns AS column_info
ON BINARY column_info.table_name = BINARY table_info.table_name
WHERE
table_info.table_schema = ?
AND column_info.table_schema = ?
-- Exclude views.
AND table_info.table_type = 'BASE TABLE'
ORDER BY BINARY table_info.table_name`

vtSession := vtgateConn.Session(keyspaceName, nil)
fields, paramsCount, err := vtSession.Prepare(t.Context(), query)
require.NoError(t, err)
assert.Equal(t, `[name:"table_name" type:VARBINARY name:"create_options" type:VARCHAR name:"table_comment" type:VARCHAR]`, fmt.Sprintf("%v", fields))
assert.EqualValues(t, 2, paramsCount)

}
21 changes: 21 additions & 0 deletions go/test/endtoend/vtgate/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,27 @@ func TestFilterAfterLeftJoin(t *testing.T) {
utils.AssertMatches(t, conn, query, `[[INT64(1) INT64(10)]]`)
}

func TestFilterWithINAfterLeftJoin(t *testing.T) {
conn, closer := start(t)
defer closer()

utils.Exec(t, conn, "insert into t1 (id1,id2) values (1, 10)")
utils.Exec(t, conn, "insert into t1 (id1,id2) values (2, 3)")
utils.Exec(t, conn, "insert into t1 (id1,id2) values (3, 2)")
utils.Exec(t, conn, "insert into t1 (id1,id2) values (4, 5)")

query := "select a.id1, b.id3 from t1 as a left outer join t2 as b on a.id2 = b.id4 WHERE a.id2 = 10 AND (b.id3 IS NULL OR b.id3 IN (1))"
utils.AssertMatches(t, conn, query, `[[INT64(1) NULL]]`)

utils.Exec(t, conn, "insert into t2 (id3,id4) values (1, 10)")

query = "select a.id1, b.id3 from t1 as a left outer join t2 as b on a.id2 = b.id4 WHERE a.id2 = 10 AND (b.id3 IS NULL OR b.id3 IN (1))"
utils.AssertMatches(t, conn, query, `[[INT64(1) INT64(1)]]`)

query = "select a.id1, b.id3 from t1 as a left outer join t2 as b on a.id2 = b.id4 WHERE a.id2 = 10 AND (b.id3 IS NULL OR (b.id3, b.id4) IN ((1, 10)))"
utils.AssertMatches(t, conn, query, `[[INT64(1) INT64(1)]]`)
}

func TestDescribeVindex(t *testing.T) {
conn, closer := start(t)
defer closer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ func TestAggrWithLimit(t *testing.T) {
mcmp.Exec(fmt.Sprintf("insert into aggr_test(id, val1, val2) values(%d, 'a', %d)", i, r))
}
mcmp.Exec("select val2, count(*) from aggr_test group by val2 order by count(*), val2 limit 10")
if utils.BinaryIsAtLeastAtVersion(23, "vtgate") {
mcmp.Exec("SELECT 1 AS `id`, COUNT(*) FROM (SELECT `id` FROM aggr_test WHERE val1 = 1 LIMIT 100) `t`")
}
}

func TestAggregateTypes(t *testing.T) {
Expand Down
9 changes: 9 additions & 0 deletions go/test/endtoend/vtgate/queries/orderby/orderby_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ func TestSimpleOrderBy(t *testing.T) {
mcmp.AssertMatches(`SELECT id2 FROM t1 ORDER BY id2 ASC`, `[[INT64(5)] [INT64(6)] [INT64(7)] [INT64(8)] [INT64(9)] [INT64(10)]]`)
}

// TestQueryWithDBQualifier tests that we remove the db qualifier in the plan output that is sent down to the database.
func TestQueryWithDBQualifier(t *testing.T) {
mcmp, closer := start(t)
defer closer()

mcmp.Exec("insert into t1(id1, id2) values (0,10),(1,9),(2,8),(3,7),(4,6),(5,5)")
mcmp.Exec(`SELECT ks_orderby.t1.id1, ks_orderby.t1.id2 FROM ks_orderby.t1 ORDER BY ks_orderby.t1.id2 ASC, ks_orderby.t1.id1 desc`)
}

func TestOrderBy(t *testing.T) {
mcmp, closer := start(t)
defer closer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ CREATE TABLE `t2`
(
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`t1_id` int unsigned NOT NULL,
`col` int unsigned,
PRIMARY KEY (`id`)
) ENGINE InnoDB,
CHARSET utf8mb4,
Expand All @@ -31,10 +32,10 @@ values (1, 'A'),
(3, 'C'),
(4, 'D');

insert into t2 (id, t1_id)
values (1, 1),
(2, 2),
(3, 3);
insert into t2 (id, t1_id, col)
values (1, 1, 1),
(2, 2, 2),
(3, 3, 3);

insert into t3 (id, name)
values (1, 'A'),
Expand Down Expand Up @@ -64,4 +65,10 @@ from (select id, count(*) as num_segments from t1 group by 1 order by 2 desc lim
join t2 u on u.id = t.id;

select name
from (select name from t1 group by name having count(t1.id) > 1) t1;
from (select name from t1 group by name having count(t1.id) > 1) t1;

select t1_id
from (select t1_id, col
from t2
group by 1, 2) t
group by 1;
2 changes: 1 addition & 1 deletion go/vt/sidecardb/schema/vreplication/copy_state.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ CREATE TABLE IF NOT EXISTS copy_state
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`vrepl_id` int NOT NULL,
`table_name` varbinary(128) NOT NULL,
`lastpk` varbinary(2000) DEFAULT NULL,
`lastpk` mediumblob DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `vrepl_id` (`vrepl_id`,`table_name`)
) ENGINE = InnoDB CHARSET = utf8mb4
15 changes: 15 additions & 0 deletions go/vt/sqlparser/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package sqlparser
// analyzer.go contains utility analysis functions.

import (
"errors"
"fmt"
"strings"
"unicode"
Expand Down Expand Up @@ -379,6 +380,20 @@ func IsColName(node Expr) bool {
return ok
}

var errNotStatic = errors.New("not static")

// IsConstant returns true if the Expr can be evaluated without input or access to tables.
func IsConstant(node Expr) bool {
err := Walk(func(node SQLNode) (kontinue bool, err error) {
switch node.(type) {
case *ColName, *Subquery:
return false, errNotStatic
}
return true, nil
}, node)
return err == nil
}

// IsValue returns true if the Expr is a string, integral or value arg.
// NULL is not considered to be a value.
func IsValue(node Expr) bool {
Expand Down
18 changes: 12 additions & 6 deletions go/vt/srvtopo/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,18 @@ func (entry *watchEntry) update(ctx context.Context, value any, err error, init
entry.onValueLocked(value)
}

listeners := entry.listeners
entry.listeners = entry.listeners[:0]

for _, callback := range listeners {
if callback(entry.value, entry.lastError) {
entry.listeners = append(entry.listeners, callback)
// Only notify listeners on success or when no cached value exists after error processing.
// This prevents unnecessary notifications during topo outages when cached data is available.
shouldNotifyListeners := err == nil || entry.value == nil

if shouldNotifyListeners {
listeners := entry.listeners
entry.listeners = entry.listeners[:0]

for _, callback := range listeners {
if callback(entry.value, entry.lastError) {
entry.listeners = append(entry.listeners, callback)
}
}
}
}
Expand Down
Loading
Loading