diff --git a/go/vt/vttablet/tabletserver/query_engine.go b/go/vt/vttablet/tabletserver/query_engine.go index 73a9450c071..8260cd6f382 100644 --- a/go/vt/vttablet/tabletserver/query_engine.go +++ b/go/vt/vttablet/tabletserver/query_engine.go @@ -181,7 +181,9 @@ func NewQueryEngine(checker connpool.MySQLChecker, se *schema.Engine, config tab qe.consolidator = sync2.NewConsolidator() qe.txSerializer = txserializer.New(config.EnableHotRowProtectionDryRun, - config.HotRowProtectionMaxQueueSize, config.HotRowProtectionMaxGlobalQueueSize) + config.HotRowProtectionMaxQueueSize, + config.HotRowProtectionMaxGlobalQueueSize, + config.HotRowProtectionConcurrentTransactions) qe.streamQList = NewQueryList() qe.autoCommit.Set(config.EnableAutoCommit) diff --git a/go/vt/vttablet/tabletserver/tabletenv/config.go b/go/vt/vttablet/tabletserver/tabletenv/config.go index 0fc3194c6f7..cd2007b8b52 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config.go @@ -79,6 +79,7 @@ func init() { flag.BoolVar(&Config.EnableHotRowProtectionDryRun, "enable_hot_row_protection_dry_run", DefaultQsConfig.EnableHotRowProtectionDryRun, "If true, hot row protection is not enforced but logs if transactions would have been queued.") flag.IntVar(&Config.HotRowProtectionMaxQueueSize, "hot_row_protection_max_queue_size", DefaultQsConfig.HotRowProtectionMaxQueueSize, "Maximum number of BeginExecute RPCs which will be queued for the same row (range).") flag.IntVar(&Config.HotRowProtectionMaxGlobalQueueSize, "hot_row_protection_max_global_queue_size", DefaultQsConfig.HotRowProtectionMaxGlobalQueueSize, "Global queue limit across all row (ranges). Useful to prevent that the queue can grow unbounded.") + flag.IntVar(&Config.HotRowProtectionConcurrentTransactions, "hot_row_protection_concurrent_transactions", DefaultQsConfig.HotRowProtectionConcurrentTransactions, "Number of concurrent transactions let through to the txpool/MySQL for the same hot row. 
Should be > 1 to have enough 'ready' transactions in MySQL and benefit from a pipelining effect.") flag.BoolVar(&Config.HeartbeatEnable, "heartbeat_enable", DefaultQsConfig.HeartbeatEnable, "If true, vttablet records (if master) or checks (if replica) the current time of a replication heartbeat in the table _vt.heartbeat. The result is used to inform the serving state of the vttablet via healthchecks.") flag.DurationVar(&Config.HeartbeatInterval, "heartbeat_interval", DefaultQsConfig.HeartbeatInterval, "How frequently to read and write replication heartbeat.") @@ -125,10 +126,11 @@ type TabletConfig struct { TxThrottlerConfig string TxThrottlerHealthCheckCells []string - EnableHotRowProtection bool - EnableHotRowProtectionDryRun bool - HotRowProtectionMaxQueueSize int - HotRowProtectionMaxGlobalQueueSize int + EnableHotRowProtection bool + EnableHotRowProtectionDryRun bool + HotRowProtectionMaxQueueSize int + HotRowProtectionMaxGlobalQueueSize int + HotRowProtectionConcurrentTransactions int HeartbeatEnable bool HeartbeatInterval time.Duration @@ -180,6 +182,9 @@ var DefaultQsConfig = TabletConfig{ // Default value is the same as TransactionCap. HotRowProtectionMaxQueueSize: 20, HotRowProtectionMaxGlobalQueueSize: 1000, + // Allow more than 1 transaction for the same hot row through to have enough + // of them ready in MySQL and profit from a pipelining effect. 
+ HotRowProtectionConcurrentTransactions: 5, HeartbeatEnable: false, HeartbeatInterval: 1 * time.Second, @@ -218,6 +223,9 @@ func VerifyConfig() error { if globalSize, size := Config.HotRowProtectionMaxGlobalQueueSize, Config.HotRowProtectionMaxQueueSize; globalSize < size { return fmt.Errorf("global queue size must be >= per row (range) queue size: -hot_row_protection_max_global_queue_size < hot_row_protection_max_queue_size (%v < %v)", globalSize, size) } + if v := Config.HotRowProtectionConcurrentTransactions; v <= 0 { + return fmt.Errorf("-hot_row_protection_concurrent_transactions must be > 0 (specified value: %v)", v) + } return nil } diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go index 820005390a3..23d56637354 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -1475,6 +1475,7 @@ func TestSerializeTransactionsSameRow(t *testing.T) { testUtils := newTestUtils() config := testUtils.newQueryServiceConfig() config.EnableHotRowProtection = true + config.HotRowProtectionConcurrentTransactions = 1 // Reduce the txpool to 2 because we should never consume more than two slots. 
config.TransactionCap = 2 tsv := NewTabletServerWithNilTopoServer(config) @@ -1512,7 +1513,7 @@ func TestSerializeTransactionsSameRow(t *testing.T) { db.SetBeforeFunc("update test_table set name_string = 'tx1' where pk in (1) /* _stream test_table (pk ) (1 ); */", func() { close(tx1Started) - if err := waitForTxSerializationCount(tsv, "test_table where pk = 1 and name = 1", 2); err != nil { + if err := waitForTxSerializationPendingQueries(tsv, "test_table where pk = 1 and name = 1", 2); err != nil { t.Fatal(err) } }) @@ -1580,7 +1581,134 @@ func TestSerializeTransactionsSameRow(t *testing.T) { } } -func waitForTxSerializationCount(tsv *TabletServer, key string, i int) error { +func TestSerializeTransactionsSameRow_ConcurrentTransactions(t *testing.T) { + // This test runs three transaction in parallel: + // tx1 | tx2 | tx3 + // Out of these three, two can run in parallel because we increased the + // ConcurrentTransactions limit to 2. + // One out of the three transaction will always get serialized though. + db := setUpTabletServerTest(t) + defer db.Close() + testUtils := newTestUtils() + config := testUtils.newQueryServiceConfig() + config.EnableHotRowProtection = true + config.HotRowProtectionConcurrentTransactions = 2 + // Reduce the txpool to 2 because we should never consume more than two slots. + config.TransactionCap = 2 + tsv := NewTabletServerWithNilTopoServer(config) + dbconfigs := testUtils.newDBConfigs(db) + target := querypb.Target{TabletType: topodatapb.TabletType_MASTER} + if err := tsv.StartService(target, dbconfigs, testUtils.newMysqld(&dbconfigs)); err != nil { + t.Fatalf("StartService failed: %v", err) + } + defer tsv.StopService() + countStart := tabletenv.WaitStats.Counts()["TxSerializer"] + + // Fake data. 
+ q1 := "update test_table set name_string = 'tx1' where pk = :pk and name = :name" + q2 := "update test_table set name_string = 'tx2' where pk = :pk and name = :name" + q3 := "update test_table set name_string = 'tx3' where pk = :pk and name = :name" + // Every request needs their own bind variables to avoid data races. + bvTx1 := map[string]*querypb.BindVariable{ + "pk": sqltypes.Int64BindVariable(1), + "name": sqltypes.Int64BindVariable(1), + } + bvTx2 := map[string]*querypb.BindVariable{ + "pk": sqltypes.Int64BindVariable(1), + "name": sqltypes.Int64BindVariable(1), + } + bvTx3 := map[string]*querypb.BindVariable{ + "pk": sqltypes.Int64BindVariable(1), + "name": sqltypes.Int64BindVariable(1), + } + + tx1Started := make(chan struct{}) + allQueriesPending := make(chan struct{}) + db.SetBeforeFunc("update test_table set name_string = 'tx1' where pk in (1) /* _stream test_table (pk ) (1 ); */", + func() { + close(tx1Started) + <-allQueriesPending + }) + + // Run all three transactions. + ctx := context.Background() + wg := sync.WaitGroup{} + + // tx1. + wg.Add(1) + go func() { + defer wg.Done() + + _, tx1, err := tsv.BeginExecute(ctx, &target, q1, bvTx1, nil) + if err != nil { + t.Fatalf("failed to execute query: %s: %s", q1, err) + } + + if err := tsv.Commit(ctx, &target, tx1); err != nil { + t.Fatalf("call TabletServer.Commit failed: %v", err) + } + }() + + // tx2. + wg.Add(1) + go func() { + defer wg.Done() + + // Wait for tx1 to avoid that this tx could pass tx1, without any contention. + // In that case, we would see less than 3 pending transactions. + <-tx1Started + + _, tx2, err := tsv.BeginExecute(ctx, &target, q2, bvTx2, nil) + if err != nil { + t.Fatalf("failed to execute query: %s: %s", q2, err) + } + + if err := tsv.Commit(ctx, &target, tx2); err != nil { + t.Fatalf("call TabletServer.Commit failed: %v", err) + } + }() + + // tx3. 
+ wg.Add(1) + go func() { + defer wg.Done() + + // Wait for tx1 to avoid that this tx could pass tx1, without any contention. + // In that case, we would see less than 3 pending transactions. + <-tx1Started + + _, tx3, err := tsv.BeginExecute(ctx, &target, q3, bvTx3, nil) + if err != nil { + t.Fatalf("failed to execute query: %s: %s", q3, err) + } + + if err := tsv.Commit(ctx, &target, tx3); err != nil { + t.Fatalf("call TabletServer.Commit failed: %v", err) + } + }() + + // At this point, all three transactions should be blocked in BeginExecute() + // and therefore count as pending transactions by the Hot Row Protection. + // + // NOTE: We are not doing more sophisticated synchronizations between the + // transactions via db.SetBeforeFunc() for the same reason as mentioned + // in TestSerializeTransactionsSameRow: The MySQL C client does not seem + // to allow more than one connection attempt at a time. + if err := waitForTxSerializationPendingQueries(tsv, "test_table where pk = 1 and name = 1", 3); err != nil { + t.Fatal(err) + } + close(allQueriesPending) + + wg.Wait() + + got, ok := tabletenv.WaitStats.Counts()["TxSerializer"] + want := countStart + 1 + if !ok || got != want { + t.Fatalf("One out of the three transactions must have waited: ok? 
%v got: %v want: %v", ok, got, want) + } +} + +func waitForTxSerializationPendingQueries(tsv *TabletServer, key string, i int) error { start := time.Now() for { got, want := tsv.qe.txSerializer.Pending(key), i @@ -1607,6 +1735,7 @@ func TestSerializeTransactionsSameRow_TooManyPendingRequests(t *testing.T) { config := testUtils.newQueryServiceConfig() config.EnableHotRowProtection = true config.HotRowProtectionMaxQueueSize = 1 + config.HotRowProtectionConcurrentTransactions = 1 tsv := NewTabletServerWithNilTopoServer(config) dbconfigs := testUtils.newDBConfigs(db) target := querypb.Target{TabletType: topodatapb.TabletType_MASTER} @@ -1693,6 +1822,7 @@ func TestSerializeTransactionsSameRow_RequestCanceled(t *testing.T) { testUtils := newTestUtils() config := testUtils.newQueryServiceConfig() config.EnableHotRowProtection = true + config.HotRowProtectionConcurrentTransactions = 1 tsv := NewTabletServerWithNilTopoServer(config) dbconfigs := testUtils.newDBConfigs(db) target := querypb.Target{TabletType: topodatapb.TabletType_MASTER} @@ -1774,7 +1904,7 @@ func TestSerializeTransactionsSameRow_RequestCanceled(t *testing.T) { defer wg.Done() // Wait until tx1 and tx2 are pending to make the test deterministic. - if err := waitForTxSerializationCount(tsv, "test_table where pk = 1 and name = 1", 2); err != nil { + if err := waitForTxSerializationPendingQueries(tsv, "test_table where pk = 1 and name = 1", 2); err != nil { t.Fatal(err) } @@ -1789,7 +1919,7 @@ func TestSerializeTransactionsSameRow_RequestCanceled(t *testing.T) { }() // Wait until tx1, 2 and 3 are pending. - if err := waitForTxSerializationCount(tsv, "test_table where pk = 1 and name = 1", 3); err != nil { + if err := waitForTxSerializationPendingQueries(tsv, "test_table where pk = 1 and name = 1", 3); err != nil { t.Fatal(err) } // Now unblock tx2 and cancel it. 
diff --git a/go/vt/vttablet/tabletserver/txserializer/tx_serializer.go b/go/vt/vttablet/tabletserver/txserializer/tx_serializer.go index e708b38a38b..52ba58bd769 100644 --- a/go/vt/vttablet/tabletserver/txserializer/tx_serializer.go +++ b/go/vt/vttablet/tabletserver/txserializer/tx_serializer.go @@ -72,9 +72,10 @@ type TxSerializer struct { *sync2.ConsolidatorCache // Immutable fields. - dryRun bool - maxQueueSize int - maxGlobalQueueSize int + dryRun bool + maxQueueSize int + maxGlobalQueueSize int + concurrentTransactions int log *logutil.ThrottledLogger logDryRun *logutil.ThrottledLogger @@ -88,12 +89,13 @@ type TxSerializer struct { } // New returns a TxSerializer object. -func New(dryRun bool, maxQueueSize, maxGlobalQueueSize int) *TxSerializer { +func New(dryRun bool, maxQueueSize, maxGlobalQueueSize, concurrentTransactions int) *TxSerializer { return &TxSerializer{ - ConsolidatorCache: sync2.NewConsolidatorCache(1000), - dryRun: dryRun, - maxQueueSize: maxQueueSize, - maxGlobalQueueSize: maxGlobalQueueSize, + ConsolidatorCache: sync2.NewConsolidatorCache(1000), + dryRun: dryRun, + maxQueueSize: maxQueueSize, + maxGlobalQueueSize: maxGlobalQueueSize, + concurrentTransactions: concurrentTransactions, log: logutil.NewThrottledLogger("HotRowProtection", 5*time.Second), logDryRun: logutil.NewThrottledLogger("HotRowProtection DryRun", 5*time.Second), logWaitsDryRun: logutil.NewThrottledLogger("HotRowProtection Waits DryRun", 5*time.Second), @@ -120,8 +122,8 @@ func (t *TxSerializer) Wait(ctx context.Context, key, table string) (done DoneFu if err != nil { if waited { // Waiting failed early e.g. due a canceled context and we did NOT get the - // token. Call "done" now because we don't return it to the caller. - t.unlockLocked(key, false /* returnToken */) + // slot. Call "done" now because we don't return it to the caller. 
+ t.unlockLocked(key, false /* returnSlot */) } return nil, waited, err } @@ -129,13 +131,13 @@ func (t *TxSerializer) Wait(ctx context.Context, key, table string) (done DoneFu } // lockLocked queues this transaction. It will unblock immediately if this -// transaction is the first in the queue or when it got the token (queue.lock). +// transaction is the first in the queue or when it acquired a slot. // The method has the suffix "Locked" to clarify that "t.mu" must be locked. func (t *TxSerializer) lockLocked(ctx context.Context, key, table string) (bool, error) { q, ok := t.queues[key] if !ok { // First transaction in the queue i.e. we don't wait and return immediately. - t.queues[key] = newQueue(t.maxQueueSize) + t.queues[key] = newQueueForFirstTransaction(t.concurrentTransactions) t.globalSize++ return false, nil } @@ -162,6 +164,19 @@ func (t *TxSerializer) lockLocked(ctx context.Context, key, table string) (bool, } } + if q.availableSlots == nil { + // Hot row detected: A second, concurrent transaction is seen for the + // first time. + + // As an optimization, we deferred the creation of the channel until now. + q.availableSlots = make(chan struct{}, t.concurrentTransactions) + q.availableSlots <- struct{}{} + + // Include first transaction in the count at /debug/hotrows. (It was not + // recorded on purpose because it did not wait.) + t.Record(key) + } + t.globalSize++ q.size++ q.count++ @@ -170,11 +185,6 @@ func (t *TxSerializer) lockLocked(ctx context.Context, key, table string) (bool, } // Publish the number of waits at /debug/hotrows. t.Record(key) - if q.size == 2 { - // Include first transaction in the count. (It was not recorded on purpose - // because it did not wait.) 
- t.Record(key) - } if t.dryRun { waitsDryRun.Add(table, 1) @@ -183,13 +193,22 @@ func (t *TxSerializer) lockLocked(ctx context.Context, key, table string) (bool, } // Unlock before the wait and relock before returning because our caller - // Wait() hold the lock and assumes it still has it. + // Wait() holds the lock and assumes it still has it. t.mu.Unlock() defer t.mu.Lock() + // Non-blocking write attempt to get a slot. + select { + case q.availableSlots <- struct{}{}: + // Return waited=false because a slot was immediately available. + return false, nil + default: + } + + // Blocking wait for the next available slot. waits.Add(table, 1) select { - case <-q.lock: + case q.availableSlots <- struct{}{}: return true, nil case <-ctx.Done(): return true, ctx.Err() @@ -203,11 +222,13 @@ func (t *TxSerializer) unlock(key string) { t.unlockLocked(key, true) } -func (t *TxSerializer) unlockLocked(key string, returnToken bool) { +func (t *TxSerializer) unlockLocked(key string, returnSlot bool) { q := t.queues[key] q.size-- t.globalSize-- + if q.size == 0 { + // This is the last transaction in flight. delete(t.queues, key) if q.max > 1 { @@ -217,16 +238,33 @@ func (t *TxSerializer) unlockLocked(key string, returnToken bool) { t.log.Infof("%v simultaneous transactions (%v in total) for the same row range (%v) were queued.", q.max, q.count, key) } } + + // Return early because the queue "q" for this "key" will not be used any + // more. + // We intentionally skip returning the last slot and closing the + // "availableSlots" channel because it is not required by Go. + return } - // Return token to queue. Wakes up the next queued transaction. - if !t.dryRun && returnToken { - q.lock <- struct{}{} + // Give up slot by removing ourselves from the channel. + // Wakes up the next queued transaction. + + if t.dryRun { + // Dry-run did not acquire a slot in the first place. + return + } + + if !returnSlot { + // We did not acquire a slot in the first place e.g. 
due to a canceled context. + return } + + // This should never block. + <-q.availableSlots } -// Pending returns the number of queued transactions (including the one which -// is currently in flight.) +// Pending returns the number of queued transactions (including the ones which +// are currently in flight.) func (t *TxSerializer) Pending(key string) int { t.mu.Lock() defer t.mu.Unlock() @@ -238,17 +276,18 @@ func (t *TxSerializer) Pending(key string) int { return q.size } -// queue reprents the local queue for a particular row (range). +// queue represents the local queue for a particular row (range). // // Note that we don't use a dedicated queue structure for all waiting // transactions. Instead, we leverage that Go routines waiting for a channel -// are woken up in the order they are queued up. The "lock" field is said -// channel which has exactly one element, a token. All queued transactions are -// competing for this token. +// are woken up in the order they are queued up. The "availableSlots" field is +// said channel which has n free slots (for the number of concurrent +// transactions which can access the tx pool). All queued transactions are +// competing for these slots and try to add themselves to the channel. type queue struct { // NOTE: The following fields are guarded by TxSerializer.mu. - // size counts how many transactions are queued (includes the one - // transaction which is not waiting.) + // size counts how many transactions are currently queued/in flight (includes + // the transactions which are not waiting.) size int // count is the same as "size", but never gets decremented. count int @@ -256,14 +295,20 @@ type queue struct { // were simultaneously queued for the same row range. max int - lock chan struct{} + // availableSlots limits the number of concurrent transactions *per* + // hot row (range). It holds one element for each allowed pending + // transaction i.e. consumed tx pool slot. 
Consequently, if the channel + is full, subsequent transactions have to wait until they can place + their entry here. + // NOTE: As an optimization, we defer the creation of the channel until + // a second transaction for the same hot row is running. + availableSlots chan struct{} } -func newQueue(max int) *queue { +func newQueueForFirstTransaction(concurrentTransactions int) *queue { return &queue{ size: 1, count: 1, max: 1, - lock: make(chan struct{}, 1), } } diff --git a/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go b/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go index fe281e3fdeb..7e719141c30 100644 --- a/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go +++ b/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go @@ -40,9 +40,32 @@ func resetVariables() { globalQueueExceededDryRun.Set(0) } +func TestTxSerializer_NoHotRow(t *testing.T) { + resetVariables() + txs := New(false, 1, 1, 5) + + done, waited, err := txs.Wait(context.Background(), "t1 where1", "t1") + if err != nil { + t.Fatal(err) + } + if waited { + t.Fatal("non-parallel tx must never wait") + } + done() + + // No hot row was recorded. + if err := testHTTPHandler(txs, 0); err != nil { + t.Fatal(err) + } + // No transaction had to wait. + if got, want := waits.Counts()["t1"], int64(0); got != want { + t.Fatalf("wrong Waits variable: got = %v, want = %v", got, want) + } +} + func TestTxSerializer(t *testing.T) { resetVariables() - txs := New(false, 2, 3) + txs := New(false, 2, 3, 1) // tx1. done1, waited1, err1 := txs.Wait(context.Background(), "t1 where1", "t1") if err1 != nil { t.Fatal(err1) } if waited1 { t.Fatalf("tx1 must never wait: %v", waited1) } @@ -108,6 +131,75 @@ func TestTxSerializer(t *testing.T) { } } +func TestTxSerializer_ConcurrentTransactions(t *testing.T) { + resetVariables() + // Allow up to 2 concurrent transactions per hot row. + txs := New(false, 3, 3, 2) + + // tx1. 
+ done1, waited1, err1 := txs.Wait(context.Background(), "t1 where1", "t1") + if err1 != nil { + t.Fatal(err1) + } + if waited1 { + t.Fatalf("tx1 must never wait: %v", waited1) + } + + // tx2. + done2, waited2, err2 := txs.Wait(context.Background(), "t1 where1", "t1") + if err2 != nil { + t.Fatal(err2) + } + if waited2 { + t.Fatalf("tx2 must not wait: %v", waited2) + } + + // tx3 (gets queued and must wait). + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + + done3, waited3, err3 := txs.Wait(context.Background(), "t1 where1", "t1") + if err3 != nil { + t.Fatal(err3) + } + if !waited3 { + t.Fatalf("tx3 must wait: %v", waited3) + } + if got, want := waits.Counts()["t1"], int64(1); got != want { + t.Fatalf("variable not incremented: got = %v, want = %v", got, want) + } + + done3() + }() + + // Wait until tx3 is waiting before we finish tx2 and unblock tx3. + if err := waitForPending(txs, "t1 where1", 3); err != nil { + t.Fatal(err) + } + // Finish tx2 before tx1 to test that the "finish-order" does not matter. + // Unblocks tx3. + done2() + // Wait for tx3 to finish. + wg.Wait() + // Finish tx1 to delete the queue object. + done1() + + if txs.queues["t1 where1"] != nil { + t.Fatal("queue object was not deleted after last transaction") + } + + // 3 transactions were recorded. + if err := testHTTPHandler(txs, 3); err != nil { + t.Fatal(err) + } + // 1 of them had to wait. 
+ if got, want := waits.Counts()["t1"], int64(1); got != want { + t.Fatalf("variable not incremented: got = %v, want = %v", got, want) + } +} + func waitForPending(txs *TxSerializer, key string, i int) error { start := time.Now() for { @@ -137,6 +229,10 @@ func testHTTPHandler(txs *TxSerializer, count int) error { want := fmt.Sprintf(`Length: 1 %d: t1 where1 `, count) + if count == 0 { + want = `Length: 0 +` + } if got := rr.Body.String(); got != want { return fmt.Errorf("wrong content: got = \n%v\n want = \n%v", got, want) } @@ -144,13 +240,14 @@ func testHTTPHandler(txs *TxSerializer, count int) error { return nil } -// TestTxSerializerCancel runs 3 pending transactions. tx2 will get canceled -// and tx3 will be unblocked once tx1 is done. +// TestTxSerializerCancel runs 4 pending transactions. +// tx1 and tx2 are allowed to run concurrently while tx3 and tx4 are queued. +// tx3 will get canceled and tx4 will be unblocked once tx1 is done. func TestTxSerializerCancel(t *testing.T) { resetVariables() - txs := New(false, 3, 3) + txs := New(false, 4, 4, 2) - // tx2 and tx3 will record their number once they're done waiting. + // tx3 and tx4 will record their number once they're done waiting. txDone := make(chan int) // tx1. @@ -161,68 +258,77 @@ func TestTxSerializerCancel(t *testing.T) { if waited1 { t.Fatalf("tx1 must never wait: %v", waited1) } + // tx2. + done2, waited2, err2 := txs.Wait(context.Background(), "t1 where1", "t1") + if err2 != nil { + t.Fatal(err2) + } + if waited2 { + t.Fatalf("tx2 must not wait: %v", waited2) + } - // tx2 (gets queued and must wait). - ctx2, cancel2 := context.WithCancel(context.Background()) + // tx3 (gets queued and must wait). 
+ ctx3, cancel3 := context.WithCancel(context.Background()) wg := sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() - _, _, err2 := txs.Wait(ctx2, "t1 where1", "t1") - if err2 != context.Canceled { - t.Fatal(err2) + _, _, err3 := txs.Wait(ctx3, "t1 where1", "t1") + if err3 != context.Canceled { + t.Fatal(err3) } - txDone <- 2 + txDone <- 3 }() - // Wait until tx2 is waiting before we try tx3. - if err := waitForPending(txs, "t1 where1", 2); err != nil { + // Wait until tx3 is waiting before we try tx4. + if err := waitForPending(txs, "t1 where1", 3); err != nil { t.Fatal(err) } - // tx3 (gets queued and must wait as well). + // tx4 (gets queued and must wait as well). wg.Add(1) go func() { defer wg.Done() - done3, waited3, err3 := txs.Wait(context.Background(), "t1 where1", "t1") - if err3 != nil { - t.Fatal(err3) + done4, waited4, err4 := txs.Wait(context.Background(), "t1 where1", "t1") + if err4 != nil { + t.Fatal(err4) } - if !waited3 { - t.Fatalf("tx3 must have waited: %v", waited3) + if !waited4 { + t.Fatalf("tx4 must have waited: %v", waited4) } - txDone <- 3 + txDone <- 4 - done3() + done4() }() - // Wait until tx3 is waiting before we start to cancel tx2. - if err := waitForPending(txs, "t1 where1", 3); err != nil { + // Wait until tx4 is waiting before we start to cancel tx3. + if err := waitForPending(txs, "t1 where1", 4); err != nil { t.Fatal(err) } - // Cancel tx2. - cancel2() - if got := <-txDone; got != 2 { - t.Fatalf("tx2 should have been unblocked after the cancel: %v", got) + // Cancel tx3. + cancel3() + if got := <-txDone; got != 3 { + t.Fatalf("tx3 should have been unblocked after the cancel: %v", got) } // Finish tx1. done1() - // Wait for tx3. - if got := <-txDone; got != 3 { + // Wait for tx4. + if got := <-txDone; got != 4 { t.Fatalf("wrong tx was unblocked after tx1: %v", got) } - wg.Wait() + // Finish tx2 (the last transaction) which will delete the queue object. 
+ done2() if txs.queues["t1 where1"] != nil { t.Fatal("queue object was not deleted after last transaction") } - // 3 total transactions get recorded. - if err := testHTTPHandler(txs, 3); err != nil { + // 4 total transactions get recorded. + if err := testHTTPHandler(txs, 4); err != nil { t.Fatal(err) } // 2 of them had to wait. @@ -235,7 +341,7 @@ func TestTxSerializerCancel(t *testing.T) { // the two concurrent transactions for the same key. func TestTxSerializerDryRun(t *testing.T) { resetVariables() - txs := New(true, 1, 2) + txs := New(true, 1, 2, 1) // tx1. done1, waited1, err1 := txs.Wait(context.Background(), "t1 where1", "t1") @@ -300,7 +406,7 @@ func TestTxSerializerDryRun(t *testing.T) { // reject transactions although they may succeed within the txpool constraints // and RPC deadline. func TestTxSerializerGlobalQueueOverflow(t *testing.T) { - txs := New(false, 1, 1 /* maxGlobalQueueSize */) + txs := New(false, 1, 1 /* maxGlobalQueueSize */, 1) // tx1. done1, waited1, err1 := txs.Wait(context.Background(), "t1 where1", "t1") @@ -337,8 +443,25 @@ func TestTxSerializerGlobalQueueOverflow(t *testing.T) { } func TestTxSerializerPending(t *testing.T) { - txs := New(false, 1, 1) + txs := New(false, 1, 1, 1) if got, want := txs.Pending("t1 where1"), 0; got != want { t.Fatalf("there should be no pending transaction: got = %v, want = %v", got, want) } } + +func BenchmarkTxSerializer_NoHotRow(b *testing.B) { + txs := New(false, 1, 1, 5) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + done, waited, err := txs.Wait(context.Background(), "t1 where1", "t1") + if err != nil { + b.Fatal(err) + } + if waited { + b.Fatal("non-parallel tx must never wait") + } + done() + } +}