diff --git a/go/cmd/vtaclcheck/tableacl2.json b/go/cmd/vtaclcheck/tableacl2.json index a35be8ad278..68519b4bded 100644 --- a/go/cmd/vtaclcheck/tableacl2.json +++ b/go/cmd/vtaclcheck/tableacl2.json @@ -70,6 +70,20 @@ "writers": ["dev"], "admins": ["dev"] }, + { + "name": "vitess_message4", + "table_names_or_prefixes": ["vitess_message4"], + "readers": ["dev"], + "writers": ["dev"], + "admins": ["dev"] + }, + { + "name": "vitess_message5", + "table_names_or_prefixes": ["vitess_message5"], + "readers": ["dev"], + "writers": ["dev"], + "admins": ["dev"] + }, { "name": "vitess_message_auto", "table_names_or_prefixes": ["vitess_message_auto"], diff --git a/go/test/endtoend/messaging/main_test.go b/go/test/endtoend/messaging/main_test.go index d60b4468d32..49477ebe631 100644 --- a/go/test/endtoend/messaging/main_test.go +++ b/go/test/endtoend/messaging/main_test.go @@ -22,9 +22,8 @@ import ( "os" "testing" - _ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn" - "vitess.io/vitess/go/test/endtoend/cluster" + _ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn" ) var ( @@ -98,7 +97,8 @@ var ( "tables": { "unsharded_message": {}, "vitess_message": {}, - "vitess_message3": {} + "vitess_message3": {}, + "vitess_message4": {} } }` ) diff --git a/go/test/endtoend/messaging/message_test.go b/go/test/endtoend/messaging/message_test.go index 1f0156757c2..4169848a97f 100644 --- a/go/test/endtoend/messaging/message_test.go +++ b/go/test/endtoend/messaging/message_test.go @@ -25,22 +25,19 @@ import ( "testing" "time" - "vitess.io/vitess/go/test/endtoend/utils" - - cmp "vitess.io/vitess/go/test/utils" - - "vitess.io/vitess/go/vt/vtgate/evalengine" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/test/endtoend/utils" + cmp "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/proto/query" querypb 
"vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/vtgate/evalengine" "vitess.io/vitess/go/vt/vtgate/vtgateconn" ) @@ -63,6 +60,7 @@ create table vitess_message( # required indexes primary key(id), + index next_idx(time_next), index poller_idx(time_acked, priority, time_next desc) # add any secondary indexes or foreign keys - no restrictions @@ -200,6 +198,7 @@ create table vitess_message3( # required indexes primary key(id), + index next_idx(time_next), index poller_idx(time_acked, priority, time_next desc) # add any secondary indexes or foreign keys - no restrictions @@ -274,6 +273,87 @@ func TestThreeColMessage(t *testing.T) { assert.Equal(t, uint64(1), qr.RowsAffected) } +var createSpecificStreamingColsMessage = `create table vitess_message4( + # required columns + id bigint NOT NULL COMMENT 'often an event id, can also be auto-increment or a sequence', + priority tinyint NOT NULL DEFAULT '50' COMMENT 'lower number priorities process first', + epoch bigint NOT NULL DEFAULT '0' COMMENT 'Vitess increments this each time it sends a message, and is used for incremental backoff doubling', + time_next bigint DEFAULT 0 COMMENT 'the earliest time the message will be sent in epoch nanoseconds. Must be null if time_acked is set', + time_acked bigint DEFAULT NULL COMMENT 'the time the message was acked in epoch nanoseconds. 
Must be null if time_next is set', + + # add as many custom fields here as required + # optional - these are suggestions + tenant_id bigint, + message json, + + # custom to this test + msg1 varchar(128), + msg2 bigint, + + # required indexes + primary key(id), + index next_idx(time_next), + index poller_idx(time_acked, priority, time_next desc) + + # add any secondary indexes or foreign keys - no restrictions +) comment 'vitess_message,vt_message_cols=id|msg1,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'` + +func TestSpecificStreamingColsMessage(t *testing.T) { + ctx := context.Background() + + vtParams := mysql.ConnParams{ + Host: "localhost", + Port: clusterInstance.VtgateMySQLPort, + } + conn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + defer conn.Close() + + streamConn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + defer streamConn.Close() + + utils.Exec(t, conn, fmt.Sprintf("use %s", lookupKeyspace)) + utils.Exec(t, conn, createSpecificStreamingColsMessage) + defer utils.Exec(t, conn, "drop table vitess_message4") + + utils.Exec(t, streamConn, "set workload = 'olap'") + err = streamConn.ExecuteStreamFetch("stream * from vitess_message4") + require.NoError(t, err) + + wantFields := []*querypb.Field{{ + Name: "id", + Type: sqltypes.Int64, + }, { + Name: "msg1", + Type: sqltypes.VarChar, + }} + gotFields, err := streamConn.Fields() + for i, field := range gotFields { + // Remove other artifacts. + gotFields[i] = &querypb.Field{ + Name: field.Name, + Type: field.Type, + } + } + require.NoError(t, err) + cmp.MustMatch(t, wantFields, gotFields) + + utils.Exec(t, conn, "insert into vitess_message4(id, msg1, msg2) values(1, 'hello world', 3)") + + got, err := streamConn.FetchNext(nil) + require.NoError(t, err) + want := []sqltypes.Value{ + sqltypes.NewInt64(1), + sqltypes.NewVarChar("hello world"), + } + cmp.MustMatch(t, want, got) + + // Verify Ack. 
+ qr := utils.Exec(t, conn, "update vitess_message4 set time_acked = 123, time_next = null where id = 1 and time_acked is null") + assert.Equal(t, uint64(1), qr.RowsAffected) +} + func getTimeEpoch(qr *sqltypes.Result) (int64, int64) { if len(qr.Rows) != 1 { return 0, 0 diff --git a/go/vt/vttablet/tabletserver/schema/load_table.go b/go/vt/vttablet/tabletserver/schema/load_table.go index f2c0a89e0ff..7dd6f0b90b8 100644 --- a/go/vt/vttablet/tabletserver/schema/load_table.go +++ b/go/vt/vttablet/tabletserver/schema/load_table.go @@ -23,10 +23,12 @@ import ( "strings" "time" + "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/mysqlctl" - + querypb "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vtgate/evalengine" "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" ) @@ -64,21 +66,6 @@ func fetchColumns(ta *Table, conn *connpool.DBConn, databaseName, sqlTableName s } func loadMessageInfo(ta *Table, comment string) error { - hiddenCols := map[string]struct{}{ - "priority": {}, - "time_next": {}, - "epoch": {}, - "time_acked": {}, - } - - requiredCols := []string{ - "id", - "priority", - "time_next", - "epoch", - "time_acked", - } - ta.MessageInfo = &MessageInfo{} // Extract keyvalues. 
keyvals := make(map[string]string) @@ -117,6 +104,26 @@ func loadMessageInfo(ta *Table, comment string) error { ta.MessageInfo.MaxBackoff, _ = getDuration(keyvals, "vt_max_backoff") + // these columns are required for message manager to function properly, but only + // id is required to be streamed to subscribers + requiredCols := []string{ + "id", + "priority", + "time_next", + "epoch", + "time_acked", + } + + // by default, these columns are loaded for the message manager, but not sent to subscribers + // via stream * from msg_tbl + hiddenCols := map[string]struct{}{ + "priority": {}, + "time_next": {}, + "epoch": {}, + "time_acked": {}, + } + + // make sure required columns exist in the table schema for _, col := range requiredCols { num := ta.FindColumn(sqlparser.NewColIdent(col)) if num == -1 { @@ -124,13 +131,29 @@ func loadMessageInfo(ta *Table, comment string) error { } } - // Load user-defined columns. Any "unrecognized" column is user-defined. - for _, field := range ta.Fields { - if _, ok := hiddenCols[strings.ToLower(field.Name)]; ok { - continue + // check to see if the user has specified columns to stream to subscribers + specifiedCols := parseMessageCols(keyvals, "vt_message_cols") + + if len(specifiedCols) > 0 { + // make sure that all the specified columns exist in the table schema + for _, col := range specifiedCols { + num := ta.FindColumn(sqlparser.NewColIdent(col)) + if num == -1 { + return fmt.Errorf("%s missing from message table: %s", col, ta.Name.String()) + } + } + + // the original implementation in message_manager assumes id is the first column, as originally users + // could not restrict columns. As the PK, id is required, and by requiring it as the first column, + // we avoid the need to change the implementation. 
// parseMessageCols parses a "|"-separated column list stored under key in the
// table-comment attributes (e.g. vt_message_cols=id|msg1).
//
// It returns nil when the attribute is absent, empty or only whitespace, so
// the caller can fall back to its default column selection. Each returned name
// has surrounding whitespace removed, so "id | msg1" yields the same result as
// "id|msg1". Empty entries (as in "id||msg1") are kept rather than dropped, so
// that a caller validating names against the table schema still rejects a
// malformed list.
func parseMessageCols(in map[string]string, key string) []string {
	raw := strings.TrimSpace(in[key])
	if raw == "" {
		return nil
	}

	cols := strings.Split(raw, "|")
	for i, col := range cols {
		cols[i] = strings.TrimSpace(col)
	}
	return cols
}
+func getSpecifiedMessageFields(tableFields []*querypb.Field, specifiedCols []string) []*querypb.Field { + fields := make([]*querypb.Field, 0, len(specifiedCols)) + for _, col := range specifiedCols { + for _, field := range tableFields { + if res, _ := evalengine.NullsafeCompare(sqltypes.NewVarChar(field.Name), sqltypes.NewVarChar(strings.TrimSpace(col)), collations.Default()); res == 0 { + fields = append(fields, field) + break + } + } + } + return fields +} diff --git a/go/vt/vttablet/tabletserver/schema/load_table_test.go b/go/vt/vttablet/tabletserver/schema/load_table_test.go index f5771f86981..d92e9ee420b 100644 --- a/go/vt/vttablet/tabletserver/schema/load_table_test.go +++ b/go/vt/vttablet/tabletserver/schema/load_table_test.go @@ -18,6 +18,7 @@ package schema import ( "context" + "errors" "reflect" "strings" "testing" @@ -28,11 +29,10 @@ import ( "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/sqltypes" + querypb "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" - - querypb "vitess.io/vitess/go/vt/proto/query" ) func TestLoadTable(t *testing.T) { @@ -134,6 +134,59 @@ func TestLoadTableMessage(t *testing.T) { want.MessageInfo.MaxBackoff = 100 * time.Second assert.Equal(t, want, table) + // + // multiple tests for vt_message_cols + // + origFields := want.MessageInfo.Fields + + // Test loading id column from vt_message_cols + table, err = newTestLoadTable("USER_TABLE", "vitess_message,vt_message_cols=id,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30,vt_min_backoff=10,vt_max_backoff=100", db) + require.NoError(t, err) + want.MessageInfo.Fields = []*querypb.Field{{ + Name: "id", + Type: sqltypes.Int64, + }} + assert.Equal(t, want, table) + + // Test loading message column from vt_message_cols + _, err = newTestLoadTable("USER_TABLE", 
"vitess_message,vt_message_cols=message,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30,vt_min_backoff=10,vt_max_backoff=100", db) + require.Equal(t, errors.New("vt_message_cols must begin with id: test_table"), err) + + // Test loading id & message columns from vt_message_cols + table, err = newTestLoadTable("USER_TABLE", "vitess_message,vt_message_cols=id|message,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30,vt_min_backoff=10,vt_max_backoff=100", db) + require.NoError(t, err) + want.MessageInfo.Fields = []*querypb.Field{{ + Name: "id", + Type: sqltypes.Int64, + }, { + Name: "message", + Type: sqltypes.VarBinary, + }} + assert.Equal(t, want, table) + + // Test loading id & message columns in reverse order from vt_message_cols + _, err = newTestLoadTable("USER_TABLE", "vitess_message,vt_message_cols=message|id,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30,vt_min_backoff=10,vt_max_backoff=100", db) + require.Equal(t, errors.New("vt_message_cols must begin with id: test_table"), err) + + // Test setting zero columns on vt_message_cols, which is ignored and loads the default columns + table, err = newTestLoadTable("USER_TABLE", "vitess_message,vt_message_cols,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30,vt_min_backoff=10,vt_max_backoff=100", db) + require.NoError(t, err) + want.MessageInfo.Fields = []*querypb.Field{{ + Name: "id", + Type: sqltypes.Int64, + }, { + Name: "message", + Type: sqltypes.VarBinary, + }} + assert.Equal(t, want, table) + + // reset fields after all vt_message_cols tests + want.MessageInfo.Fields = origFields + + // + // end vt_message_cols tests + // + // Missing property _, err = newTestLoadTable("USER_TABLE", "vitess_message,vt_ack_wait=30", db) wanterr := "not specified for message table"