From f31eefa77103b7de815efa215a59109fce6f17ae Mon Sep 17 00:00:00 2001
From: Matt Lord
Date: Sun, 24 Apr 2022 12:07:59 -0400
Subject: [PATCH 01/16] Prevent deadlocks related to 0 receiver behavior

Signed-off-by: Matt Lord
---
 .../tabletserver/messager/message_manager.go | 29 ++++++++++++++-----
 1 file changed, 21 insertions(+), 8 deletions(-)

diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go
index b2ae445cff8..6f51bcf8a56 100644
--- a/go/vt/vttablet/tabletserver/messager/message_manager.go
+++ b/go/vt/vttablet/tabletserver/messager/message_manager.go
@@ -252,7 +252,9 @@ func newMessageManager(tsv TabletService, vs VStreamer, table *schema.Table, pos
 		}},
 	}
 	mm.readByPriorityAndTimeNext = sqlparser.BuildParsedQuery(
-		"select priority, time_next, epoch, time_acked, %s from %v where time_next < %a order by priority, time_next desc limit %a",
+		// There should be a poller_idx defined on (time_acked, priority, time_next desc)
+		// for this to be as efficient as possible
+		"select priority, time_next, epoch, time_acked, %s from %v where time_acked is null and time_next < %a order by priority, time_next desc limit %a",
 		columnList, mm.name, ":time_next", ":max")
 	mm.ackQuery = sqlparser.BuildParsedQuery(
 		"update %v set time_acked = %a, time_next = null where id in %a and time_acked is null",
@@ -749,14 +751,14 @@ func (mm *messageManager) processRowEvent(fields []*querypb.Field, rowEvent *bin
 }

 func (mm *messageManager) runPoller() {
+	mm.getExclusiveLock()
+	defer mm.releaseExclusiveLock()
+
 	// Fast-path. Skip all the work.
-	if mm.receiverCount() == 0 {
+	if len(mm.receivers) == 0 {
 		return
 	}

-	mm.streamMu.Lock()
-	defer mm.streamMu.Unlock()
-
 	ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), mm.pollerTicks.Interval())
 	defer func() {
 		mm.tsv.LogError()
@@ -773,9 +775,6 @@ func (mm *messageManager) runPoller() {
 		return
 	}

-	// Obtain mu lock to verify and preserve that len(receivers) != 0.
-	mm.mu.Lock()
-	defer mm.mu.Unlock()
 	mm.messagesPending = false
 	if len(qr.Rows) >= size {
 		// There are probably more messages to be sent.
@@ -949,3 +948,17 @@ func (mm *messageManager) readPending(ctx context.Context, bindVars map[string]*
 	}
 	return qr, err
 }
+
+// This grants the caller exclusive access to the message service.
+// When this is needed for a function, you must get the mutexes in
+// main mutex, stream mutex order to avoid deadlocks with other places
+// that conditionally get the mutexes in this same order.
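+// For example: before this change, runPoller locked streamMu and only
+// later took mm.mu, while Subscribe holds mm.mu and locks streamMu
+// through startVStream; two goroutines taking the same two mutexes in
+// opposite orders is the deadlock that this fixed ordering prevents.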
+func (mm *messageManager) getExclusiveLock() { + mm.mu.Lock() + mm.streamMu.Lock() +} + +func (mm *messageManager) releaseExclusiveLock() { + mm.streamMu.Unlock() + mm.mu.Unlock() +} From 1d3cbdcbec9a7db8aed3ac94eb6210131a68ffe1 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 24 Apr 2022 17:56:57 -0400 Subject: [PATCH 02/16] Update test tables to use poller_idx Signed-off-by: Matt Lord --- go/test/endtoend/messaging/main_test.go | 6 ++---- go/test/endtoend/messaging/message_test.go | 16 +++++++--------- go/test/endtoend/vtgate/godriver/main_test.go | 6 +++--- 3 files changed, 12 insertions(+), 16 deletions(-) diff --git a/go/test/endtoend/messaging/main_test.go b/go/test/endtoend/messaging/main_test.go index 168aea03858..632ddc77eac 100644 --- a/go/test/endtoend/messaging/main_test.go +++ b/go/test/endtoend/messaging/main_test.go @@ -45,8 +45,7 @@ var ( time_acked bigint, message varchar(128), primary key(id), - index next_idx(priority, time_next desc), - index ack_idx(time_acked) + index poller_idx (time_acked, priority, time_next desc), ) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'` createUnshardedMessage = `create table unsharded_message( id bigint, @@ -56,8 +55,7 @@ var ( time_acked bigint, message varchar(128), primary key(id), - index next_idx(priority, time_next desc), - index ack_idx(time_acked) + index poller_idx (time_acked, priority, time_next desc), ) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'` userVschema = `{ "sharded": true, diff --git a/go/test/endtoend/messaging/message_test.go b/go/test/endtoend/messaging/message_test.go index b9e59c02f2e..a3308c6a2e9 100644 --- a/go/test/endtoend/messaging/message_test.go +++ b/go/test/endtoend/messaging/message_test.go @@ -52,8 +52,7 @@ var createMessage = `create table vitess_message( time_acked bigint, message varchar(128), primary key(id), - index next_idx(priority, time_next desc), - index ack_idx(time_acked)) + index poller_idx (time_acked, priority, time_next desc), comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'` func TestMessage(t *testing.T) { @@ -172,8 +171,7 @@ var createThreeColMessage = `create table vitess_message3( msg1 varchar(128), msg2 bigint, primary key(id), - index next_idx(priority, time_next desc), - index ack_idx(time_acked)) + index poller_idx (time_acked, priority, time_next desc), comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'` func TestThreeColMessage(t *testing.T) { @@ -519,18 +517,18 @@ func assertClientCount(t *testing.T, expected int, vttablet *cluster.Vttablet) { } func parseDebugVars(t *testing.T, output interface{}, vttablet *cluster.Vttablet) { - debugVarUrl := fmt.Sprintf("http://%s:%d/debug/vars", vttablet.VttabletProcess.TabletHostname, vttablet.HTTPPort) - resp, err := http.Get(debugVarUrl) + debugVarURL := fmt.Sprintf("http://%s:%d/debug/vars", vttablet.VttabletProcess.TabletHostname, vttablet.HTTPPort) + resp, err := http.Get(debugVarURL) if err != nil { - t.Fatalf("failed to fetch %q: %v", debugVarUrl, err) + t.Fatalf("failed to fetch %q: %v", debugVarURL, err) } respByte, _ := io.ReadAll(resp.Body) if resp.StatusCode != 200 { - t.Fatalf("status code %d while fetching %q:\n%s", resp.StatusCode, debugVarUrl, respByte) + t.Fatalf("status code %d while fetching %q:\n%s", resp.StatusCode, debugVarURL, respByte) } if err := 
json.Unmarshal(respByte, output); err != nil { - t.Fatalf("failed to unmarshal JSON from %q: %v", debugVarUrl, err) + t.Fatalf("failed to unmarshal JSON from %q: %v", debugVarURL, err) } } diff --git a/go/test/endtoend/vtgate/godriver/main_test.go b/go/test/endtoend/vtgate/godriver/main_test.go index 3755d78d4f3..8eed5b3394a 100644 --- a/go/test/endtoend/vtgate/godriver/main_test.go +++ b/go/test/endtoend/vtgate/godriver/main_test.go @@ -44,15 +44,15 @@ var ( create table my_message( time_scheduled bigint, id bigint, - time_next bigint, + time_next bigint DEFAULT 0, epoch bigint, time_created bigint, time_acked bigint, message varchar(128), - priority tinyint NOT NULL DEFAULT '0', + priority tinyint NOT NULL DEFAULT 0, primary key(time_scheduled, id), unique index id_idx(id), - index next_idx(priority, time_next) + index poller_idx(time_acked, priority, time_next) ) comment 'vitess_message,vt_ack_wait=30,vt_purge_after=86400,vt_batch_size=10,vt_cache_size=10000,vt_poller_interval=30'; ` VSchema = ` From d177a21255e6123f3328c316cfbfa4b57766e26f Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 24 Apr 2022 20:17:55 -0400 Subject: [PATCH 03/16] Minor changes after mutex usage review in message manager + cache Signed-off-by: Matt Lord --- go/vt/vttablet/tabletserver/messager/message_manager.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index 6f51bcf8a56..54aa9a4ac95 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -469,9 +469,7 @@ func (mm *messageManager) rescanReceivers(start int) { // if successful. If the message is already present, // it still returns true. func (mm *messageManager) Add(mr *MessageRow) bool { - mm.mu.Lock() - defer mm.mu.Unlock() - if len(mm.receivers) == 0 { + if mm.receiverCount() == 0 { return false } // If cache is empty, we have to broadcast that we're not empty @@ -879,7 +877,7 @@ func (mm *messageManager) GeneratePurgeQuery(timeCutoff int64) (string, map[stri } } -// BuildMessageRow builds a MessageRow for a db row. +// BuildMessageRow builds a MessageRow from a db row. func BuildMessageRow(row []sqltypes.Value) (*MessageRow, error) { mr := &MessageRow{Row: row[4:]} if !row[0].IsNull() { From 742bb99dbb4920f375f80769a96ae7e750f06ac7 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 25 Apr 2022 11:36:04 -0400 Subject: [PATCH 04/16] Use atomics for receiver count and messages pending Signed-off-by: Matt Lord --- .../tabletserver/messager/message_manager.go | 53 +++++++++---------- .../messager/message_manager_test.go | 6 +-- 2 files changed, 29 insertions(+), 30 deletions(-) diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index 54aa9a4ac95..da89bb34a11 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -183,11 +183,13 @@ type messageManager struct { isOpen bool // cond waits on curReceiver == -1 || cache.IsEmpty(): // No current receivers available or cache is empty. 
- cond sync.Cond - cache *cache - receivers []*receiverWithStatus - curReceiver int - messagesPending bool + cond sync.Cond + cache *cache + receivers []*receiverWithStatus + // Way to track the receiver count in a consistent way w/o locks + receiverCount sync2.AtomicInt64 + curReceiver int64 + messagesPending sync2.AtomicBool // streamMu keeps the cache and database consistent with each other. // Specifically: @@ -239,7 +241,7 @@ func newMessageManager(tsv TabletService, vs VStreamer, table *schema.Table, pos pollerTicks: timer.NewTimer(table.MessageInfo.PollInterval), purgeTicks: timer.NewTimer(table.MessageInfo.PollInterval), postponeSema: postponeSema, - messagesPending: true, + messagesPending: sync2.NewAtomicBool(true), } mm.cond.L = &mm.mu @@ -364,6 +366,7 @@ func (mm *messageManager) Close() { for _, rcvr := range mm.receivers { rcvr.receiver.cancel() } + mm.receiverCount.Set(0) mm.receivers = nil MessageStats.Set([]string{mm.name.String(), "ClientCount"}, 0) log.Infof("messageManager - clearing cache") @@ -403,11 +406,12 @@ func (mm *messageManager) Subscribe(ctx context.Context, send func(*sqltypes.Res withStatus := &receiverWithStatus{ receiver: receiver, } - if len(mm.receivers) == 0 { + if mm.receiverCount.Get() == 0 { mm.startVStream() } mm.receivers = append(mm.receivers, withStatus) - MessageStats.Set([]string{mm.name.String(), "ClientCount"}, int64(len(mm.receivers))) + mm.receiverCount.Add(1) + MessageStats.Set([]string{mm.name.String(), "ClientCount"}, mm.receiverCount.Get()) if mm.curReceiver == -1 { mm.rescanReceivers(-1) } @@ -428,16 +432,17 @@ func (mm *messageManager) unsubscribe(receiver *messageReceiver) { continue } // Delete the item at current position. - n := len(mm.receivers) + n := mm.receiverCount.Get() copy(mm.receivers[i:n-1], mm.receivers[i+1:n]) mm.receivers = mm.receivers[0 : n-1] - MessageStats.Set([]string{mm.name.String(), "ClientCount"}, int64(len(mm.receivers))) + mm.receiverCount.Add(-1) + MessageStats.Set([]string{mm.name.String(), "ClientCount"}, mm.receiverCount.Get()) break } // curReceiver is obsolete. Recompute. mm.rescanReceivers(-1) // If there are no receivers. Shut down the cache. - if len(mm.receivers) == 0 { + if mm.receiverCount.Get() == 0 { mm.stopVStream() mm.cache.Clear() } @@ -449,10 +454,10 @@ func (mm *messageManager) unsubscribe(receiver *messageReceiver) { // was previously -1, it broadcasts. If none was found, // curReceiver is set to -1. If there's no starting point, // it must be specified as -1. -func (mm *messageManager) rescanReceivers(start int) { +func (mm *messageManager) rescanReceivers(start int64) { cur := start for range mm.receivers { - cur = (cur + 1) % len(mm.receivers) + cur = (cur + 1) % mm.receiverCount.Get() if !mm.receivers[cur].busy { if mm.curReceiver == -1 { mm.cond.Broadcast() @@ -469,7 +474,7 @@ func (mm *messageManager) rescanReceivers(start int) { // if successful. If the message is already present, // it still returns true. func (mm *messageManager) Add(mr *MessageRow) bool { - if mm.receiverCount() == 0 { + if mm.receiverCount.Get() == 0 { return false } // If cache is empty, we have to broadcast that we're not empty @@ -479,7 +484,7 @@ func (mm *messageManager) Add(mr *MessageRow) bool { } if !mm.cache.Add(mr) { // Cache is full. Enter "messagesPending" mode. 
- mm.messagesPending = true + mm.messagesPending.Set(true) return false } return true @@ -510,7 +515,7 @@ func (mm *messageManager) runSend() { // If cache became empty, there are messages pending, and there are subscribed // receivers, we have to trigger the poller to fetch more. - if mm.cache.IsEmpty() && mm.messagesPending && len(mm.receivers) != 0 { + if mm.cache.IsEmpty() && mm.messagesPending.Get() && mm.receiverCount.Get() != 0 { // Do this as a separate goroutine. Otherwise, this could cause // the following deadlock: // 1. runSend obtains a lock @@ -753,7 +758,7 @@ func (mm *messageManager) runPoller() { defer mm.releaseExclusiveLock() // Fast-path. Skip all the work. - if len(mm.receivers) == 0 { + if mm.receiverCount.Get() == 0 { return } @@ -773,12 +778,12 @@ func (mm *messageManager) runPoller() { return } - mm.messagesPending = false + mm.messagesPending.Set(false) if len(qr.Rows) >= size { // There are probably more messages to be sent. - mm.messagesPending = true + mm.messagesPending.Set(true) } - if len(mm.receivers) == 0 { + if mm.receiverCount.Get() == 0 { // Almost never reachable because we just checked this. return } @@ -795,7 +800,7 @@ func (mm *messageManager) runPoller() { continue } if !mm.cache.Add(mr) { - mm.messagesPending = true + mm.messagesPending.Set(true) return } } @@ -911,12 +916,6 @@ func BuildMessageRow(row []sqltypes.Value) (*MessageRow, error) { return mr, nil } -func (mm *messageManager) receiverCount() int { - mm.mu.Lock() - defer mm.mu.Unlock() - return len(mm.receivers) -} - func (mm *messageManager) readPending(ctx context.Context, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) { query, err := mm.readByPriorityAndTimeNext.GenerateQuery(bindVars, nil) if err != nil { diff --git a/go/vt/vttablet/tabletserver/messager/message_manager_test.go b/go/vt/vttablet/tabletserver/messager/message_manager_test.go index 58d9e32c91c..3e911d83fd6 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager_test.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager_test.go @@ -148,12 +148,12 @@ func TestReceiverCancel(t *testing.T) { for i := 0; i < 10; i++ { runtime.Gosched() time.Sleep(10 * time.Millisecond) - if mm.receiverCount() != 0 { + if mm.receiverCount.Get() != 0 { continue } return } - t.Errorf("receivers were not cleared: %d", len(mm.receivers)) + t.Errorf("receivers were not cleared: %d", mm.receiverCount.Get()) } func TestMessageManagerState(t *testing.T) { @@ -281,7 +281,7 @@ func TestMessageManagerSend(t *testing.T) { runtime.Gosched() time.Sleep(10 * time.Millisecond) mm.mu.Lock() - if len(mm.receivers) != 1 { + if mm.receiverCount.Get() != 1 { mm.mu.Unlock() continue } From 910eaeb800f606d63818fce241242caa98cf3af1 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 25 Apr 2022 11:48:12 -0400 Subject: [PATCH 05/16] Don't take exclusive lock when in fast path Signed-off-by: Matt Lord --- .../tabletserver/messager/message_manager.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index da89bb34a11..bf64d587fb3 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -754,14 +754,14 @@ func (mm *messageManager) processRowEvent(fields []*querypb.Field, rowEvent *bin } func (mm *messageManager) runPoller() { - mm.getExclusiveLock() - defer mm.releaseExclusiveLock() - // Fast-path. 
Skip all the work. if mm.receiverCount.Get() == 0 { return } + mm.getExclusiveLock() + defer mm.releaseExclusiveLock() + ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), mm.pollerTicks.Interval()) defer func() { mm.tsv.LogError() @@ -773,6 +773,7 @@ func (mm *messageManager) runPoller() { "time_next": sqltypes.Int64BindVariable(time.Now().UnixNano()), "max": sqltypes.Int64BindVariable(int64(size)), } + qr, err := mm.readPending(ctx, bindVars) if err != nil { return @@ -947,9 +948,8 @@ func (mm *messageManager) readPending(ctx context.Context, bindVars map[string]* } // This grants the caller exclusive access to the message service. -// When this is needed for a function, you must get the mutexes in -// main mutex, stream mutex order to avoid deadlocks with other places -// that conditionally get the mutexes in this same order. +// When this is needed for a function, you can use this to +// enforce consistent locking order. func (mm *messageManager) getExclusiveLock() { mm.mu.Lock() mm.streamMu.Lock() From 73af9147a11ccfb02dcf340c884be97ec572192d Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 25 Apr 2022 14:33:25 -0400 Subject: [PATCH 06/16] Update tests to use the new recommended message table structure See: https://github.com/vitessio/website/pull/1015 Signed-off-by: Matt Lord --- go/test/endtoend/messaging/main_test.go | 50 +++++++++++----- go/test/endtoend/messaging/message_test.go | 59 +++++++++++++------ go/test/endtoend/vtgate/godriver/main_test.go | 30 ++++++---- 3 files changed, 94 insertions(+), 45 deletions(-) diff --git a/go/test/endtoend/messaging/main_test.go b/go/test/endtoend/messaging/main_test.go index 632ddc77eac..d60b4468d32 100644 --- a/go/test/endtoend/messaging/main_test.go +++ b/go/test/endtoend/messaging/main_test.go @@ -38,25 +38,43 @@ var ( userKeyspace = "user" lookupKeyspace = "lookup" createShardedMessage = `create table sharded_message( - id bigint, - priority bigint default 0, - time_next bigint default 0, - epoch bigint, - time_acked bigint, - message varchar(128), + # required columns + id bigint NOT NULL COMMENT 'often an event id, can also be auto-increment or a sequence', + priority tinyint NOT NULL DEFAULT '50' COMMENT 'lower number priorities process first', + epoch bigint NOT NULL DEFAULT '0' COMMENT 'Vitess increments this each time it sends a message, and is used for incremental backoff doubling', + time_next bigint DEFAULT 0 COMMENT 'the earliest time the message will be sent in epoch nanoseconds. Must be null if time_acked is set', + time_acked bigint DEFAULT NULL COMMENT 'the time the message was acked in epoch nanoseconds. 
Must be null if time_next is set', + + # add as many custom fields here as required + # optional - these are suggestions + tenant_id bigint, + message json, + + # required indexes primary key(id), - index poller_idx (time_acked, priority, time_next desc), - ) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'` + index poller_idx(time_acked, priority, time_next desc) + + # add any secondary indexes or foreign keys - no restrictions + ) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'` createUnshardedMessage = `create table unsharded_message( - id bigint, - priority bigint default 0, - time_next bigint default 0, - epoch bigint, - time_acked bigint, - message varchar(128), + # required columns + id bigint NOT NULL COMMENT 'often an event id, can also be auto-increment or a sequence', + priority tinyint NOT NULL DEFAULT '50' COMMENT 'lower number priorities process first', + epoch bigint NOT NULL DEFAULT '0' COMMENT 'Vitess increments this each time it sends a message, and is used for incremental backoff doubling', + time_next bigint DEFAULT 0 COMMENT 'the earliest time the message will be sent in epoch nanoseconds. Must be null if time_acked is set', + time_acked bigint DEFAULT NULL COMMENT 'the time the message was acked in epoch nanoseconds. Must be null if time_next is set', + + # add as many custom fields here as required + # optional - these are suggestions + tenant_id bigint, + message json, + + # required indexes primary key(id), - index poller_idx (time_acked, priority, time_next desc), - ) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'` + index poller_idx(time_acked, priority, time_next desc) + + # add any secondary indexes or foreign keys - no restrictions + ) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'` userVschema = `{ "sharded": true, "vindexes": { diff --git a/go/test/endtoend/messaging/message_test.go b/go/test/endtoend/messaging/message_test.go index a3308c6a2e9..5e81a5a6729 100644 --- a/go/test/endtoend/messaging/message_test.go +++ b/go/test/endtoend/messaging/message_test.go @@ -44,16 +44,27 @@ import ( "vitess.io/vitess/go/vt/vtgate/vtgateconn" ) -var createMessage = `create table vitess_message( - id bigint, - priority bigint default 0, - time_next bigint default 0, - epoch bigint, - time_acked bigint, - message varchar(128), +var createMessage = ` +create table my_message( + # required columns + id bigint NOT NULL COMMENT 'often an event id, can also be auto-increment or a sequence', + priority tinyint NOT NULL DEFAULT '50' COMMENT 'lower number priorities process first', + epoch bigint NOT NULL DEFAULT '0' COMMENT 'Vitess increments this each time it sends a message, and is used for incremental backoff doubling', + time_next bigint DEFAULT 0 COMMENT 'the earliest time the message will be sent in epoch nanoseconds. Must be null if time_acked is set', + time_acked bigint DEFAULT NULL COMMENT 'the time the message was acked in epoch nanoseconds. 
Must be null if time_next is set', + + # add as many custom fields here as required + # optional - these are suggestions + tenant_id bigint, + message json, + + # required indexes primary key(id), - index poller_idx (time_acked, priority, time_next desc), -comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'` + index poller_idx(time_acked, priority, time_next desc) + + # add any secondary indexes or foreign keys - no restrictions +) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1' +` func TestMessage(t *testing.T) { ctx := context.Background() @@ -162,17 +173,31 @@ func TestMessage(t *testing.T) { assert.Equal(t, 0, len(qr.Rows)) } -var createThreeColMessage = `create table vitess_message3( - id bigint, - priority bigint default 0, - time_next bigint default 0, - epoch bigint, - time_acked bigint, +var createThreeColMessage = ` +create table my_message( + # required columns + id bigint NOT NULL COMMENT 'often an event id, can also be auto-increment or a sequence', + priority tinyint NOT NULL DEFAULT '50' COMMENT 'lower number priorities process first', + epoch bigint NOT NULL DEFAULT '0' COMMENT 'Vitess increments this each time it sends a message, and is used for incremental backoff doubling', + time_next bigint DEFAULT 0 COMMENT 'the earliest time the message will be sent in epoch nanoseconds. Must be null if time_acked is set', + time_acked bigint DEFAULT NULL COMMENT 'the time the message was acked in epoch nanoseconds. Must be null if time_next is set', + + # add as many custom fields here as required + # optional - these are suggestions + tenant_id bigint, + message json, + + # custom to this test msg1 varchar(128), msg2 bigint, + + # required indexes primary key(id), - index poller_idx (time_acked, priority, time_next desc), -comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'` + index poller_idx(time_acked, priority, time_next desc) + + # add any secondary indexes or foreign keys - no restrictions +) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1' +` func TestThreeColMessage(t *testing.T) { ctx := context.Background() diff --git a/go/test/endtoend/vtgate/godriver/main_test.go b/go/test/endtoend/vtgate/godriver/main_test.go index 8eed5b3394a..f00244a40cb 100644 --- a/go/test/endtoend/vtgate/godriver/main_test.go +++ b/go/test/endtoend/vtgate/godriver/main_test.go @@ -42,18 +42,24 @@ var ( KeyspaceName = "customer" SchemaSQL = ` create table my_message( - time_scheduled bigint, - id bigint, - time_next bigint DEFAULT 0, - epoch bigint, - time_created bigint, - time_acked bigint, - message varchar(128), - priority tinyint NOT NULL DEFAULT 0, - primary key(time_scheduled, id), - unique index id_idx(id), - index poller_idx(time_acked, priority, time_next) -) comment 'vitess_message,vt_ack_wait=30,vt_purge_after=86400,vt_batch_size=10,vt_cache_size=10000,vt_poller_interval=30'; + # required columns + id bigint NOT NULL COMMENT 'often an event id, can also be auto-increment or a sequence', + priority tinyint NOT NULL DEFAULT '50' COMMENT 'lower number priorities process first', + epoch bigint NOT NULL DEFAULT '0' COMMENT 'Vitess increments this each time it sends a message, and is used for incremental backoff doubling', + time_next bigint DEFAULT 0 COMMENT 'the earliest time the message will be sent in epoch nanoseconds. 
Must be null if time_acked is set', + time_acked bigint DEFAULT NULL COMMENT 'the time the message was acked in epoch nanoseconds. Must be null if time_next is set', + + # add as many custom fields here as required + # optional - these are suggestions + tenant_id bigint, + message json, + + # required indexes + primary key(id), + index poller_idx(time_acked, priority, time_next desc) + + # add any secondary indexes or foreign keys - no restrictions +) comment 'vitess_message,vt_min_backoff=30,vt_max_backoff=3600,vt_ack_wait=30,vt_purge_after=86400,vt_batch_size=10,vt_cache_size=10000,vt_poller_interval=30' ` VSchema = ` { From c831896c41c689b65f83c15c1dffa522e31b52ae Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 25 Apr 2022 14:54:42 -0400 Subject: [PATCH 07/16] Correct tests Signed-off-by: Matt Lord --- go/test/endtoend/messaging/message_test.go | 4 ++-- go/test/endtoend/vtgate/godriver/main_test.go | 4 +--- .../tabletserver/planbuilder/testdata/schema_test.json | 6 ++---- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/go/test/endtoend/messaging/message_test.go b/go/test/endtoend/messaging/message_test.go index 5e81a5a6729..b30a57450eb 100644 --- a/go/test/endtoend/messaging/message_test.go +++ b/go/test/endtoend/messaging/message_test.go @@ -45,7 +45,7 @@ import ( ) var createMessage = ` -create table my_message( +create table vitess_message( # required columns id bigint NOT NULL COMMENT 'often an event id, can also be auto-increment or a sequence', priority tinyint NOT NULL DEFAULT '50' COMMENT 'lower number priorities process first', @@ -174,7 +174,7 @@ func TestMessage(t *testing.T) { } var createThreeColMessage = ` -create table my_message( +create table vitess_message3( # required columns id bigint NOT NULL COMMENT 'often an event id, can also be auto-increment or a sequence', priority tinyint NOT NULL DEFAULT '50' COMMENT 'lower number priorities process first', diff --git a/go/test/endtoend/vtgate/godriver/main_test.go b/go/test/endtoend/vtgate/godriver/main_test.go index f00244a40cb..34977f9716c 100644 --- a/go/test/endtoend/vtgate/godriver/main_test.go +++ b/go/test/endtoend/vtgate/godriver/main_test.go @@ -17,7 +17,6 @@ limitations under the License. 
package godriver import ( - "database/sql" "flag" "os" "strconv" @@ -142,8 +141,7 @@ func TestStreamMessaging(t *testing.T) { defer db.Close() // Exec not allowed in streaming - timenow := time.Now().Add(time.Second * 60).UnixNano() - _, err = db.Exec("insert into my_message(id, message, time_scheduled) values(1, 'hello world', :curr_time)", sql.Named("curr_time", timenow)) + _, err = db.Exec("insert into my_message(id, message) values(1, 'hello world')") require.NoError(t, err) // for streaming messages diff --git a/go/vt/vttablet/tabletserver/planbuilder/testdata/schema_test.json b/go/vt/vttablet/tabletserver/planbuilder/testdata/schema_test.json index 00243d4b08b..7cfd1701d79 100644 --- a/go/vt/vttablet/tabletserver/planbuilder/testdata/schema_test.json +++ b/go/vt/vttablet/tabletserver/planbuilder/testdata/schema_test.json @@ -336,7 +336,6 @@ "Name": "PRIMARY", "Unique": true, "Columns": [ - "time_scheduled", "id" ], "Cardinality": [ @@ -347,10 +346,9 @@ } ], "PKColumns": [ - 0, - 1 + 0 ], - "Type": 2 + "Type": 0 }, { "Name": "dual", From 9df3a39362c642bfb92e54ae81013e6408ea0d3a Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 25 Apr 2022 18:18:35 -0400 Subject: [PATCH 08/16] Update e2e test to use new recommended table structure Signed-off-by: Matt Lord --- go/test/endtoend/messaging/message_test.go | 49 +++++++++++++++------- 1 file changed, 35 insertions(+), 14 deletions(-) diff --git a/go/test/endtoend/messaging/message_test.go b/go/test/endtoend/messaging/message_test.go index b30a57450eb..1f0156757c2 100644 --- a/go/test/endtoend/messaging/message_test.go +++ b/go/test/endtoend/messaging/message_test.go @@ -44,6 +44,9 @@ import ( "vitess.io/vitess/go/vt/vtgate/vtgateconn" ) +var testMessage = "{\"message\":\"hello world\"}" +var testShardedMessagef = "{\"message\": \"hello world\", \"id\": %d}" + var createMessage = ` create table vitess_message( # required columns @@ -94,9 +97,12 @@ func TestMessage(t *testing.T) { wantFields := []*querypb.Field{{ Name: "id", Type: sqltypes.Int64, + }, { + Name: "tenant_id", + Type: sqltypes.Int64, }, { Name: "message", - Type: sqltypes.VarChar, + Type: sqltypes.TypeJSON, }} gotFields, err := streamConn.Fields() for i, field := range gotFields { @@ -109,7 +115,7 @@ func TestMessage(t *testing.T) { require.NoError(t, err) cmp.MustMatch(t, wantFields, gotFields) - utils.Exec(t, conn, "insert into vitess_message(id, message) values(1, 'hello world')") + utils.Exec(t, conn, fmt.Sprintf("insert into vitess_message(id, tenant_id, message) values(1, 1, '%s')", testMessage)) // account for jitter in timings, maxJitter uses the current hardcoded value for jitter in message_manager.go jitter := int64(0) @@ -122,7 +128,8 @@ func TestMessage(t *testing.T) { want := []sqltypes.Value{ sqltypes.NewInt64(1), - sqltypes.NewVarChar("hello world"), + sqltypes.NewInt64(1), + sqltypes.TestValue(sqltypes.TypeJSON, testMessage), } cmp.MustMatch(t, want, got) @@ -225,6 +232,12 @@ func TestThreeColMessage(t *testing.T) { wantFields := []*querypb.Field{{ Name: "id", Type: sqltypes.Int64, + }, { + Name: "tenant_id", + Type: sqltypes.Int64, + }, { + Name: "message", + Type: sqltypes.TypeJSON, }, { Name: "msg1", Type: sqltypes.VarChar, @@ -243,12 +256,14 @@ func TestThreeColMessage(t *testing.T) { require.NoError(t, err) cmp.MustMatch(t, wantFields, gotFields) - utils.Exec(t, conn, "insert into vitess_message3(id, msg1, msg2) values(1, 'hello world', 3)") + utils.Exec(t, conn, fmt.Sprintf("insert into vitess_message3(id, tenant_id, message, msg1, msg2) values(1, 3, '%s', 
'hello world', 3)", testMessage)) got, err := streamConn.FetchNext(nil) require.NoError(t, err) want := []sqltypes.Value{ sqltypes.NewInt64(1), + sqltypes.NewInt64(3), + sqltypes.TestValue(sqltypes.TypeJSON, testMessage), sqltypes.NewVarChar("hello world"), sqltypes.NewInt64(3), } @@ -315,7 +330,8 @@ func TestReparenting(t *testing.T) { assertClientCount(t, 1, shard0Replica) assertClientCount(t, 1, shard1Primary) session := stream.Session("@primary", nil) - cluster.ExecuteQueriesUsingVtgate(t, session, "insert into sharded_message (id, message) values (3,'hello world 3')") + msg3 := fmt.Sprintf(testShardedMessagef, 3) + cluster.ExecuteQueriesUsingVtgate(t, session, fmt.Sprintf("insert into sharded_message (id, tenant_id, message) values (3,3,'%s')", msg3)) // validate that we have received inserted message stream.Next() @@ -376,8 +392,10 @@ func TestConnection(t *testing.T) { // in message stream session := stream.Session("@primary", nil) // insert data in primary - cluster.ExecuteQueriesUsingVtgate(t, session, "insert into sharded_message (id, message) values (2,'hello world 2')") - cluster.ExecuteQueriesUsingVtgate(t, session, "insert into sharded_message (id, message) values (5,'hello world 5')") + msg2 := fmt.Sprintf(testShardedMessagef, 2) + msg5 := fmt.Sprintf(testShardedMessagef, 5) + cluster.ExecuteQueriesUsingVtgate(t, session, fmt.Sprintf("insert into sharded_message (id, tenant_id, message) values (2,2,'%s')", msg2)) + cluster.ExecuteQueriesUsingVtgate(t, session, fmt.Sprintf("insert into sharded_message (id, tenant_id, message) values (5,5,'%s')", msg5)) // validate in msg stream _, err = stream.Next() require.Nil(t, err) @@ -403,15 +421,18 @@ func testMessaging(t *testing.T, name, ks string) { defer stream.Close() session := stream.Session("@primary", nil) - cluster.ExecuteQueriesUsingVtgate(t, session, "insert into "+name+" (id, message) values (4,'hello world 4')") - cluster.ExecuteQueriesUsingVtgate(t, session, "insert into "+name+" (id, message) values (1,'hello world 1')") + msg4 := fmt.Sprintf(testShardedMessagef, 4) + msg1 := fmt.Sprintf(testShardedMessagef, 1) + cluster.ExecuteQueriesUsingVtgate(t, session, fmt.Sprintf("insert into "+name+" (id, tenant_id, message) values (4,4,'%s')", msg4)) + cluster.ExecuteQueriesUsingVtgate(t, session, fmt.Sprintf("insert into "+name+" (id, tenant_id, message) values (1,1,'%s')", msg1)) // validate fields res, err := stream.MessageStream(ks, "", nil, name) require.Nil(t, err) - require.Equal(t, 2, len(res.Fields)) + require.Equal(t, 3, len(res.Fields)) validateField(t, res.Fields[0], "id", query.Type_INT64) - validateField(t, res.Fields[1], "message", query.Type_VARCHAR) + validateField(t, res.Fields[1], "tenant_id", query.Type_INT64) + validateField(t, res.Fields[2], "message", query.Type_JSON) // validate recieved msgs resMap := make(map[string]string) @@ -429,8 +450,8 @@ func testMessaging(t *testing.T, name, ks string) { } } - assert.Equal(t, "hello world 1", resMap["1"]) - assert.Equal(t, "hello world 4", resMap["4"]) + assert.Equal(t, "1", resMap["1"]) + assert.Equal(t, "4", resMap["4"]) resMap = make(map[string]string) stream.ClearMem() @@ -445,7 +466,7 @@ func testMessaging(t *testing.T, name, ks string) { } } - assert.Equal(t, "hello world 1", resMap["1"]) + assert.Equal(t, "1", resMap["1"]) // validate message ack with 1 and 4, only 1 should be ack qr, err = session.Execute(context.Background(), "update "+name+" set time_acked = 1, time_next = null where id in (1, 4) and time_acked is null", nil) From 
1047836b784ea0d2e0209a72e3570dfed3d01a28 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 25 Apr 2022 18:30:37 -0400 Subject: [PATCH 09/16] Fix TestMessageStreamingPlan test Signed-off-by: Matt Lord --- .../tabletserver/planbuilder/testdata/schema_test.json | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/go/vt/vttablet/tabletserver/planbuilder/testdata/schema_test.json b/go/vt/vttablet/tabletserver/planbuilder/testdata/schema_test.json index 7cfd1701d79..b641a8f2d68 100644 --- a/go/vt/vttablet/tabletserver/planbuilder/testdata/schema_test.json +++ b/go/vt/vttablet/tabletserver/planbuilder/testdata/schema_test.json @@ -319,14 +319,17 @@ "Name": "priority" }, { - "Name": "time_next" + "Name": "epoch" }, { - "Name": "epoch" + "Name": "time_next" }, { "Name": "time_acked" }, + { + "Name": "tenant_id" + }, { "Name": "message" } @@ -348,7 +351,7 @@ "PKColumns": [ 0 ], - "Type": 0 + "Type": 2 }, { "Name": "dual", From 45c91527d338dab34165f23c60b296b628fa0cf6 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 25 Apr 2022 18:41:02 -0400 Subject: [PATCH 10/16] Fix godriver/TestStreamMessaging test Signed-off-by: Matt Lord --- go/test/endtoend/vtgate/godriver/main_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/go/test/endtoend/vtgate/godriver/main_test.go b/go/test/endtoend/vtgate/godriver/main_test.go index 34977f9716c..bbc5536a3fa 100644 --- a/go/test/endtoend/vtgate/godriver/main_test.go +++ b/go/test/endtoend/vtgate/godriver/main_test.go @@ -18,6 +18,7 @@ package godriver import ( "flag" + "fmt" "os" "strconv" "testing" @@ -80,6 +81,8 @@ create table my_message( } } ` + + testMessage = "{\"message\":\"hello world\"}" ) func TestMain(m *testing.M) { @@ -141,7 +144,7 @@ func TestStreamMessaging(t *testing.T) { defer db.Close() // Exec not allowed in streaming - _, err = db.Exec("insert into my_message(id, message) values(1, 'hello world')") + _, err = db.Exec(fmt.Sprintf("insert into my_message(id, message) values(1, '%s')", testMessage)) require.NoError(t, err) // for streaming messages From 4fb8a83705af7ca292b6f393d13ba77c6902140c Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 25 Apr 2022 19:50:13 -0400 Subject: [PATCH 11/16] Split streamMu into streamProcessingMu and lastPollPositionMu Signed-off-by: Matt Lord --- .../tabletserver/messager/message_manager.go | 114 +++++++++--------- .../messager/message_manager_test.go | 10 +- 2 files changed, 63 insertions(+), 61 deletions(-) diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index bf64d587fb3..b784fb23961 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -183,16 +183,24 @@ type messageManager struct { isOpen bool // cond waits on curReceiver == -1 || cache.IsEmpty(): // No current receivers available or cache is empty. - cond sync.Cond - cache *cache - receivers []*receiverWithStatus - // Way to track the receiver count in a consistent way w/o locks - receiverCount sync2.AtomicInt64 - curReceiver int64 - messagesPending sync2.AtomicBool - - // streamMu keeps the cache and database consistent with each other. 
- // Specifically: + cond sync.Cond + cache *cache + receivers []*receiverWithStatus + curReceiver int + messagesPending bool + + // lastPostionMu protects the lastPollPosition variable which is the main + // point of coordination between the poller and the binlog streamer to + // ensure that we are not re-processing older events. + lastPollPositionMu sync.Mutex + lastPollPosition *mysql.Position + + // streamProcessingMu keeps the cache and database consistent with + // each other by ensuring that only one of the streams is processing + // changes at a time. The Poller uses a results streamer to pull directly + // from the message table and the message manager uses a binlog streamer + // to process change events. This mutex ensures that only one of them are + // updating the cache at any one time. // It prevents items from being removed from cache while the poller // reads from the db and adds items to it. Otherwise, the poller // might add an older snapshot of a row that was just postponed. @@ -201,12 +209,11 @@ type messageManager struct { // lastPollPosition must be ignored by the vstream. It consequently // also blocks vstream from updating the cache while the poller is // active. - streamMu sync.Mutex + streamProcessingMu sync.Mutex // streamCancel is set when a vstream is running, and is reset - // to nil after a cancel. This allows for startVStream and stopVstream + // to nil after a cancel. This allows for startVStream and stopVStream // to be idempotent. - streamCancel func() - lastPollPosition *mysql.Position + streamCancel func() // wg is for ensuring all running goroutines have returned // before we can close the manager. You need to Add before @@ -241,7 +248,7 @@ func newMessageManager(tsv TabletService, vs VStreamer, table *schema.Table, pos pollerTicks: timer.NewTimer(table.MessageInfo.PollInterval), purgeTicks: timer.NewTimer(table.MessageInfo.PollInterval), postponeSema: postponeSema, - messagesPending: sync2.NewAtomicBool(true), + messagesPending: true, } mm.cond.L = &mm.mu @@ -366,7 +373,6 @@ func (mm *messageManager) Close() { for _, rcvr := range mm.receivers { rcvr.receiver.cancel() } - mm.receiverCount.Set(0) mm.receivers = nil MessageStats.Set([]string{mm.name.String(), "ClientCount"}, 0) log.Infof("messageManager - clearing cache") @@ -406,12 +412,11 @@ func (mm *messageManager) Subscribe(ctx context.Context, send func(*sqltypes.Res withStatus := &receiverWithStatus{ receiver: receiver, } - if mm.receiverCount.Get() == 0 { + if len(mm.receivers) == 0 { mm.startVStream() } mm.receivers = append(mm.receivers, withStatus) - mm.receiverCount.Add(1) - MessageStats.Set([]string{mm.name.String(), "ClientCount"}, mm.receiverCount.Get()) + MessageStats.Set([]string{mm.name.String(), "ClientCount"}, int64(len(mm.receivers))) if mm.curReceiver == -1 { mm.rescanReceivers(-1) } @@ -432,17 +437,16 @@ func (mm *messageManager) unsubscribe(receiver *messageReceiver) { continue } // Delete the item at current position. - n := mm.receiverCount.Get() + n := len(mm.receivers) copy(mm.receivers[i:n-1], mm.receivers[i+1:n]) mm.receivers = mm.receivers[0 : n-1] - mm.receiverCount.Add(-1) - MessageStats.Set([]string{mm.name.String(), "ClientCount"}, mm.receiverCount.Get()) + MessageStats.Set([]string{mm.name.String(), "ClientCount"}, int64(len(mm.receivers))) break } // curReceiver is obsolete. Recompute. mm.rescanReceivers(-1) // If there are no receivers. Shut down the cache. 
- if mm.receiverCount.Get() == 0 { + if len(mm.receivers) == 0 { mm.stopVStream() mm.cache.Clear() } @@ -454,10 +458,10 @@ func (mm *messageManager) unsubscribe(receiver *messageReceiver) { // was previously -1, it broadcasts. If none was found, // curReceiver is set to -1. If there's no starting point, // it must be specified as -1. -func (mm *messageManager) rescanReceivers(start int64) { +func (mm *messageManager) rescanReceivers(start int) { cur := start for range mm.receivers { - cur = (cur + 1) % mm.receiverCount.Get() + cur = (cur + 1) % len(mm.receivers) if !mm.receivers[cur].busy { if mm.curReceiver == -1 { mm.cond.Broadcast() @@ -474,7 +478,7 @@ func (mm *messageManager) rescanReceivers(start int64) { // if successful. If the message is already present, // it still returns true. func (mm *messageManager) Add(mr *MessageRow) bool { - if mm.receiverCount.Get() == 0 { + if mm.getReceiverCount() == 0 { return false } // If cache is empty, we have to broadcast that we're not empty @@ -484,7 +488,7 @@ func (mm *messageManager) Add(mr *MessageRow) bool { } if !mm.cache.Add(mr) { // Cache is full. Enter "messagesPending" mode. - mm.messagesPending.Set(true) + mm.messagesPending = true return false } return true @@ -515,7 +519,7 @@ func (mm *messageManager) runSend() { // If cache became empty, there are messages pending, and there are subscribed // receivers, we have to trigger the poller to fetch more. - if mm.cache.IsEmpty() && mm.messagesPending.Get() && mm.receiverCount.Get() != 0 { + if mm.cache.IsEmpty() && mm.messagesPending && len(mm.receivers) != 0 { // Do this as a separate goroutine. Otherwise, this could cause // the following deadlock: // 1. runSend obtains a lock @@ -575,12 +579,12 @@ func (mm *messageManager) send(receiver *receiverWithStatus, qr *sqltypes.Result } defer func() { - // Hold streamMu to prevent the ids from being discarded + // Hold streamProcessingMu to prevent the ids from being discarded // if poller is active. Otherwise, it could have read a // snapshot of a row before the postponement and requeue // the message. - mm.streamMu.Lock() - defer mm.streamMu.Unlock() + mm.streamProcessingMu.Lock() + defer mm.streamProcessingMu.Unlock() mm.cache.Discard(ids) }() @@ -621,8 +625,6 @@ func (mm *messageManager) postpone(tsv TabletService, ackWaitTime time.Duration, } func (mm *messageManager) startVStream() { - mm.streamMu.Lock() - defer mm.streamMu.Unlock() if mm.streamCancel != nil { return } @@ -632,10 +634,6 @@ func (mm *messageManager) startVStream() { } func (mm *messageManager) stopVStream() { - log.Infof("messageManager - stopVStream called. 
Acquiring streamMu lock") - mm.streamMu.Lock() - log.Infof("messageManager - acquired streamMu lock") - defer mm.streamMu.Unlock() log.Infof("messageManager - calling stream cancel") if mm.streamCancel != nil { mm.streamCancel() @@ -670,8 +668,8 @@ func (mm *messageManager) runOneVStream(ctx context.Context) error { var fields []*querypb.Field err := mm.vs.Stream(ctx, "current", nil, mm.vsFilter, func(events []*binlogdatapb.VEvent) error { - mm.streamMu.Lock() - defer mm.streamMu.Unlock() + mm.streamProcessingMu.Lock() + defer mm.streamProcessingMu.Unlock() select { case <-ctx.Done(): @@ -680,6 +678,8 @@ func (mm *messageManager) runOneVStream(ctx context.Context) error { } mustSkip := func() (bool, error) { + mm.lastPollPositionMu.Lock() + defer mm.lastPollPositionMu.Unlock() if mm.lastPollPosition == nil { return false, nil } @@ -754,14 +754,14 @@ func (mm *messageManager) processRowEvent(fields []*querypb.Field, rowEvent *bin } func (mm *messageManager) runPoller() { + mm.mu.Lock() + defer mm.mu.Unlock() + // Fast-path. Skip all the work. - if mm.receiverCount.Get() == 0 { + if len(mm.receivers) == 0 { return } - mm.getExclusiveLock() - defer mm.releaseExclusiveLock() - ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), mm.pollerTicks.Interval()) defer func() { mm.tsv.LogError() @@ -774,17 +774,20 @@ func (mm *messageManager) runPoller() { "max": sqltypes.Int64BindVariable(int64(size)), } + mm.streamProcessingMu.Lock() + defer mm.streamProcessingMu.Unlock() + qr, err := mm.readPending(ctx, bindVars) if err != nil { return } - mm.messagesPending.Set(false) + mm.messagesPending = false if len(qr.Rows) >= size { // There are probably more messages to be sent. - mm.messagesPending.Set(true) + mm.messagesPending = true } - if mm.receiverCount.Get() == 0 { + if len(mm.receivers) == 0 { // Almost never reachable because we just checked this. return } @@ -801,7 +804,7 @@ func (mm *messageManager) runPoller() { continue } if !mm.cache.Add(mr) { - mm.messagesPending.Set(true) + mm.messagesPending = true return } } @@ -926,6 +929,8 @@ func (mm *messageManager) readPending(ctx context.Context, bindVars map[string]* } qr := &sqltypes.Result{} err = mm.vs.StreamResults(ctx, query, func(response *binlogdatapb.VStreamResultsResponse) error { + mm.lastPollPositionMu.Lock() + defer mm.lastPollPositionMu.Unlock() if response.Fields != nil { qr.Fields = response.Fields } @@ -947,15 +952,14 @@ func (mm *messageManager) readPending(ctx context.Context, bindVars map[string]* return qr, err } -// This grants the caller exclusive access to the message service. -// When this is needed for a function, you can use this to -// enforce consistent locking order. 
-func (mm *messageManager) getExclusiveLock() { +func (mm *messageManager) getReceiverCount() int { mm.mu.Lock() - mm.streamMu.Lock() + defer mm.mu.Unlock() + return len(mm.receivers) } -func (mm *messageManager) releaseExclusiveLock() { - mm.streamMu.Unlock() - mm.mu.Unlock() +func (mm *messageManager) getLastPollPosition() *mysql.Position { + mm.lastPollPositionMu.Lock() + defer mm.lastPollPositionMu.Unlock() + return mm.lastPollPosition } diff --git a/go/vt/vttablet/tabletserver/messager/message_manager_test.go b/go/vt/vttablet/tabletserver/messager/message_manager_test.go index 3e911d83fd6..4d76bece329 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager_test.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager_test.go @@ -148,12 +148,12 @@ func TestReceiverCancel(t *testing.T) { for i := 0; i < 10; i++ { runtime.Gosched() time.Sleep(10 * time.Millisecond) - if mm.receiverCount.Get() != 0 { + if len(mm.receivers) != 0 { continue } return } - t.Errorf("receivers were not cleared: %d", mm.receiverCount.Get()) + t.Errorf("receivers were not cleared: %d", len(mm.receivers)) } func TestMessageManagerState(t *testing.T) { @@ -281,7 +281,7 @@ func TestMessageManagerSend(t *testing.T) { runtime.Gosched() time.Sleep(10 * time.Millisecond) mm.mu.Lock() - if mm.receiverCount.Get() != 1 { + if len(mm.receivers) != 1 { mm.mu.Unlock() continue } @@ -503,9 +503,7 @@ func TestMessageManagerStreamerAndPoller(t *testing.T) { for { runtime.Gosched() time.Sleep(10 * time.Millisecond) - mm.streamMu.Lock() - pos := mm.lastPollPosition - mm.streamMu.Unlock() + pos := mm.getLastPollPosition() if pos != nil { break } From 5bbfaf7f2a067680d1440dcb3abf145822b9a37d Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 25 Apr 2022 21:38:35 -0400 Subject: [PATCH 12/16] Poller cannot take main lock w/o having X stream processing lock Signed-off-by: Matt Lord --- go/vt/vttablet/tabletserver/messager/message_manager.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index b784fb23961..0daa051f860 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -189,7 +189,7 @@ type messageManager struct { curReceiver int messagesPending bool - // lastPostionMu protects the lastPollPosition variable which is the main + // lastPollPostionMu protects the lastPollPosition variable which is the main // point of coordination between the poller and the binlog streamer to // ensure that we are not re-processing older events. 
lastPollPositionMu sync.Mutex @@ -754,6 +754,8 @@ func (mm *messageManager) processRowEvent(fields []*querypb.Field, rowEvent *bin } func (mm *messageManager) runPoller() { + mm.streamProcessingMu.Lock() + defer mm.streamProcessingMu.Unlock() mm.mu.Lock() defer mm.mu.Unlock() @@ -774,9 +776,6 @@ func (mm *messageManager) runPoller() { "max": sqltypes.Int64BindVariable(int64(size)), } - mm.streamProcessingMu.Lock() - defer mm.streamProcessingMu.Unlock() - qr, err := mm.readPending(ctx, bindVars) if err != nil { return From 9fa605d2a15eece55dd8846dd7f77642a1c9da98 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 25 Apr 2022 23:02:08 -0400 Subject: [PATCH 13/16] Improve the comments a bit Signed-off-by: Matt Lord --- .../tabletserver/messager/message_manager.go | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index 0daa051f860..14bb368729c 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -189,26 +189,27 @@ type messageManager struct { curReceiver int messagesPending bool - // lastPollPostionMu protects the lastPollPosition variable which is the main - // point of coordination between the poller and the binlog streamer to - // ensure that we are not re-processing older events. + // lastPollPositionMu protects the lastPollPosition variable which is the + // main point of coordination between the poller and the binlog streamer to + // ensure that we are not re-processing older events and moving along + // linearly in the shared virtual stream within the message manager. lastPollPositionMu sync.Mutex lastPollPosition *mysql.Position - // streamProcessingMu keeps the cache and database consistent with - // each other by ensuring that only one of the streams is processing - // changes at a time. The Poller uses a results streamer to pull directly - // from the message table and the message manager uses a binlog streamer - // to process change events. This mutex ensures that only one of them are - // updating the cache at any one time. + // streamProcessingMu keeps the cache and database consistent with each + // other by ensuring that only one of the streams is processing messages + // and updating the cache at a time. The poller uses a results streamer to + // pull directly from the message table and the message manager uses a + // binlog streamer to process change events. This mutex ensures that only + // one of them are updating the cache at any one time. // It prevents items from being removed from cache while the poller // reads from the db and adds items to it. Otherwise, the poller // might add an older snapshot of a row that was just postponed. - // It blocks vstream from receiving messages while the poller - // reads a snapshot and updates lastPollPosition. Any events older than - // lastPollPosition must be ignored by the vstream. It consequently - // also blocks vstream from updating the cache while the poller is - // active. + // It blocks the vstream (binlog streamer) from receiving messages while + // the poller reads a snapshot and updates lastPollPosition. Any events + // older than lastPollPosition must be ignored by the vstream. It + // consequently also blocks vstream from updating the cache while the + // poller is active. streamProcessingMu sync.Mutex // streamCancel is set when a vstream is running, and is reset // to nil after a cancel. 
This allows for startVStream and stopVStream From 9a78491bb4b49463a2a295f35d97102bd7753dda Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 25 Apr 2022 23:30:01 -0400 Subject: [PATCH 14/16] Hold the main mutex during Add This is for safe concurrency with the last receiver unsubscribing Signed-off-by: Matt Lord --- go/vt/vttablet/tabletserver/messager/message_manager.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index 14bb368729c..1d95c6de165 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -479,7 +479,9 @@ func (mm *messageManager) rescanReceivers(start int) { // if successful. If the message is already present, // it still returns true. func (mm *messageManager) Add(mr *MessageRow) bool { - if mm.getReceiverCount() == 0 { + mm.mu.Lock() + defer mm.mu.Unlock() + if len(mm.receivers) == 0 { return false } // If cache is empty, we have to broadcast that we're not empty From 937c61dd48a7c44effc2bf0ee09318d67d8f0ed0 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 27 Apr 2022 15:18:07 -0400 Subject: [PATCH 15/16] Changes after pair reviewing with Sugu Signed-off-by: Matt Lord --- .../tabletserver/messager/message_manager.go | 63 ++++++++++--------- 1 file changed, 33 insertions(+), 30 deletions(-) diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index 1d95c6de165..8894449400e 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -188,15 +188,14 @@ type messageManager struct { receivers []*receiverWithStatus curReceiver int messagesPending bool + // streamCancel is set when a vstream is running, and is reset + // to nil after a cancel. This allows for startVStream and stopVStream + // to be idempotent. + // This is implicitly protected by the main mutex because startVStream + // and stopVStream are called while holding the main mutex. + streamCancel func() - // lastPollPositionMu protects the lastPollPosition variable which is the - // main point of coordination between the poller and the binlog streamer to - // ensure that we are not re-processing older events and moving along - // linearly in the shared virtual stream within the message manager. - lastPollPositionMu sync.Mutex - lastPollPosition *mysql.Position - - // streamProcessingMu keeps the cache and database consistent with each + // cacheManagementMu keeps the cache and database consistent with each // other by ensuring that only one of the streams is processing messages // and updating the cache at a time. The poller uses a results streamer to // pull directly from the message table and the message manager uses a @@ -210,11 +209,19 @@ type messageManager struct { // older than lastPollPosition must be ignored by the vstream. It // consequently also blocks vstream from updating the cache while the // poller is active. - streamProcessingMu sync.Mutex - // streamCancel is set when a vstream is running, and is reset - // to nil after a cancel. This allows for startVStream and stopVStream - // to be idempotent. - streamCancel func() + // TODO(mattalord): since this is primarily a flow control mechanism, we + // should do it in a more idiomatic go way using channels or cond vars. 
+	cacheManagementMu sync.Mutex
+	// The lastPollPosition variable is the main point of coordination
+	// between the poller and the binlog streamer to ensure that we are
+	// not re-processing older events and moving along linearly in the
+	// shared virtual GTID stream within the message manager.
+	// It is theoretically possible for the binlog streamer to be ahead of
+	// the lastPollPosition. This is because of how semi-sync works today
+	// where a replica could have received and processed a GTID that the primary
+	// may not have yet committed; but this is harmless because any events missed
+	// will be picked up during the next poller run.
+	lastPollPosition *mysql.Position

 	// wg is for ensuring all running goroutines have returned
 	// before we can close the manager. You need to Add before
@@ -582,12 +589,12 @@ func (mm *messageManager) send(receiver *receiverWithStatus, qr *sqltypes.Result
 	}

 	defer func() {
-		// Hold streamProcessingMu to prevent the ids from being discarded
+		// Hold cacheManagementMu to prevent the ids from being discarded
 		// if poller is active. Otherwise, it could have read a
 		// snapshot of a row before the postponement and requeue
 		// the message.
-		mm.streamProcessingMu.Lock()
-		defer mm.streamProcessingMu.Unlock()
+		mm.cacheManagementMu.Lock()
+		defer mm.cacheManagementMu.Unlock()
 		mm.cache.Discard(ids)
 	}()

@@ -671,8 +678,9 @@ func (mm *messageManager) runOneVStream(ctx context.Context) error {
 	var fields []*querypb.Field

 	err := mm.vs.Stream(ctx, "current", nil, mm.vsFilter, func(events []*binlogdatapb.VEvent) error {
-		mm.streamProcessingMu.Lock()
-		defer mm.streamProcessingMu.Unlock()
+		// We need to get the flow control lock
+		mm.cacheManagementMu.Lock()
+		defer mm.cacheManagementMu.Unlock()

 		select {
 		case <-ctx.Done():
@@ -681,8 +689,6 @@ func (mm *messageManager) runOneVStream(ctx context.Context) error {
 		}

 		mustSkip := func() (bool, error) {
-			mm.lastPollPositionMu.Lock()
-			defer mm.lastPollPositionMu.Unlock()
 			if mm.lastPollPosition == nil {
 				return false, nil
 			}
@@ -757,8 +763,11 @@ func (mm *messageManager) processRowEvent(fields []*querypb.Field, rowEvent *bin
 }

 func (mm *messageManager) runPoller() {
-	mm.streamProcessingMu.Lock()
-	defer mm.streamProcessingMu.Unlock()
+	// We need to get the flow control lock first
+	mm.cacheManagementMu.Lock()
+	defer mm.cacheManagementMu.Unlock()
+	// Now we can get the main/structure lock and ensure e.g. that the
+	// receiver count does not change during the run
 	mm.mu.Lock()
 	defer mm.mu.Unlock()
@@ -789,10 +798,6 @@ func (mm *messageManager) runPoller() {
 		// There are probably more messages to be sent.
 		mm.messagesPending = true
 	}
-	if len(mm.receivers) == 0 {
-		// Almost never reachable because we just checked this.
-		return
-	}
 	if len(qr.Rows) != 0 {
 		// We've most likely added items.
@@ -931,8 +936,6 @@ func (mm *messageManager) readPending(ctx context.Context, bindVars map[string]* } qr := &sqltypes.Result{} err = mm.vs.StreamResults(ctx, query, func(response *binlogdatapb.VStreamResultsResponse) error { - mm.lastPollPositionMu.Lock() - defer mm.lastPollPositionMu.Unlock() if response.Fields != nil { qr.Fields = response.Fields } @@ -961,7 +964,7 @@ func (mm *messageManager) getReceiverCount() int { } func (mm *messageManager) getLastPollPosition() *mysql.Position { - mm.lastPollPositionMu.Lock() - defer mm.lastPollPositionMu.Unlock() + mm.cacheManagementMu.Lock() + defer mm.cacheManagementMu.Unlock() return mm.lastPollPosition } From a1040f5ff13f545d5b0207e96ac9b9bb5c1d9db8 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 27 Apr 2022 15:52:34 -0400 Subject: [PATCH 16/16] Use my GitHub handle for the self reference Signed-off-by: Matt Lord --- go/vt/vttablet/tabletserver/messager/message_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index 8894449400e..70b2976be98 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -209,7 +209,7 @@ type messageManager struct { // older than lastPollPosition must be ignored by the vstream. It // consequently also blocks vstream from updating the cache while the // poller is active. - // TODO(mattalord): since this is primarily a flow control mechanism, we + // TODO(mattlord): since this is primarily a flow control mechanism, we // should do it in a more idiomatic go way using channels or cond vars. cacheManagementMu sync.Mutex // The lastPollPosition variable is the main point of coordination