diff --git a/go/test/endtoend/messaging/main_test.go b/go/test/endtoend/messaging/main_test.go index 168aea03858..d60b4468d32 100644 --- a/go/test/endtoend/messaging/main_test.go +++ b/go/test/endtoend/messaging/main_test.go @@ -38,27 +38,43 @@ var ( userKeyspace = "user" lookupKeyspace = "lookup" createShardedMessage = `create table sharded_message( - id bigint, - priority bigint default 0, - time_next bigint default 0, - epoch bigint, - time_acked bigint, - message varchar(128), + # required columns + id bigint NOT NULL COMMENT 'often an event id, can also be auto-increment or a sequence', + priority tinyint NOT NULL DEFAULT '50' COMMENT 'lower number priorities process first', + epoch bigint NOT NULL DEFAULT '0' COMMENT 'Vitess increments this each time it sends a message, and is used for incremental backoff doubling', + time_next bigint DEFAULT 0 COMMENT 'the earliest time the message will be sent in epoch nanoseconds. Must be null if time_acked is set', + time_acked bigint DEFAULT NULL COMMENT 'the time the message was acked in epoch nanoseconds. Must be null if time_next is set', + + # add as many custom fields here as required + # optional - these are suggestions + tenant_id bigint, + message json, + + # required indexes primary key(id), - index next_idx(priority, time_next desc), - index ack_idx(time_acked) - ) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'` + index poller_idx(time_acked, priority, time_next desc) + + # add any secondary indexes or foreign keys - no restrictions + ) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'` createUnshardedMessage = `create table unsharded_message( - id bigint, - priority bigint default 0, - time_next bigint default 0, - epoch bigint, - time_acked bigint, - message varchar(128), + # required columns + id bigint NOT NULL COMMENT 'often an event id, can also be auto-increment or a sequence', + priority tinyint NOT NULL DEFAULT '50' COMMENT 'lower number priorities process first', + epoch bigint NOT NULL DEFAULT '0' COMMENT 'Vitess increments this each time it sends a message, and is used for incremental backoff doubling', + time_next bigint DEFAULT 0 COMMENT 'the earliest time the message will be sent in epoch nanoseconds. Must be null if time_acked is set', + time_acked bigint DEFAULT NULL COMMENT 'the time the message was acked in epoch nanoseconds. Must be null if time_next is set', + + # add as many custom fields here as required + # optional - these are suggestions + tenant_id bigint, + message json, + + # required indexes primary key(id), - index next_idx(priority, time_next desc), - index ack_idx(time_acked) - ) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'` + index poller_idx(time_acked, priority, time_next desc) + + # add any secondary indexes or foreign keys - no restrictions + ) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'` userVschema = `{ "sharded": true, "vindexes": { diff --git a/go/test/endtoend/messaging/message_test.go b/go/test/endtoend/messaging/message_test.go index b9e59c02f2e..1f0156757c2 100644 --- a/go/test/endtoend/messaging/message_test.go +++ b/go/test/endtoend/messaging/message_test.go @@ -44,17 +44,30 @@ import ( "vitess.io/vitess/go/vt/vtgate/vtgateconn" ) -var createMessage = `create table vitess_message( - id bigint, - priority bigint default 0, - time_next bigint default 0, - epoch bigint, - time_acked bigint, - message varchar(128), +var testMessage = "{\"message\":\"hello world\"}" +var testShardedMessagef = "{\"message\": \"hello world\", \"id\": %d}" + +var createMessage = ` +create table vitess_message( + # required columns + id bigint NOT NULL COMMENT 'often an event id, can also be auto-increment or a sequence', + priority tinyint NOT NULL DEFAULT '50' COMMENT 'lower number priorities process first', + epoch bigint NOT NULL DEFAULT '0' COMMENT 'Vitess increments this each time it sends a message, and is used for incremental backoff doubling', + time_next bigint DEFAULT 0 COMMENT 'the earliest time the message will be sent in epoch nanoseconds. Must be null if time_acked is set', + time_acked bigint DEFAULT NULL COMMENT 'the time the message was acked in epoch nanoseconds. Must be null if time_next is set', + + # add as many custom fields here as required + # optional - these are suggestions + tenant_id bigint, + message json, + + # required indexes primary key(id), - index next_idx(priority, time_next desc), - index ack_idx(time_acked)) -comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'` + index poller_idx(time_acked, priority, time_next desc) + + # add any secondary indexes or foreign keys - no restrictions +) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1' +` func TestMessage(t *testing.T) { ctx := context.Background() @@ -84,9 +97,12 @@ func TestMessage(t *testing.T) { wantFields := []*querypb.Field{{ Name: "id", Type: sqltypes.Int64, + }, { + Name: "tenant_id", + Type: sqltypes.Int64, }, { Name: "message", - Type: sqltypes.VarChar, + Type: sqltypes.TypeJSON, }} gotFields, err := streamConn.Fields() for i, field := range gotFields { @@ -99,7 +115,7 @@ func TestMessage(t *testing.T) { require.NoError(t, err) cmp.MustMatch(t, wantFields, gotFields) - utils.Exec(t, conn, "insert into vitess_message(id, message) values(1, 'hello world')") + utils.Exec(t, conn, fmt.Sprintf("insert into vitess_message(id, tenant_id, message) values(1, 1, '%s')", testMessage)) // account for jitter in timings, maxJitter uses the current hardcoded value for jitter in message_manager.go jitter := int64(0) @@ -112,7 +128,8 @@ func TestMessage(t *testing.T) { want := []sqltypes.Value{ sqltypes.NewInt64(1), - sqltypes.NewVarChar("hello world"), + sqltypes.NewInt64(1), + sqltypes.TestValue(sqltypes.TypeJSON, testMessage), } cmp.MustMatch(t, want, got) @@ -163,18 +180,31 @@ func TestMessage(t *testing.T) { assert.Equal(t, 0, len(qr.Rows)) } -var createThreeColMessage = `create table vitess_message3( - id bigint, - priority bigint default 0, - time_next bigint default 0, - epoch bigint, - time_acked bigint, +var createThreeColMessage = ` +create table vitess_message3( + # required columns + id bigint NOT NULL COMMENT 'often an event id, can also be auto-increment or a sequence', + priority tinyint NOT NULL DEFAULT '50' COMMENT 'lower number priorities process first', + epoch bigint NOT NULL DEFAULT '0' COMMENT 'Vitess increments this each time it sends a message, and is used for incremental backoff doubling', + time_next bigint DEFAULT 0 COMMENT 'the earliest time the message will be sent in epoch nanoseconds. Must be null if time_acked is set', + time_acked bigint DEFAULT NULL COMMENT 'the time the message was acked in epoch nanoseconds. Must be null if time_next is set', + + # add as many custom fields here as required + # optional - these are suggestions + tenant_id bigint, + message json, + + # custom to this test msg1 varchar(128), msg2 bigint, + + # required indexes primary key(id), - index next_idx(priority, time_next desc), - index ack_idx(time_acked)) -comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'` + index poller_idx(time_acked, priority, time_next desc) + + # add any secondary indexes or foreign keys - no restrictions +) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1' +` func TestThreeColMessage(t *testing.T) { ctx := context.Background() @@ -202,6 +232,12 @@ func TestThreeColMessage(t *testing.T) { wantFields := []*querypb.Field{{ Name: "id", Type: sqltypes.Int64, + }, { + Name: "tenant_id", + Type: sqltypes.Int64, + }, { + Name: "message", + Type: sqltypes.TypeJSON, }, { Name: "msg1", Type: sqltypes.VarChar, @@ -220,12 +256,14 @@ func TestThreeColMessage(t *testing.T) { require.NoError(t, err) cmp.MustMatch(t, wantFields, gotFields) - utils.Exec(t, conn, "insert into vitess_message3(id, msg1, msg2) values(1, 'hello world', 3)") + utils.Exec(t, conn, fmt.Sprintf("insert into vitess_message3(id, tenant_id, message, msg1, msg2) values(1, 3, '%s', 'hello world', 3)", testMessage)) got, err := streamConn.FetchNext(nil) require.NoError(t, err) want := []sqltypes.Value{ sqltypes.NewInt64(1), + sqltypes.NewInt64(3), + sqltypes.TestValue(sqltypes.TypeJSON, testMessage), sqltypes.NewVarChar("hello world"), sqltypes.NewInt64(3), } @@ -292,7 +330,8 @@ func TestReparenting(t *testing.T) { assertClientCount(t, 1, shard0Replica) assertClientCount(t, 1, shard1Primary) session := stream.Session("@primary", nil) - cluster.ExecuteQueriesUsingVtgate(t, session, "insert into sharded_message (id, message) values (3,'hello world 3')") + msg3 := fmt.Sprintf(testShardedMessagef, 3) + cluster.ExecuteQueriesUsingVtgate(t, session, fmt.Sprintf("insert into sharded_message (id, tenant_id, message) values (3,3,'%s')", msg3)) // validate that we have received inserted message stream.Next() @@ -353,8 +392,10 @@ func TestConnection(t *testing.T) { // in message stream session := stream.Session("@primary", nil) // insert data in primary - cluster.ExecuteQueriesUsingVtgate(t, session, "insert into sharded_message (id, message) values (2,'hello world 2')") - cluster.ExecuteQueriesUsingVtgate(t, session, "insert into sharded_message (id, message) values (5,'hello world 5')") + msg2 := fmt.Sprintf(testShardedMessagef, 2) + msg5 := fmt.Sprintf(testShardedMessagef, 5) + cluster.ExecuteQueriesUsingVtgate(t, session, fmt.Sprintf("insert into sharded_message (id, tenant_id, message) values (2,2,'%s')", msg2)) + cluster.ExecuteQueriesUsingVtgate(t, session, fmt.Sprintf("insert into sharded_message (id, tenant_id, message) values (5,5,'%s')", msg5)) // validate in msg stream _, err = stream.Next() require.Nil(t, err) @@ -380,15 +421,18 @@ func testMessaging(t *testing.T, name, ks string) { defer stream.Close() session := stream.Session("@primary", nil) - cluster.ExecuteQueriesUsingVtgate(t, session, "insert into "+name+" (id, message) values (4,'hello world 4')") - cluster.ExecuteQueriesUsingVtgate(t, session, "insert into "+name+" (id, message) values (1,'hello world 1')") + msg4 := fmt.Sprintf(testShardedMessagef, 4) + msg1 := fmt.Sprintf(testShardedMessagef, 1) + cluster.ExecuteQueriesUsingVtgate(t, session, fmt.Sprintf("insert into "+name+" (id, tenant_id, message) values (4,4,'%s')", msg4)) + cluster.ExecuteQueriesUsingVtgate(t, session, fmt.Sprintf("insert into "+name+" (id, tenant_id, message) values (1,1,'%s')", msg1)) // validate fields res, err := stream.MessageStream(ks, "", nil, name) require.Nil(t, err) - require.Equal(t, 2, len(res.Fields)) + require.Equal(t, 3, len(res.Fields)) validateField(t, res.Fields[0], "id", query.Type_INT64) - validateField(t, res.Fields[1], "message", query.Type_VARCHAR) + validateField(t, res.Fields[1], "tenant_id", query.Type_INT64) + validateField(t, res.Fields[2], "message", query.Type_JSON) // validate recieved msgs resMap := make(map[string]string) @@ -406,8 +450,8 @@ func testMessaging(t *testing.T, name, ks string) { } } - assert.Equal(t, "hello world 1", resMap["1"]) - assert.Equal(t, "hello world 4", resMap["4"]) + assert.Equal(t, "1", resMap["1"]) + assert.Equal(t, "4", resMap["4"]) resMap = make(map[string]string) stream.ClearMem() @@ -422,7 +466,7 @@ func testMessaging(t *testing.T, name, ks string) { } } - assert.Equal(t, "hello world 1", resMap["1"]) + assert.Equal(t, "1", resMap["1"]) // validate message ack with 1 and 4, only 1 should be ack qr, err = session.Execute(context.Background(), "update "+name+" set time_acked = 1, time_next = null where id in (1, 4) and time_acked is null", nil) @@ -519,18 +563,18 @@ func assertClientCount(t *testing.T, expected int, vttablet *cluster.Vttablet) { } func parseDebugVars(t *testing.T, output interface{}, vttablet *cluster.Vttablet) { - debugVarUrl := fmt.Sprintf("http://%s:%d/debug/vars", vttablet.VttabletProcess.TabletHostname, vttablet.HTTPPort) - resp, err := http.Get(debugVarUrl) + debugVarURL := fmt.Sprintf("http://%s:%d/debug/vars", vttablet.VttabletProcess.TabletHostname, vttablet.HTTPPort) + resp, err := http.Get(debugVarURL) if err != nil { - t.Fatalf("failed to fetch %q: %v", debugVarUrl, err) + t.Fatalf("failed to fetch %q: %v", debugVarURL, err) } respByte, _ := io.ReadAll(resp.Body) if resp.StatusCode != 200 { - t.Fatalf("status code %d while fetching %q:\n%s", resp.StatusCode, debugVarUrl, respByte) + t.Fatalf("status code %d while fetching %q:\n%s", resp.StatusCode, debugVarURL, respByte) } if err := json.Unmarshal(respByte, output); err != nil { - t.Fatalf("failed to unmarshal JSON from %q: %v", debugVarUrl, err) + t.Fatalf("failed to unmarshal JSON from %q: %v", debugVarURL, err) } } diff --git a/go/test/endtoend/vtgate/godriver/main_test.go b/go/test/endtoend/vtgate/godriver/main_test.go index 3755d78d4f3..bbc5536a3fa 100644 --- a/go/test/endtoend/vtgate/godriver/main_test.go +++ b/go/test/endtoend/vtgate/godriver/main_test.go @@ -17,8 +17,8 @@ limitations under the License. package godriver import ( - "database/sql" "flag" + "fmt" "os" "strconv" "testing" @@ -42,18 +42,24 @@ var ( KeyspaceName = "customer" SchemaSQL = ` create table my_message( - time_scheduled bigint, - id bigint, - time_next bigint, - epoch bigint, - time_created bigint, - time_acked bigint, - message varchar(128), - priority tinyint NOT NULL DEFAULT '0', - primary key(time_scheduled, id), - unique index id_idx(id), - index next_idx(priority, time_next) -) comment 'vitess_message,vt_ack_wait=30,vt_purge_after=86400,vt_batch_size=10,vt_cache_size=10000,vt_poller_interval=30'; + # required columns + id bigint NOT NULL COMMENT 'often an event id, can also be auto-increment or a sequence', + priority tinyint NOT NULL DEFAULT '50' COMMENT 'lower number priorities process first', + epoch bigint NOT NULL DEFAULT '0' COMMENT 'Vitess increments this each time it sends a message, and is used for incremental backoff doubling', + time_next bigint DEFAULT 0 COMMENT 'the earliest time the message will be sent in epoch nanoseconds. Must be null if time_acked is set', + time_acked bigint DEFAULT NULL COMMENT 'the time the message was acked in epoch nanoseconds. Must be null if time_next is set', + + # add as many custom fields here as required + # optional - these are suggestions + tenant_id bigint, + message json, + + # required indexes + primary key(id), + index poller_idx(time_acked, priority, time_next desc) + + # add any secondary indexes or foreign keys - no restrictions +) comment 'vitess_message,vt_min_backoff=30,vt_max_backoff=3600,vt_ack_wait=30,vt_purge_after=86400,vt_batch_size=10,vt_cache_size=10000,vt_poller_interval=30' ` VSchema = ` { @@ -75,6 +81,8 @@ create table my_message( } } ` + + testMessage = "{\"message\":\"hello world\"}" ) func TestMain(m *testing.M) { @@ -136,8 +144,7 @@ func TestStreamMessaging(t *testing.T) { defer db.Close() // Exec not allowed in streaming - timenow := time.Now().Add(time.Second * 60).UnixNano() - _, err = db.Exec("insert into my_message(id, message, time_scheduled) values(1, 'hello world', :curr_time)", sql.Named("curr_time", timenow)) + _, err = db.Exec(fmt.Sprintf("insert into my_message(id, message) values(1, '%s')", testMessage)) require.NoError(t, err) // for streaming messages diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index b2ae445cff8..70b2976be98 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -188,22 +188,39 @@ type messageManager struct { receivers []*receiverWithStatus curReceiver int messagesPending bool - - // streamMu keeps the cache and database consistent with each other. - // Specifically: + // streamCancel is set when a vstream is running, and is reset + // to nil after a cancel. This allows for startVStream and stopVStream + // to be idempotent. + // This is implicitly protected by the main mutex because startVStream + // and stopVStream are called while holding the main mutex. + streamCancel func() + + // cacheManagementMu keeps the cache and database consistent with each + // other by ensuring that only one of the streams is processing messages + // and updating the cache at a time. The poller uses a results streamer to + // pull directly from the message table and the message manager uses a + // binlog streamer to process change events. This mutex ensures that only + // one of them are updating the cache at any one time. // It prevents items from being removed from cache while the poller // reads from the db and adds items to it. Otherwise, the poller // might add an older snapshot of a row that was just postponed. - // It blocks vstream from receiving messages while the poller - // reads a snapshot and updates lastPollPosition. Any events older than - // lastPollPosition must be ignored by the vstream. It consequently - // also blocks vstream from updating the cache while the poller is - // active. - streamMu sync.Mutex - // streamCancel is set when a vstream is running, and is reset - // to nil after a cancel. This allows for startVStream and stopVstream - // to be idempotent. - streamCancel func() + // It blocks the vstream (binlog streamer) from receiving messages while + // the poller reads a snapshot and updates lastPollPosition. Any events + // older than lastPollPosition must be ignored by the vstream. It + // consequently also blocks vstream from updating the cache while the + // poller is active. + // TODO(mattlord): since this is primarily a flow control mechanism, we + // should do it in a more idiomatic go way using channels or cond vars. + cacheManagementMu sync.Mutex + // The lastPollPosition variable is the main point of coordination + // between the poller and the binlog streamer to ensure that we are + // not re-processing older events and moving along linearly in the + // shared virtual GTID stream within the message manager. + // It is theoretically possible for the binlog streamer to be ahead of + // the lastPollPosition. This is because of how semi-sync works today + // where a replica could have received and processed a GTID that the primary + // may not have yet commited; but this is harmless because any events missed + // will be picked up during the next poller run. lastPollPosition *mysql.Position // wg is for ensuring all running goroutines have returned @@ -252,7 +269,9 @@ func newMessageManager(tsv TabletService, vs VStreamer, table *schema.Table, pos }}, } mm.readByPriorityAndTimeNext = sqlparser.BuildParsedQuery( - "select priority, time_next, epoch, time_acked, %s from %v where time_next < %a order by priority, time_next desc limit %a", + // There should be a poller_idx defined on (time_acked, priority, time_next desc) + // for this to be as effecient as possible + "select priority, time_next, epoch, time_acked, %s from %v where time_acked is null and time_next < %a order by priority, time_next desc limit %a", columnList, mm.name, ":time_next", ":max") mm.ackQuery = sqlparser.BuildParsedQuery( "update %v set time_acked = %a, time_next = null where id in %a and time_acked is null", @@ -570,12 +589,12 @@ func (mm *messageManager) send(receiver *receiverWithStatus, qr *sqltypes.Result } defer func() { - // Hold streamMu to prevent the ids from being discarded + // Hold cacheManagementMu to prevent the ids from being discarded // if poller is active. Otherwise, it could have read a // snapshot of a row before the postponement and requeue // the message. - mm.streamMu.Lock() - defer mm.streamMu.Unlock() + mm.cacheManagementMu.Lock() + defer mm.cacheManagementMu.Unlock() mm.cache.Discard(ids) }() @@ -616,8 +635,6 @@ func (mm *messageManager) postpone(tsv TabletService, ackWaitTime time.Duration, } func (mm *messageManager) startVStream() { - mm.streamMu.Lock() - defer mm.streamMu.Unlock() if mm.streamCancel != nil { return } @@ -627,10 +644,6 @@ func (mm *messageManager) startVStream() { } func (mm *messageManager) stopVStream() { - log.Infof("messageManager - stopVStream called. Acquiring streamMu lock") - mm.streamMu.Lock() - log.Infof("messageManager - acquired streamMu lock") - defer mm.streamMu.Unlock() log.Infof("messageManager - calling stream cancel") if mm.streamCancel != nil { mm.streamCancel() @@ -665,8 +678,9 @@ func (mm *messageManager) runOneVStream(ctx context.Context) error { var fields []*querypb.Field err := mm.vs.Stream(ctx, "current", nil, mm.vsFilter, func(events []*binlogdatapb.VEvent) error { - mm.streamMu.Lock() - defer mm.streamMu.Unlock() + // We need to get the flow control lock + mm.cacheManagementMu.Lock() + defer mm.cacheManagementMu.Unlock() select { case <-ctx.Done(): @@ -749,14 +763,19 @@ func (mm *messageManager) processRowEvent(fields []*querypb.Field, rowEvent *bin } func (mm *messageManager) runPoller() { + // We need to get the flow control lock first + mm.cacheManagementMu.Lock() + defer mm.cacheManagementMu.Unlock() + // Now we can get the main/structure lock and ensure e.g. that the + // the receiver count does not change during the run + mm.mu.Lock() + defer mm.mu.Unlock() + // Fast-path. Skip all the work. - if mm.receiverCount() == 0 { + if len(mm.receivers) == 0 { return } - mm.streamMu.Lock() - defer mm.streamMu.Unlock() - ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), mm.pollerTicks.Interval()) defer func() { mm.tsv.LogError() @@ -768,23 +787,17 @@ func (mm *messageManager) runPoller() { "time_next": sqltypes.Int64BindVariable(time.Now().UnixNano()), "max": sqltypes.Int64BindVariable(int64(size)), } + qr, err := mm.readPending(ctx, bindVars) if err != nil { return } - // Obtain mu lock to verify and preserve that len(receivers) != 0. - mm.mu.Lock() - defer mm.mu.Unlock() mm.messagesPending = false if len(qr.Rows) >= size { // There are probably more messages to be sent. mm.messagesPending = true } - if len(mm.receivers) == 0 { - // Almost never reachable because we just checked this. - return - } if len(qr.Rows) != 0 { // We've most likely added items. // Wake up the sender. @@ -880,7 +893,7 @@ func (mm *messageManager) GeneratePurgeQuery(timeCutoff int64) (string, map[stri } } -// BuildMessageRow builds a MessageRow for a db row. +// BuildMessageRow builds a MessageRow from a db row. func BuildMessageRow(row []sqltypes.Value) (*MessageRow, error) { mr := &MessageRow{Row: row[4:]} if !row[0].IsNull() { @@ -914,12 +927,6 @@ func BuildMessageRow(row []sqltypes.Value) (*MessageRow, error) { return mr, nil } -func (mm *messageManager) receiverCount() int { - mm.mu.Lock() - defer mm.mu.Unlock() - return len(mm.receivers) -} - func (mm *messageManager) readPending(ctx context.Context, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) { query, err := mm.readByPriorityAndTimeNext.GenerateQuery(bindVars, nil) if err != nil { @@ -949,3 +956,15 @@ func (mm *messageManager) readPending(ctx context.Context, bindVars map[string]* } return qr, err } + +func (mm *messageManager) getReceiverCount() int { + mm.mu.Lock() + defer mm.mu.Unlock() + return len(mm.receivers) +} + +func (mm *messageManager) getLastPollPosition() *mysql.Position { + mm.cacheManagementMu.Lock() + defer mm.cacheManagementMu.Unlock() + return mm.lastPollPosition +} diff --git a/go/vt/vttablet/tabletserver/messager/message_manager_test.go b/go/vt/vttablet/tabletserver/messager/message_manager_test.go index 58d9e32c91c..4d76bece329 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager_test.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager_test.go @@ -148,7 +148,7 @@ func TestReceiverCancel(t *testing.T) { for i := 0; i < 10; i++ { runtime.Gosched() time.Sleep(10 * time.Millisecond) - if mm.receiverCount() != 0 { + if len(mm.receivers) != 0 { continue } return @@ -503,9 +503,7 @@ func TestMessageManagerStreamerAndPoller(t *testing.T) { for { runtime.Gosched() time.Sleep(10 * time.Millisecond) - mm.streamMu.Lock() - pos := mm.lastPollPosition - mm.streamMu.Unlock() + pos := mm.getLastPollPosition() if pos != nil { break } diff --git a/go/vt/vttablet/tabletserver/planbuilder/testdata/schema_test.json b/go/vt/vttablet/tabletserver/planbuilder/testdata/schema_test.json index 00243d4b08b..b641a8f2d68 100644 --- a/go/vt/vttablet/tabletserver/planbuilder/testdata/schema_test.json +++ b/go/vt/vttablet/tabletserver/planbuilder/testdata/schema_test.json @@ -319,14 +319,17 @@ "Name": "priority" }, { - "Name": "time_next" + "Name": "epoch" }, { - "Name": "epoch" + "Name": "time_next" }, { "Name": "time_acked" }, + { + "Name": "tenant_id" + }, { "Name": "message" } @@ -336,7 +339,6 @@ "Name": "PRIMARY", "Unique": true, "Columns": [ - "time_scheduled", "id" ], "Cardinality": [ @@ -347,8 +349,7 @@ } ], "PKColumns": [ - 0, - 1 + 0 ], "Type": 2 },