Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions go/cmd/vtaclcheck/tableacl2.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,20 @@
"writers": ["dev"],
"admins": ["dev"]
},
{
"name": "vitess_message4",
"table_names_or_prefixes": ["vitess_message4"],
"readers": ["dev"],
"writers": ["dev"],
"admins": ["dev"]
},
{
"name": "vitess_message5",
"table_names_or_prefixes": ["vitess_message5"],
"readers": ["dev"],
"writers": ["dev"],
"admins": ["dev"]
},
{
"name": "vitess_message_auto",
"table_names_or_prefixes": ["vitess_message_auto"],
Expand Down
6 changes: 3 additions & 3 deletions go/test/endtoend/messaging/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ import (
"os"
"testing"

_ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn"

"vitess.io/vitess/go/test/endtoend/cluster"
_ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn"
)

var (
Expand Down Expand Up @@ -98,7 +97,8 @@ var (
"tables": {
"unsharded_message": {},
"vitess_message": {},
"vitess_message3": {}
"vitess_message3": {},
"vitess_message4": {}
}
}`
)
Expand Down
92 changes: 86 additions & 6 deletions go/test/endtoend/messaging/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,19 @@ import (
"testing"
"time"

"vitess.io/vitess/go/test/endtoend/utils"

cmp "vitess.io/vitess/go/test/utils"

"vitess.io/vitess/go/vt/vtgate/evalengine"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
cmp "vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/query"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vtgate/evalengine"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"
)

Expand All @@ -63,6 +60,7 @@ create table vitess_message(

# required indexes
primary key(id),
index next_idx(time_next),
index poller_idx(time_acked, priority, time_next desc)

# add any secondary indexes or foreign keys - no restrictions
Expand Down Expand Up @@ -200,6 +198,7 @@ create table vitess_message3(

# required indexes
primary key(id),
index next_idx(time_next),
index poller_idx(time_acked, priority, time_next desc)

# add any secondary indexes or foreign keys - no restrictions
Expand Down Expand Up @@ -274,6 +273,87 @@ func TestThreeColMessage(t *testing.T) {
assert.Equal(t, uint64(1), qr.RowsAffected)
}

var createSpecificStreamingColsMessage = `create table vitess_message4(
# required columns
id bigint NOT NULL COMMENT 'often an event id, can also be auto-increment or a sequence',
priority tinyint NOT NULL DEFAULT '50' COMMENT 'lower number priorities process first',
epoch bigint NOT NULL DEFAULT '0' COMMENT 'Vitess increments this each time it sends a message, and is used for incremental backoff doubling',
time_next bigint DEFAULT 0 COMMENT 'the earliest time the message will be sent in epoch nanoseconds. Must be null if time_acked is set',
time_acked bigint DEFAULT NULL COMMENT 'the time the message was acked in epoch nanoseconds. Must be null if time_next is set',

# add as many custom fields here as required
# optional - these are suggestions
tenant_id bigint,
message json,

# custom to this test
msg1 varchar(128),
msg2 bigint,

# required indexes
primary key(id),
index next_idx(time_next),
index poller_idx(time_acked, priority, time_next desc)

# add any secondary indexes or foreign keys - no restrictions
) comment 'vitess_message,vt_message_cols=id|msg1,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'`

func TestSpecificStreamingColsMessage(t *testing.T) {
ctx := context.Background()

vtParams := mysql.ConnParams{
Host: "localhost",
Port: clusterInstance.VtgateMySQLPort,
}
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()

streamConn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer streamConn.Close()

utils.Exec(t, conn, fmt.Sprintf("use %s", lookupKeyspace))
utils.Exec(t, conn, createSpecificStreamingColsMessage)
defer utils.Exec(t, conn, "drop table vitess_message4")

utils.Exec(t, streamConn, "set workload = 'olap'")
err = streamConn.ExecuteStreamFetch("stream * from vitess_message4")
require.NoError(t, err)

wantFields := []*querypb.Field{{
Name: "id",
Type: sqltypes.Int64,
}, {
Name: "msg1",
Type: sqltypes.VarChar,
}}
gotFields, err := streamConn.Fields()
for i, field := range gotFields {
// Remove other artifacts.
gotFields[i] = &querypb.Field{
Name: field.Name,
Type: field.Type,
}
}
require.NoError(t, err)
cmp.MustMatch(t, wantFields, gotFields)

utils.Exec(t, conn, "insert into vitess_message4(id, msg1, msg2) values(1, 'hello world', 3)")

got, err := streamConn.FetchNext(nil)
require.NoError(t, err)
want := []sqltypes.Value{
sqltypes.NewInt64(1),
sqltypes.NewVarChar("hello world"),
}
cmp.MustMatch(t, want, got)

// Verify Ack.
qr := utils.Exec(t, conn, "update vitess_message4 set time_acked = 123, time_next = null where id = 1 and time_acked is null")
assert.Equal(t, uint64(1), qr.RowsAffected)
}

func getTimeEpoch(qr *sqltypes.Result) (int64, int64) {
if len(qr.Rows) != 1 {
return 0, 0
Expand Down
105 changes: 84 additions & 21 deletions go/vt/vttablet/tabletserver/schema/load_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ import (
"strings"
"time"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/mysqlctl"

querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/evalengine"
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
)

Expand Down Expand Up @@ -64,21 +66,6 @@ func fetchColumns(ta *Table, conn *connpool.DBConn, databaseName, sqlTableName s
}

func loadMessageInfo(ta *Table, comment string) error {
hiddenCols := map[string]struct{}{
"priority": {},
"time_next": {},
"epoch": {},
"time_acked": {},
}

requiredCols := []string{
"id",
"priority",
"time_next",
"epoch",
"time_acked",
}

ta.MessageInfo = &MessageInfo{}
// Extract keyvalues.
keyvals := make(map[string]string)
Expand Down Expand Up @@ -117,20 +104,56 @@ func loadMessageInfo(ta *Table, comment string) error {

ta.MessageInfo.MaxBackoff, _ = getDuration(keyvals, "vt_max_backoff")

// these columns are required for message manager to function properly, but only
// id is required to be streamed to subscribers
requiredCols := []string{
"id",
"priority",
"time_next",
"epoch",
"time_acked",
}

// by default, these columns are loaded for the message manager, but not sent to subscribers
// via stream * from msg_tbl
hiddenCols := map[string]struct{}{
"priority": {},
"time_next": {},
"epoch": {},
"time_acked": {},
}

// make sure required columns exist in the table schema
for _, col := range requiredCols {
num := ta.FindColumn(sqlparser.NewColIdent(col))
if num == -1 {
return fmt.Errorf("%s missing from message table: %s", col, ta.Name.String())
}
}

// Load user-defined columns. Any "unrecognized" column is user-defined.
for _, field := range ta.Fields {
if _, ok := hiddenCols[strings.ToLower(field.Name)]; ok {
continue
// check to see if the user has specified columns to stream to subscribers
specifiedCols := parseMessageCols(keyvals, "vt_message_cols")

if len(specifiedCols) > 0 {
// make sure that all the specified columns exist in the table schema
for _, col := range specifiedCols {
num := ta.FindColumn(sqlparser.NewColIdent(col))
if num == -1 {
return fmt.Errorf("%s missing from message table: %s", col, ta.Name.String())
}
}

// the original implementation in message_manager assumes id is the first column, as originally users
// could not restrict columns. As the PK, id is required, and by requiring it as the first column,
// we avoid the need to change the implementation.
if specifiedCols[0] != "id" {
return fmt.Errorf("vt_message_cols must begin with id: %s", ta.Name.String())
}
ta.MessageInfo.Fields = append(ta.MessageInfo.Fields, field)
ta.MessageInfo.Fields = getSpecifiedMessageFields(ta.Fields, specifiedCols)
} else {
ta.MessageInfo.Fields = getDefaultMessageFields(ta.Fields, hiddenCols)
}

return nil
}

Expand All @@ -157,3 +180,43 @@ func getNum(in map[string]string, key string) (int, error) {
}
return v, nil
}

// parseMessageCols parses the vt_message_cols attribute. It doesn't error out if the attribute is not specified
// because the default behavior is to stream all columns to subscribers, and if done incorrectly, later checks
// to see if the columns exist in the table schema will fail.
func parseMessageCols(in map[string]string, key string) []string {
sv := in[key]
cols := strings.Split(sv, "|")
if len(cols) == 1 && strings.TrimSpace(cols[0]) == "" {
return nil
}
return cols
}

func getDefaultMessageFields(tableFields []*querypb.Field, hiddenCols map[string]struct{}) []*querypb.Field {
fields := make([]*querypb.Field, 0, len(tableFields))
// Load user-defined columns. Any "unrecognized" column is user-defined.
for _, field := range tableFields {
if _, ok := hiddenCols[strings.ToLower(field.Name)]; ok {
continue
}

fields = append(fields, field)
}
return fields
}

// we have already validated that all the specified columns exist in the table schema, so we don't need to
// check again and possibly return an error here.
func getSpecifiedMessageFields(tableFields []*querypb.Field, specifiedCols []string) []*querypb.Field {
fields := make([]*querypb.Field, 0, len(specifiedCols))
for _, col := range specifiedCols {
for _, field := range tableFields {
if res, _ := evalengine.NullsafeCompare(sqltypes.NewVarChar(field.Name), sqltypes.NewVarChar(strings.TrimSpace(col)), collations.Default()); res == 0 {
fields = append(fields, field)
break
}
}
}
return fields
}
57 changes: 55 additions & 2 deletions go/vt/vttablet/tabletserver/schema/load_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package schema

import (
"context"
"errors"
"reflect"
"strings"
"testing"
Expand All @@ -28,11 +29,10 @@ import (

"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

querypb "vitess.io/vitess/go/vt/proto/query"
)

func TestLoadTable(t *testing.T) {
Expand Down Expand Up @@ -134,6 +134,59 @@ func TestLoadTableMessage(t *testing.T) {
want.MessageInfo.MaxBackoff = 100 * time.Second
assert.Equal(t, want, table)

//
// multiple tests for vt_message_cols
//
origFields := want.MessageInfo.Fields

// Test loading id column from vt_message_cols
table, err = newTestLoadTable("USER_TABLE", "vitess_message,vt_message_cols=id,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30,vt_min_backoff=10,vt_max_backoff=100", db)
require.NoError(t, err)
want.MessageInfo.Fields = []*querypb.Field{{
Name: "id",
Type: sqltypes.Int64,
}}
assert.Equal(t, want, table)

// Test loading message column from vt_message_cols
_, err = newTestLoadTable("USER_TABLE", "vitess_message,vt_message_cols=message,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30,vt_min_backoff=10,vt_max_backoff=100", db)
require.Equal(t, errors.New("vt_message_cols must begin with id: test_table"), err)

// Test loading id & message columns from vt_message_cols
table, err = newTestLoadTable("USER_TABLE", "vitess_message,vt_message_cols=id|message,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30,vt_min_backoff=10,vt_max_backoff=100", db)
require.NoError(t, err)
want.MessageInfo.Fields = []*querypb.Field{{
Name: "id",
Type: sqltypes.Int64,
}, {
Name: "message",
Type: sqltypes.VarBinary,
}}
assert.Equal(t, want, table)

// Test loading id & message columns in reverse order from vt_message_cols
_, err = newTestLoadTable("USER_TABLE", "vitess_message,vt_message_cols=message|id,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30,vt_min_backoff=10,vt_max_backoff=100", db)
require.Equal(t, errors.New("vt_message_cols must begin with id: test_table"), err)

// Test setting zero columns on vt_message_cols, which is ignored and loads the default columns
table, err = newTestLoadTable("USER_TABLE", "vitess_message,vt_message_cols,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30,vt_min_backoff=10,vt_max_backoff=100", db)
require.NoError(t, err)
want.MessageInfo.Fields = []*querypb.Field{{
Name: "id",
Type: sqltypes.Int64,
}, {
Name: "message",
Type: sqltypes.VarBinary,
}}
assert.Equal(t, want, table)

// reset fields after all vt_message_cols tests
want.MessageInfo.Fields = origFields

//
// end vt_message_cols tests
//

// Missing property
_, err = newTestLoadTable("USER_TABLE", "vitess_message,vt_ack_wait=30", db)
wanterr := "not specified for message table"
Expand Down