From 0eb97e40006576fd601c189e861f2bc0c5eb90d7 Mon Sep 17 00:00:00 2001 From: Michael Berlin Date: Mon, 28 Aug 2017 17:28:18 +0200 Subject: [PATCH 1/4] vttablet: Hot Row Protection: Make the transaction concurrency for a hot row configurable. By default, we will allow up to 5 transactions through to the tx pool to have a better pipelining effect and ensure that MySQL is always busy. --- go/vt/vttablet/tabletserver/query_engine.go | 4 +- .../vttablet/tabletserver/tabletenv/config.go | 16 +- .../tabletserver/tabletserver_test.go | 3 + .../txserializer/tx_serializer.go | 91 +++++++---- .../txserializer/tx_serializer_test.go | 149 ++++++++++++++---- 5 files changed, 195 insertions(+), 68 deletions(-) diff --git a/go/vt/vttablet/tabletserver/query_engine.go b/go/vt/vttablet/tabletserver/query_engine.go index 73a9450c071..8260cd6f382 100644 --- a/go/vt/vttablet/tabletserver/query_engine.go +++ b/go/vt/vttablet/tabletserver/query_engine.go @@ -181,7 +181,9 @@ func NewQueryEngine(checker connpool.MySQLChecker, se *schema.Engine, config tab qe.consolidator = sync2.NewConsolidator() qe.txSerializer = txserializer.New(config.EnableHotRowProtectionDryRun, - config.HotRowProtectionMaxQueueSize, config.HotRowProtectionMaxGlobalQueueSize) + config.HotRowProtectionMaxQueueSize, + config.HotRowProtectionMaxGlobalQueueSize, + config.HotRowProtectionConcurrentTransactions) qe.streamQList = NewQueryList() qe.autoCommit.Set(config.EnableAutoCommit) diff --git a/go/vt/vttablet/tabletserver/tabletenv/config.go b/go/vt/vttablet/tabletserver/tabletenv/config.go index 0fc3194c6f7..cd2007b8b52 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config.go @@ -79,6 +79,7 @@ func init() { flag.BoolVar(&Config.EnableHotRowProtectionDryRun, "enable_hot_row_protection_dry_run", DefaultQsConfig.EnableHotRowProtectionDryRun, "If true, hot row protection is not enforced but logs if transactions would have been queued.") flag.IntVar(&Config.HotRowProtectionMaxQueueSize, "hot_row_protection_max_queue_size", DefaultQsConfig.HotRowProtectionMaxQueueSize, "Maximum number of BeginExecute RPCs which will be queued for the same row (range).") flag.IntVar(&Config.HotRowProtectionMaxGlobalQueueSize, "hot_row_protection_max_global_queue_size", DefaultQsConfig.HotRowProtectionMaxGlobalQueueSize, "Global queue limit across all row (ranges). Useful to prevent that the queue can grow unbounded.") + flag.IntVar(&Config.HotRowProtectionConcurrentTransactions, "hot_row_protection_concurrent_transactions", DefaultQsConfig.HotRowProtectionConcurrentTransactions, "Number of concurrent transactions let through to the txpool/MySQL for the same hot row. Should be > 1 to have enough 'ready' transactions in MySQL and benefit from a pipelining effect.") flag.BoolVar(&Config.HeartbeatEnable, "heartbeat_enable", DefaultQsConfig.HeartbeatEnable, "If true, vttablet records (if master) or checks (if replica) the current time of a replication heartbeat in the table _vt.heartbeat. The result is used to inform the serving state of the vttablet via healthchecks.") flag.DurationVar(&Config.HeartbeatInterval, "heartbeat_interval", DefaultQsConfig.HeartbeatInterval, "How frequently to read and write replication heartbeat.") @@ -125,10 +126,11 @@ type TabletConfig struct { TxThrottlerConfig string TxThrottlerHealthCheckCells []string - EnableHotRowProtection bool - EnableHotRowProtectionDryRun bool - HotRowProtectionMaxQueueSize int - HotRowProtectionMaxGlobalQueueSize int + EnableHotRowProtection bool + EnableHotRowProtectionDryRun bool + HotRowProtectionMaxQueueSize int + HotRowProtectionMaxGlobalQueueSize int + HotRowProtectionConcurrentTransactions int HeartbeatEnable bool HeartbeatInterval time.Duration @@ -180,6 +182,9 @@ var DefaultQsConfig = TabletConfig{ // Default value is the same as TransactionCap. HotRowProtectionMaxQueueSize: 20, HotRowProtectionMaxGlobalQueueSize: 1000, + // Allow more than 1 transaction for the same hot row through to have enough + // of them ready in MySQL and profit from a pipelining effect. + HotRowProtectionConcurrentTransactions: 5, HeartbeatEnable: false, HeartbeatInterval: 1 * time.Second, @@ -218,6 +223,9 @@ func VerifyConfig() error { if globalSize, size := Config.HotRowProtectionMaxGlobalQueueSize, Config.HotRowProtectionMaxQueueSize; globalSize < size { return fmt.Errorf("global queue size must be >= per row (range) queue size: -hot_row_protection_max_global_queue_size < hot_row_protection_max_queue_size (%v < %v)", globalSize, size) } + if v := Config.HotRowProtectionConcurrentTransactions; v <= 0 { + return fmt.Errorf("-hot_row_protection_concurrent_transactions must be > 0 (specified value: %v)", v) + } return nil } diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go index 820005390a3..4a230c4bdf3 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -1475,6 +1475,7 @@ func TestSerializeTransactionsSameRow(t *testing.T) { testUtils := newTestUtils() config := testUtils.newQueryServiceConfig() config.EnableHotRowProtection = true + config.HotRowProtectionConcurrentTransactions = 1 // Reduce the txpool to 2 because we should never consume more than two slots. config.TransactionCap = 2 tsv := NewTabletServerWithNilTopoServer(config) @@ -1607,6 +1608,7 @@ func TestSerializeTransactionsSameRow_TooManyPendingRequests(t *testing.T) { config := testUtils.newQueryServiceConfig() config.EnableHotRowProtection = true config.HotRowProtectionMaxQueueSize = 1 + config.HotRowProtectionConcurrentTransactions = 1 tsv := NewTabletServerWithNilTopoServer(config) dbconfigs := testUtils.newDBConfigs(db) target := querypb.Target{TabletType: topodatapb.TabletType_MASTER} @@ -1693,6 +1695,7 @@ func TestSerializeTransactionsSameRow_RequestCanceled(t *testing.T) { testUtils := newTestUtils() config := testUtils.newQueryServiceConfig() config.EnableHotRowProtection = true + config.HotRowProtectionConcurrentTransactions = 1 tsv := NewTabletServerWithNilTopoServer(config) dbconfigs := testUtils.newDBConfigs(db) target := querypb.Target{TabletType: topodatapb.TabletType_MASTER} diff --git a/go/vt/vttablet/tabletserver/txserializer/tx_serializer.go b/go/vt/vttablet/tabletserver/txserializer/tx_serializer.go index e708b38a38b..f9f955c137f 100644 --- a/go/vt/vttablet/tabletserver/txserializer/tx_serializer.go +++ b/go/vt/vttablet/tabletserver/txserializer/tx_serializer.go @@ -72,9 +72,10 @@ type TxSerializer struct { *sync2.ConsolidatorCache // Immutable fields. - dryRun bool - maxQueueSize int - maxGlobalQueueSize int + dryRun bool + maxQueueSize int + maxGlobalQueueSize int + concurrentTransactions int log *logutil.ThrottledLogger logDryRun *logutil.ThrottledLogger @@ -88,12 +89,13 @@ type TxSerializer struct { } // New returns a TxSerializer object. -func New(dryRun bool, maxQueueSize, maxGlobalQueueSize int) *TxSerializer { +func New(dryRun bool, maxQueueSize, maxGlobalQueueSize, concurrentTransactions int) *TxSerializer { return &TxSerializer{ - ConsolidatorCache: sync2.NewConsolidatorCache(1000), - dryRun: dryRun, - maxQueueSize: maxQueueSize, - maxGlobalQueueSize: maxGlobalQueueSize, + ConsolidatorCache: sync2.NewConsolidatorCache(1000), + dryRun: dryRun, + maxQueueSize: maxQueueSize, + maxGlobalQueueSize: maxGlobalQueueSize, + concurrentTransactions: concurrentTransactions, log: logutil.NewThrottledLogger("HotRowProtection", 5*time.Second), logDryRun: logutil.NewThrottledLogger("HotRowProtection DryRun", 5*time.Second), logWaitsDryRun: logutil.NewThrottledLogger("HotRowProtection Waits DryRun", 5*time.Second), @@ -129,13 +131,13 @@ func (t *TxSerializer) Wait(ctx context.Context, key, table string) (done DoneFu } // lockLocked queues this transaction. It will unblock immediately if this -// transaction is the first in the queue or when it got the token (queue.lock). +// transaction is the first in the queue or when it got a token. // The method has the suffix "Locked" to clarify that "t.mu" must be locked. func (t *TxSerializer) lockLocked(ctx context.Context, key, table string) (bool, error) { q, ok := t.queues[key] if !ok { // First transaction in the queue i.e. we don't wait and return immediately. - t.queues[key] = newQueue(t.maxQueueSize) + t.queues[key] = newQueue(t.concurrentTransactions) t.globalSize++ return false, nil } @@ -165,16 +167,28 @@ func (t *TxSerializer) lockLocked(ctx context.Context, key, table string) (bool, t.globalSize++ q.size++ q.count++ + if q.size == 2 && q.max == 1 { + // Hot row detected: A second, concurrent transaction is seen for the first + // time. + // The first transaction already holds the first token and will return it + // when it's done and calls "unlock". + // If more tokens are allowed, add them now. (We delayed adding the tokens + // until now as an optimization for the default case when there is no hot + // row.) + additionalTokens := t.concurrentTransactions - 1 + for i := 1; i <= additionalTokens; i++ { + q.tokens <- struct{}{} + } + + // Include first transaction in the count at /debug/hotrows. (It was not + // recorded on purpose because it did not wait.) + t.Record(key) + } if q.size > q.max { q.max = q.size } // Publish the number of waits at /debug/hotrows. t.Record(key) - if q.size == 2 { - // Include first transaction in the count. (It was not recorded on purpose - // because it did not wait.) - t.Record(key) - } if t.dryRun { waitsDryRun.Add(table, 1) @@ -183,13 +197,22 @@ func (t *TxSerializer) lockLocked(ctx context.Context, key, table string) (bool, } // Unlock before the wait and relock before returning because our caller - // Wait() hold the lock and assumes it still has it. + // Wait() holds the lock and assumes it still has it. t.mu.Unlock() defer t.mu.Lock() + // Non-blocking read of a token. + select { + case <-q.tokens: + // Return waited=false because a token was immediately available. + return false, nil + default: + } + + // Wait for the next available token. waits.Add(table, 1) select { - case <-q.lock: + case <-q.tokens: return true, nil case <-ctx.Done(): return true, ctx.Err() @@ -208,6 +231,7 @@ func (t *TxSerializer) unlockLocked(key string, returnToken bool) { q.size-- t.globalSize-- if q.size == 0 { + // This is the last transaction in flight. delete(t.queues, key) if q.max > 1 { @@ -217,16 +241,20 @@ func (t *TxSerializer) unlockLocked(key string, returnToken bool) { t.log.Infof("%v simultaneous transactions (%v in total) for the same row range (%v) were queued.", q.max, q.count, key) } } + + // Return early because the queue "q" for this "key" will not be used any + // more. + return } // Return token to queue. Wakes up the next queued transaction. if !t.dryRun && returnToken { - q.lock <- struct{}{} + q.tokens <- struct{}{} } } -// Pending returns the number of queued transactions (including the one which -// is currently in flight.) +// Pending returns the number of queued transactions (including the ones which +// are currently in flight.) func (t *TxSerializer) Pending(key string) int { t.mu.Lock() defer t.mu.Unlock() @@ -242,13 +270,14 @@ func (t *TxSerializer) Pending(key string) int { // // Note that we don't use a dedicated queue structure for all waiting // transactions. Instead, we leverage that Go routines waiting for a channel -// are woken up in the order they are queued up. The "lock" field is said -// channel which has exactly one element, a token. All queued transactions are -// competing for this token. +// are woken up in the order they are queued up. The "tokens" field is said +// channel which has n elements, "tokens", for the number of concurrent +// transactions which can access the tx pool. All queued transactions are +// competing for these tokens. type queue struct { // NOTE: The following fields are guarded by TxSerializer.mu. - // size counts how many transactions are queued (includes the one - // transaction which is not waiting.) + // size counts how many transactions are currently queued/in flight (includes + // the transactions which are not waiting.) size int // count is the same as "size", but never gets decremented. count int @@ -256,14 +285,20 @@ type queue struct { // were simultaneously queued for the same row range. max int - lock chan struct{} + // tokens holds one element for each allowed tx pool slot. E.g. if the channel + // has a size of 1, only one transaction at a time is allowed through. + tokens chan struct{} } -func newQueue(max int) *queue { +func newQueue(concurrentTransactions int) *queue { return &queue{ size: 1, count: 1, max: 1, - lock: make(chan struct{}, 1), + // The first available token is not added as an optimization because the + // caller would immediately remove it anyway. + // If additional tokens are allowed, we delay adding them until the row + // range becomes hot and a second in-flight transaction occurs. + tokens: make(chan struct{}, concurrentTransactions), } } diff --git a/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go b/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go index fe281e3fdeb..83c2bc716d4 100644 --- a/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go +++ b/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go @@ -42,7 +42,7 @@ func resetVariables() { func TestTxSerializer(t *testing.T) { resetVariables() - txs := New(false, 2, 3) + txs := New(false, 2, 3, 1) // tx1. done1, waited1, err1 := txs.Wait(context.Background(), "t1 where1", "t1") @@ -108,6 +108,75 @@ func TestTxSerializer(t *testing.T) { } } +func TestTxSerializer_ConcurrentTransactions(t *testing.T) { + resetVariables() + // Allow up to 2 concurrent transactions per hot row. + txs := New(false, 3, 3, 2) + + // tx1. + done1, waited1, err1 := txs.Wait(context.Background(), "t1 where1", "t1") + if err1 != nil { + t.Fatal(err1) + } + if waited1 { + t.Fatalf("tx1 must never wait: %v", waited1) + } + + // tx2. + done2, waited2, err2 := txs.Wait(context.Background(), "t1 where1", "t1") + if err2 != nil { + t.Fatal(err1) + } + if waited2 { + t.Fatalf("tx2 must not wait: %v", waited1) + } + + // tx3 (gets queued and must wait). + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + + done3, waited3, err3 := txs.Wait(context.Background(), "t1 where1", "t1") + if err3 != nil { + t.Fatal(err3) + } + if !waited3 { + t.Fatalf("tx3 must wait: %v", waited2) + } + if got, want := waits.Counts()["t1"], int64(1); got != want { + t.Fatalf("variable not incremented: got = %v, want = %v", got, want) + } + + done3() + }() + + // Wait until tx3 is waiting before we finish tx2 and unblock tx3. + if err := waitForPending(txs, "t1 where1", 3); err != nil { + t.Fatal(err) + } + // Finish tx2 before tx1 to test that the "finish-order" does not matter. + // Unblocks tx3. + done2() + // Wait for tx3 to finish. + wg.Wait() + // Finish tx1 to delete the queue object. + done1() + + if txs.queues["t1 where1"] != nil { + t.Fatal("queue object was not deleted after last transaction") + } + + // 3 transactions were recorded. + if err := testHTTPHandler(txs, 3); err != nil { + t.Fatal(err) + } + // 1 of them had to wait. + if got, want := waits.Counts()["t1"], int64(1); got != want { + t.Fatalf("variable not incremented: got = %v, want = %v", got, want) + } +} + func waitForPending(txs *TxSerializer, key string, i int) error { start := time.Now() for { @@ -144,13 +213,14 @@ func testHTTPHandler(txs *TxSerializer, count int) error { return nil } -// TestTxSerializerCancel runs 3 pending transactions. tx2 will get canceled -// and tx3 will be unblocked once tx1 is done. +// TestTxSerializerCancel runs 4 pending transactions. +// tx1 and tx2 are allowed to run concurrently while tx3 and tx4 are queued. +// tx3 will get canceled and tx4 will be unblocked once tx1 is done. func TestTxSerializerCancel(t *testing.T) { resetVariables() - txs := New(false, 3, 3) + txs := New(false, 4, 4, 2) - // tx2 and tx3 will record their number once they're done waiting. + // tx3 and tx4 will record their number once they're done waiting. txDone := make(chan int) // tx1. @@ -161,68 +231,77 @@ func TestTxSerializerCancel(t *testing.T) { if waited1 { t.Fatalf("tx1 must never wait: %v", waited1) } + // tx2. + done2, waited2, err2 := txs.Wait(context.Background(), "t1 where1", "t1") + if err2 != nil { + t.Fatal(err2) + } + if waited2 { + t.Fatalf("tx2 must not wait: %v", waited2) + } - // tx2 (gets queued and must wait). - ctx2, cancel2 := context.WithCancel(context.Background()) + // tx3 (gets queued and must wait). + ctx3, cancel3 := context.WithCancel(context.Background()) wg := sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() - _, _, err2 := txs.Wait(ctx2, "t1 where1", "t1") - if err2 != context.Canceled { - t.Fatal(err2) + _, _, err3 := txs.Wait(ctx3, "t1 where1", "t1") + if err3 != context.Canceled { + t.Fatal(err3) } - txDone <- 2 + txDone <- 3 }() - // Wait until tx2 is waiting before we try tx3. - if err := waitForPending(txs, "t1 where1", 2); err != nil { + // Wait until tx3 is waiting before we try tx4. + if err := waitForPending(txs, "t1 where1", 3); err != nil { t.Fatal(err) } - // tx3 (gets queued and must wait as well). + // tx4 (gets queued and must wait as well). wg.Add(1) go func() { defer wg.Done() - done3, waited3, err3 := txs.Wait(context.Background(), "t1 where1", "t1") - if err3 != nil { - t.Fatal(err3) + done4, waited4, err4 := txs.Wait(context.Background(), "t1 where1", "t1") + if err4 != nil { + t.Fatal(err4) } - if !waited3 { - t.Fatalf("tx3 must have waited: %v", waited3) + if !waited4 { + t.Fatalf("tx4 must have waited: %v", waited4) } - txDone <- 3 + txDone <- 4 - done3() + done4() }() - // Wait until tx3 is waiting before we start to cancel tx2. - if err := waitForPending(txs, "t1 where1", 3); err != nil { + // Wait until tx4 is waiting before we start to cancel tx3. + if err := waitForPending(txs, "t1 where1", 4); err != nil { t.Fatal(err) } - // Cancel tx2. - cancel2() - if got := <-txDone; got != 2 { - t.Fatalf("tx2 should have been unblocked after the cancel: %v", got) + // Cancel tx3. + cancel3() + if got := <-txDone; got != 3 { + t.Fatalf("tx3 should have been unblocked after the cancel: %v", got) } // Finish tx1. done1() - // Wait for tx3. - if got := <-txDone; got != 3 { + // Wait for tx4. + if got := <-txDone; got != 4 { t.Fatalf("wrong tx was unblocked after tx1: %v", got) } - wg.Wait() + // Finish tx2 (the last transaction) which will delete the queue object. + done2() if txs.queues["t1 where1"] != nil { t.Fatal("queue object was not deleted after last transaction") } - // 3 total transactions get recorded. - if err := testHTTPHandler(txs, 3); err != nil { + // 4 total transactions get recorded. + if err := testHTTPHandler(txs, 4); err != nil { t.Fatal(err) } // 2 of them had to wait. @@ -235,7 +314,7 @@ func TestTxSerializerCancel(t *testing.T) { // the two concurrent transactions for the same key. func TestTxSerializerDryRun(t *testing.T) { resetVariables() - txs := New(true, 1, 2) + txs := New(true, 1, 2, 1) // tx1. done1, waited1, err1 := txs.Wait(context.Background(), "t1 where1", "t1") @@ -300,7 +379,7 @@ func TestTxSerializerDryRun(t *testing.T) { // reject transactions although they may succeed within the txpool constraints // and RPC deadline. func TestTxSerializerGlobalQueueOverflow(t *testing.T) { - txs := New(false, 1, 1 /* maxGlobalQueueSize */) + txs := New(false, 1, 1 /* maxGlobalQueueSize */, 1) // tx1. done1, waited1, err1 := txs.Wait(context.Background(), "t1 where1", "t1") @@ -337,7 +416,7 @@ func TestTxSerializerGlobalQueueOverflow(t *testing.T) { } func TestTxSerializerPending(t *testing.T) { - txs := New(false, 1, 1) + txs := New(false, 1, 1, 1) if got, want := txs.Pending("t1 where1"), 0; got != want { t.Fatalf("there should be no pending transaction: got = %v, want = %v", got, want) } From cfe4ccefe62d7ea8b7519a7c82cbc505b7d8649f Mon Sep 17 00:00:00 2001 From: Michael Berlin Date: Mon, 28 Aug 2017 19:07:06 +0200 Subject: [PATCH 2/4] vttablet: Hot Row Protection: Integration test for concurrent transaction support. --- .../tabletserver/tabletserver_test.go | 135 +++++++++++++++++- 1 file changed, 131 insertions(+), 4 deletions(-) diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go index 4a230c4bdf3..23d56637354 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -1513,7 +1513,7 @@ func TestSerializeTransactionsSameRow(t *testing.T) { db.SetBeforeFunc("update test_table set name_string = 'tx1' where pk in (1) /* _stream test_table (pk ) (1 ); */", func() { close(tx1Started) - if err := waitForTxSerializationCount(tsv, "test_table where pk = 1 and name = 1", 2); err != nil { + if err := waitForTxSerializationPendingQueries(tsv, "test_table where pk = 1 and name = 1", 2); err != nil { t.Fatal(err) } }) @@ -1581,7 +1581,134 @@ func TestSerializeTransactionsSameRow(t *testing.T) { } } -func waitForTxSerializationCount(tsv *TabletServer, key string, i int) error { +func TestSerializeTransactionsSameRow_ConcurrentTransactions(t *testing.T) { + // This test runs three transaction in parallel: + // tx1 | tx2 | tx3 + // Out of these three, two can run in parallel because we increased the + // ConcurrentTransactions limit to 2. + // One out of the three transaction will always get serialized though. + db := setUpTabletServerTest(t) + defer db.Close() + testUtils := newTestUtils() + config := testUtils.newQueryServiceConfig() + config.EnableHotRowProtection = true + config.HotRowProtectionConcurrentTransactions = 2 + // Reduce the txpool to 2 because we should never consume more than two slots. + config.TransactionCap = 2 + tsv := NewTabletServerWithNilTopoServer(config) + dbconfigs := testUtils.newDBConfigs(db) + target := querypb.Target{TabletType: topodatapb.TabletType_MASTER} + if err := tsv.StartService(target, dbconfigs, testUtils.newMysqld(&dbconfigs)); err != nil { + t.Fatalf("StartService failed: %v", err) + } + defer tsv.StopService() + countStart := tabletenv.WaitStats.Counts()["TxSerializer"] + + // Fake data. + q1 := "update test_table set name_string = 'tx1' where pk = :pk and name = :name" + q2 := "update test_table set name_string = 'tx2' where pk = :pk and name = :name" + q3 := "update test_table set name_string = 'tx3' where pk = :pk and name = :name" + // Every request needs their own bind variables to avoid data races. + bvTx1 := map[string]*querypb.BindVariable{ + "pk": sqltypes.Int64BindVariable(1), + "name": sqltypes.Int64BindVariable(1), + } + bvTx2 := map[string]*querypb.BindVariable{ + "pk": sqltypes.Int64BindVariable(1), + "name": sqltypes.Int64BindVariable(1), + } + bvTx3 := map[string]*querypb.BindVariable{ + "pk": sqltypes.Int64BindVariable(1), + "name": sqltypes.Int64BindVariable(1), + } + + tx1Started := make(chan struct{}) + allQueriesPending := make(chan struct{}) + db.SetBeforeFunc("update test_table set name_string = 'tx1' where pk in (1) /* _stream test_table (pk ) (1 ); */", + func() { + close(tx1Started) + <-allQueriesPending + }) + + // Run all three transactions. + ctx := context.Background() + wg := sync.WaitGroup{} + + // tx1. + wg.Add(1) + go func() { + defer wg.Done() + + _, tx1, err := tsv.BeginExecute(ctx, &target, q1, bvTx1, nil) + if err != nil { + t.Fatalf("failed to execute query: %s: %s", q1, err) + } + + if err := tsv.Commit(ctx, &target, tx1); err != nil { + t.Fatalf("call TabletServer.Commit failed: %v", err) + } + }() + + // tx2. + wg.Add(1) + go func() { + defer wg.Done() + + // Wait for tx1 to avoid that this tx could pass tx1, without any contention. + // In that case, we would see less than 3 pending transactions. + <-tx1Started + + _, tx2, err := tsv.BeginExecute(ctx, &target, q2, bvTx2, nil) + if err != nil { + t.Fatalf("failed to execute query: %s: %s", q2, err) + } + + if err := tsv.Commit(ctx, &target, tx2); err != nil { + t.Fatalf("call TabletServer.Commit failed: %v", err) + } + }() + + // tx3. + wg.Add(1) + go func() { + defer wg.Done() + + // Wait for tx1 to avoid that this tx could pass tx1, without any contention. + // In that case, we would see less than 3 pending transactions. + <-tx1Started + + _, tx3, err := tsv.BeginExecute(ctx, &target, q3, bvTx3, nil) + if err != nil { + t.Fatalf("failed to execute query: %s: %s", q3, err) + } + + if err := tsv.Commit(ctx, &target, tx3); err != nil { + t.Fatalf("call TabletServer.Commit failed: %v", err) + } + }() + + // At this point, all three transactions should be blocked in BeginExecute() + // and therefore count as pending transaction by the Hot Row Protection. + // + // NOTE: We are not doing more sophisticated synchronizations between the + // transactions via db.SetBeforeFunc() for the same reason as mentioned + // in TestSerializeTransactionsSameRow: The MySQL C client does not seem + // to allow more than connection attempt at a time. + if err := waitForTxSerializationPendingQueries(tsv, "test_table where pk = 1 and name = 1", 3); err != nil { + t.Fatal(err) + } + close(allQueriesPending) + + wg.Wait() + + got, ok := tabletenv.WaitStats.Counts()["TxSerializer"] + want := countStart + 1 + if !ok || got != want { + t.Fatalf("One out of the three transactions must have waited: ok? %v got: %v want: %v", ok, got, want) + } +} + +func waitForTxSerializationPendingQueries(tsv *TabletServer, key string, i int) error { start := time.Now() for { got, want := tsv.qe.txSerializer.Pending(key), i @@ -1777,7 +1904,7 @@ func TestSerializeTransactionsSameRow_RequestCanceled(t *testing.T) { defer wg.Done() // Wait until tx1 and tx2 are pending to make the test deterministic. - if err := waitForTxSerializationCount(tsv, "test_table where pk = 1 and name = 1", 2); err != nil { + if err := waitForTxSerializationPendingQueries(tsv, "test_table where pk = 1 and name = 1", 2); err != nil { t.Fatal(err) } @@ -1792,7 +1919,7 @@ func TestSerializeTransactionsSameRow_RequestCanceled(t *testing.T) { }() // Wait until tx1, 2 and 3 are pending. - if err := waitForTxSerializationCount(tsv, "test_table where pk = 1 and name = 1", 3); err != nil { + if err := waitForTxSerializationPendingQueries(tsv, "test_table where pk = 1 and name = 1", 3); err != nil { t.Fatal(err) } // Now unblock tx2 and cancel it. From 21bff4e6d6a994d332b6498c57da65d9743ce082 Mon Sep 17 00:00:00 2001 From: Michael Berlin Date: Mon, 28 Aug 2017 20:15:34 +0200 Subject: [PATCH 3/4] vttablet: Hot Row Protection: Reverse the "tokens" queue for the allowed concurrent transactions from pull to push. Instead of pre-filling the channel with tokens, we will occupy one slot in the channel only while our transaction is pending. This way, we avoid the costly prepopulation of the channel with tokens. --- .../txserializer/tx_serializer.go | 72 +++++++++---------- 1 file changed, 33 insertions(+), 39 deletions(-) diff --git a/go/vt/vttablet/tabletserver/txserializer/tx_serializer.go b/go/vt/vttablet/tabletserver/txserializer/tx_serializer.go index f9f955c137f..c764c92c17f 100644 --- a/go/vt/vttablet/tabletserver/txserializer/tx_serializer.go +++ b/go/vt/vttablet/tabletserver/txserializer/tx_serializer.go @@ -122,8 +122,8 @@ func (t *TxSerializer) Wait(ctx context.Context, key, table string) (done DoneFu if err != nil { if waited { // Waiting failed early e.g. due a canceled context and we did NOT get the - // token. Call "done" now because we don't return it to the caller. - t.unlockLocked(key, false /* returnToken */) + // slot. Call "done" now because we don't return it to the caller. + t.unlockLocked(key, false /* returnSlot */) } return nil, waited, err } @@ -131,13 +131,13 @@ func (t *TxSerializer) Wait(ctx context.Context, key, table string) (done DoneFu } // lockLocked queues this transaction. It will unblock immediately if this -// transaction is the first in the queue or when it got a token. +// transaction is the first in the queue or when it acquired a slot. // The method has the suffix "Locked" to clarify that "t.mu" must be locked. func (t *TxSerializer) lockLocked(ctx context.Context, key, table string) (bool, error) { q, ok := t.queues[key] if !ok { // First transaction in the queue i.e. we don't wait and return immediately. - t.queues[key] = newQueue(t.concurrentTransactions) + t.queues[key] = newQueueForFirstTransaction(t.concurrentTransactions) t.globalSize++ return false, nil } @@ -170,15 +170,6 @@ func (t *TxSerializer) lockLocked(ctx context.Context, key, table string) (bool, if q.size == 2 && q.max == 1 { // Hot row detected: A second, concurrent transaction is seen for the first // time. - // The first transaction already holds the first token and will return it - // when it's done and calls "unlock". - // If more tokens are allowed, add them now. (We delayed adding the tokens - // until now as an optimization for the default case when there is no hot - // row.) - additionalTokens := t.concurrentTransactions - 1 - for i := 1; i <= additionalTokens; i++ { - q.tokens <- struct{}{} - } // Include first transaction in the count at /debug/hotrows. (It was not // recorded on purpose because it did not wait.) @@ -201,18 +192,18 @@ func (t *TxSerializer) lockLocked(ctx context.Context, key, table string) (bool, t.mu.Unlock() defer t.mu.Lock() - // Non-blocking read of a token. + // Non-blocking write attempt to get a slot. select { - case <-q.tokens: - // Return waited=false because a token was immediately available. + case q.availableSlots <- struct{}{}: + // Return waited=false because a slot was immediately available. return false, nil default: } - // Wait for the next available token. + // Blocking wait for the next available slot. waits.Add(table, 1) select { - case <-q.tokens: + case q.availableSlots <- struct{}{}: return true, nil case <-ctx.Done(): return true, ctx.Err() @@ -226,7 +217,7 @@ func (t *TxSerializer) unlock(key string) { t.unlockLocked(key, true) } -func (t *TxSerializer) unlockLocked(key string, returnToken bool) { +func (t *TxSerializer) unlockLocked(key string, returnSlot bool) { q := t.queues[key] q.size-- t.globalSize-- @@ -247,9 +238,11 @@ func (t *TxSerializer) unlockLocked(key string, returnToken bool) { return } - // Return token to queue. Wakes up the next queued transaction. - if !t.dryRun && returnToken { - q.tokens <- struct{}{} + // Give up slot by removing ourselves from the channel. + // Wakes up the next queued transaction. + if !t.dryRun && returnSlot { + // This should never block. + <-q.availableSlots } } @@ -266,14 +259,14 @@ func (t *TxSerializer) Pending(key string) int { return q.size } -// queue reprents the local queue for a particular row (range). +// queue represents the local queue for a particular row (range). // // Note that we don't use a dedicated queue structure for all waiting // transactions. Instead, we leverage that Go routines waiting for a channel -// are woken up in the order they are queued up. The "tokens" field is said -// channel which has n elements, "tokens", for the number of concurrent -// transactions which can access the tx pool. All queued transactions are -// competing for these tokens. +// are woken up in the order they are queued up. The "availableSlots" field is +// said channel which has n free slots (for the number of concurrent +// transactions which can access the tx pool). All queued transactions are +// competing for these slots and try to add themselves to the channel. type queue struct { // NOTE: The following fields are guarded by TxSerializer.mu. // size counts how many transactions are currently queued/in flight (includes @@ -285,20 +278,21 @@ type queue struct { // were simultaneously queued for the same row range. max int - // tokens holds one element for each allowed tx pool slot. E.g. if the channel - // has a size of 1, only one transaction at a time is allowed through. - tokens chan struct{} + // availableSlots limits the number of concurrent transactions *per* + // hot row (range). It holds one element for each allowed pending + // transaction i.e. consumed tx pool slot. Consequently, if the channel + // is full, subsequent transactions have to wait until they can place + // their entry here. + availableSlots chan struct{} } -func newQueue(concurrentTransactions int) *queue { +func newQueueForFirstTransaction(concurrentTransactions int) *queue { + availableSlots := make(chan struct{}, concurrentTransactions) + availableSlots <- struct{}{} return &queue{ - size: 1, - count: 1, - max: 1, - // The first available token is not added as an optimization because the - // caller would immediately remove it anyway. - // If additional tokens are allowed, we delay adding them until the row - // range becomes hot and a second in-flight transaction occurs. - tokens: make(chan struct{}, concurrentTransactions), + size: 1, + count: 1, + max: 1, + availableSlots: availableSlots, } } From c2c7a4bcb1103de998cd3ee92b3dd6dedd43f99b Mon Sep 17 00:00:00 2001 From: Michael Berlin Date: Tue, 29 Aug 2017 07:24:35 -0700 Subject: [PATCH 4/4] vttablet: Hot Row Protection: Defer the channel creation until there is a second transaction and the row actually becomes hot. This saves 22% execution time for the default case that there is no hot row. before: $ go test -bench BenchmarkTxSerializer_NoHotRow -benchtime 30s goos: darwin goarch: amd64 BenchmarkTxSerializer_NoHotRow-4 100000000 330 ns/op after: $ go test -bench BenchmarkTxSerializer_NoHotRow -benchtime 30s goos: darwin goarch: amd64 BenchmarkTxSerializer_NoHotRow-4 200000000 257 ns/op memstats before: 64 B/op 2 allocs/op memstats after: 160 B/op 3 allocs/op Note that this optimization is important for two reasons: - The code is on the critical serving path i.e. every single vttablet request has to go through it. - This code path has to use a global mutex which effectively serializes all requests and reduces the parallelism. Given that, I think this optimization is justified. --- .../txserializer/tx_serializer.go | 46 +++++++++++++------ .../txserializer/tx_serializer_test.go | 44 ++++++++++++++++++ 2 files changed, 75 insertions(+), 15 deletions(-) diff --git a/go/vt/vttablet/tabletserver/txserializer/tx_serializer.go b/go/vt/vttablet/tabletserver/txserializer/tx_serializer.go index c764c92c17f..52ba58bd769 100644 --- a/go/vt/vttablet/tabletserver/txserializer/tx_serializer.go +++ b/go/vt/vttablet/tabletserver/txserializer/tx_serializer.go @@ -164,17 +164,22 @@ func (t *TxSerializer) lockLocked(ctx context.Context, key, table string) (bool, } } - t.globalSize++ - q.size++ - q.count++ - if q.size == 2 && q.max == 1 { - // Hot row detected: A second, concurrent transaction is seen for the first - // time. + if q.availableSlots == nil { + // Hot row detected: A second, concurrent transaction is seen for the + // first time. + + // As an optimization, we deferred the creation of the channel until now. + q.availableSlots = make(chan struct{}, t.concurrentTransactions) + q.availableSlots <- struct{}{} // Include first transaction in the count at /debug/hotrows. (It was not // recorded on purpose because it did not wait.) t.Record(key) } + + t.globalSize++ + q.size++ + q.count++ if q.size > q.max { q.max = q.size } @@ -221,6 +226,7 @@ func (t *TxSerializer) unlockLocked(key string, returnSlot bool) { q := t.queues[key] q.size-- t.globalSize-- + if q.size == 0 { // This is the last transaction in flight. delete(t.queues, key) @@ -235,15 +241,26 @@ func (t *TxSerializer) unlockLocked(key string, returnSlot bool) { // Return early because the queue "q" for this "key" will not be used any // more. + // We intentionally skip returning the last slot and closing the + // "availableSlots" channel because it is not required by Go. return } // Give up slot by removing ourselves from the channel. // Wakes up the next queued transaction. - if !t.dryRun && returnSlot { - // This should never block. - <-q.availableSlots + + if t.dryRun { + // Dry-run did not acquire a slot in the first place. + return } + + if !returnSlot { + // We did not acquire a slot in the first place e.g. due to a canceled context. + return + } + + // This should never block. + <-q.availableSlots } // Pending returns the number of queued transactions (including the ones which @@ -283,16 +300,15 @@ type queue struct { // transaction i.e. consumed tx pool slot. Consequently, if the channel // is full, subsequent transactions have to wait until they can place // their entry here. + // NOTE: As an optimization, we defer the creation of the channel until + // a second transaction for the same hot row is running. availableSlots chan struct{} } func newQueueForFirstTransaction(concurrentTransactions int) *queue { - availableSlots := make(chan struct{}, concurrentTransactions) - availableSlots <- struct{}{} return &queue{ - size: 1, - count: 1, - max: 1, - availableSlots: availableSlots, + size: 1, + count: 1, + max: 1, } } diff --git a/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go b/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go index 83c2bc716d4..7e719141c30 100644 --- a/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go +++ b/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go @@ -40,6 +40,29 @@ func resetVariables() { globalQueueExceededDryRun.Set(0) } +func TestTxSerializer_NoHotRow(t *testing.T) { + resetVariables() + txs := New(false, 1, 1, 5) + + done, waited, err := txs.Wait(context.Background(), "t1 where1", "t1") + if err != nil { + t.Fatal(err) + } + if waited { + t.Fatal("non-parallel tx must never wait") + } + done() + + // No hot row was recoded. + if err := testHTTPHandler(txs, 0); err != nil { + t.Fatal(err) + } + // No transaction had to wait. + if got, want := waits.Counts()["t1"], int64(0); got != want { + t.Fatalf("wrong Waits variable: got = %v, want = %v", got, want) + } +} + func TestTxSerializer(t *testing.T) { resetVariables() txs := New(false, 2, 3, 1) @@ -206,6 +229,10 @@ func testHTTPHandler(txs *TxSerializer, count int) error { want := fmt.Sprintf(`Length: 1 %d: t1 where1 `, count) + if count == 0 { + want = `Length: 0 +` + } if got := rr.Body.String(); got != want { return fmt.Errorf("wrong content: got = \n%v\n want = \n%v", got, want) } @@ -421,3 +448,20 @@ func TestTxSerializerPending(t *testing.T) { t.Fatalf("there should be no pending transaction: got = %v, want = %v", got, want) } } + +func BenchmarkTxSerializer_NoHotRow(b *testing.B) { + txs := New(false, 1, 1, 5) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + done, waited, err := txs.Wait(context.Background(), "t1 where1", "t1") + if err != nil { + b.Fatal(err) + } + if waited { + b.Fatal("non-parallel tx must never wait") + } + done() + } +}