Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion go/vt/vttablet/tabletserver/query_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,9 @@ func NewQueryEngine(checker connpool.MySQLChecker, se *schema.Engine, config tab

qe.consolidator = sync2.NewConsolidator()
qe.txSerializer = txserializer.New(config.EnableHotRowProtectionDryRun,
config.HotRowProtectionMaxQueueSize, config.HotRowProtectionMaxGlobalQueueSize)
config.HotRowProtectionMaxQueueSize,
config.HotRowProtectionMaxGlobalQueueSize,
config.HotRowProtectionConcurrentTransactions)
qe.streamQList = NewQueryList()

qe.autoCommit.Set(config.EnableAutoCommit)
Expand Down
16 changes: 12 additions & 4 deletions go/vt/vttablet/tabletserver/tabletenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func init() {
flag.BoolVar(&Config.EnableHotRowProtectionDryRun, "enable_hot_row_protection_dry_run", DefaultQsConfig.EnableHotRowProtectionDryRun, "If true, hot row protection is not enforced but logs if transactions would have been queued.")
flag.IntVar(&Config.HotRowProtectionMaxQueueSize, "hot_row_protection_max_queue_size", DefaultQsConfig.HotRowProtectionMaxQueueSize, "Maximum number of BeginExecute RPCs which will be queued for the same row (range).")
flag.IntVar(&Config.HotRowProtectionMaxGlobalQueueSize, "hot_row_protection_max_global_queue_size", DefaultQsConfig.HotRowProtectionMaxGlobalQueueSize, "Global queue limit across all row (ranges). Useful to prevent that the queue can grow unbounded.")
flag.IntVar(&Config.HotRowProtectionConcurrentTransactions, "hot_row_protection_concurrent_transactions", DefaultQsConfig.HotRowProtectionConcurrentTransactions, "Number of concurrent transactions let through to the txpool/MySQL for the same hot row. Should be > 1 to have enough 'ready' transactions in MySQL and benefit from a pipelining effect.")

flag.BoolVar(&Config.HeartbeatEnable, "heartbeat_enable", DefaultQsConfig.HeartbeatEnable, "If true, vttablet records (if master) or checks (if replica) the current time of a replication heartbeat in the table _vt.heartbeat. The result is used to inform the serving state of the vttablet via healthchecks.")
flag.DurationVar(&Config.HeartbeatInterval, "heartbeat_interval", DefaultQsConfig.HeartbeatInterval, "How frequently to read and write replication heartbeat.")
Expand Down Expand Up @@ -125,10 +126,11 @@ type TabletConfig struct {
TxThrottlerConfig string
TxThrottlerHealthCheckCells []string

EnableHotRowProtection bool
EnableHotRowProtectionDryRun bool
HotRowProtectionMaxQueueSize int
HotRowProtectionMaxGlobalQueueSize int
EnableHotRowProtection bool
EnableHotRowProtectionDryRun bool
HotRowProtectionMaxQueueSize int
HotRowProtectionMaxGlobalQueueSize int
HotRowProtectionConcurrentTransactions int

HeartbeatEnable bool
HeartbeatInterval time.Duration
Expand Down Expand Up @@ -180,6 +182,9 @@ var DefaultQsConfig = TabletConfig{
// Default value is the same as TransactionCap.
HotRowProtectionMaxQueueSize: 20,
HotRowProtectionMaxGlobalQueueSize: 1000,
// Allow more than 1 transaction for the same hot row through to have enough
// of them ready in MySQL and profit from a pipelining effect.
HotRowProtectionConcurrentTransactions: 5,

HeartbeatEnable: false,
HeartbeatInterval: 1 * time.Second,
Expand Down Expand Up @@ -218,6 +223,9 @@ func VerifyConfig() error {
if globalSize, size := Config.HotRowProtectionMaxGlobalQueueSize, Config.HotRowProtectionMaxQueueSize; globalSize < size {
return fmt.Errorf("global queue size must be >= per row (range) queue size: -hot_row_protection_max_global_queue_size < hot_row_protection_max_queue_size (%v < %v)", globalSize, size)
}
if v := Config.HotRowProtectionConcurrentTransactions; v <= 0 {
return fmt.Errorf("-hot_row_protection_concurrent_transactions must be > 0 (specified value: %v)", v)
}
return nil
}

Expand Down
138 changes: 134 additions & 4 deletions go/vt/vttablet/tabletserver/tabletserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1475,6 +1475,7 @@ func TestSerializeTransactionsSameRow(t *testing.T) {
testUtils := newTestUtils()
config := testUtils.newQueryServiceConfig()
config.EnableHotRowProtection = true
config.HotRowProtectionConcurrentTransactions = 1
// Reduce the txpool to 2 because we should never consume more than two slots.
config.TransactionCap = 2
tsv := NewTabletServerWithNilTopoServer(config)
Expand Down Expand Up @@ -1512,7 +1513,7 @@ func TestSerializeTransactionsSameRow(t *testing.T) {
db.SetBeforeFunc("update test_table set name_string = 'tx1' where pk in (1) /* _stream test_table (pk ) (1 ); */",
func() {
close(tx1Started)
if err := waitForTxSerializationCount(tsv, "test_table where pk = 1 and name = 1", 2); err != nil {
if err := waitForTxSerializationPendingQueries(tsv, "test_table where pk = 1 and name = 1", 2); err != nil {
t.Fatal(err)
}
})
Expand Down Expand Up @@ -1580,7 +1581,134 @@ func TestSerializeTransactionsSameRow(t *testing.T) {
}
}

func waitForTxSerializationCount(tsv *TabletServer, key string, i int) error {
func TestSerializeTransactionsSameRow_ConcurrentTransactions(t *testing.T) {
// This test runs three transaction in parallel:
// tx1 | tx2 | tx3
// Out of these three, two can run in parallel because we increased the
// ConcurrentTransactions limit to 2.
// One out of the three transaction will always get serialized though.
db := setUpTabletServerTest(t)
defer db.Close()
testUtils := newTestUtils()
config := testUtils.newQueryServiceConfig()
config.EnableHotRowProtection = true
config.HotRowProtectionConcurrentTransactions = 2
// Reduce the txpool to 2 because we should never consume more than two slots.
config.TransactionCap = 2
tsv := NewTabletServerWithNilTopoServer(config)
dbconfigs := testUtils.newDBConfigs(db)
target := querypb.Target{TabletType: topodatapb.TabletType_MASTER}
if err := tsv.StartService(target, dbconfigs, testUtils.newMysqld(&dbconfigs)); err != nil {
t.Fatalf("StartService failed: %v", err)
}
defer tsv.StopService()
countStart := tabletenv.WaitStats.Counts()["TxSerializer"]

// Fake data.
q1 := "update test_table set name_string = 'tx1' where pk = :pk and name = :name"
q2 := "update test_table set name_string = 'tx2' where pk = :pk and name = :name"
q3 := "update test_table set name_string = 'tx3' where pk = :pk and name = :name"
// Every request needs their own bind variables to avoid data races.
bvTx1 := map[string]*querypb.BindVariable{
"pk": sqltypes.Int64BindVariable(1),
"name": sqltypes.Int64BindVariable(1),
}
bvTx2 := map[string]*querypb.BindVariable{
"pk": sqltypes.Int64BindVariable(1),
"name": sqltypes.Int64BindVariable(1),
}
bvTx3 := map[string]*querypb.BindVariable{
"pk": sqltypes.Int64BindVariable(1),
"name": sqltypes.Int64BindVariable(1),
}

tx1Started := make(chan struct{})
allQueriesPending := make(chan struct{})
db.SetBeforeFunc("update test_table set name_string = 'tx1' where pk in (1) /* _stream test_table (pk ) (1 ); */",
func() {
close(tx1Started)
<-allQueriesPending
})

// Run all three transactions.
ctx := context.Background()
wg := sync.WaitGroup{}

// tx1.
wg.Add(1)
go func() {
defer wg.Done()

_, tx1, err := tsv.BeginExecute(ctx, &target, q1, bvTx1, nil)
if err != nil {
t.Fatalf("failed to execute query: %s: %s", q1, err)
}

if err := tsv.Commit(ctx, &target, tx1); err != nil {
t.Fatalf("call TabletServer.Commit failed: %v", err)
}
}()

// tx2.
wg.Add(1)
go func() {
defer wg.Done()

// Wait for tx1 to avoid that this tx could pass tx1, without any contention.
// In that case, we would see less than 3 pending transactions.
<-tx1Started

_, tx2, err := tsv.BeginExecute(ctx, &target, q2, bvTx2, nil)
if err != nil {
t.Fatalf("failed to execute query: %s: %s", q2, err)
}

if err := tsv.Commit(ctx, &target, tx2); err != nil {
t.Fatalf("call TabletServer.Commit failed: %v", err)
}
}()

// tx3.
wg.Add(1)
go func() {
defer wg.Done()

// Wait for tx1 to avoid that this tx could pass tx1, without any contention.
// In that case, we would see less than 3 pending transactions.
<-tx1Started

_, tx3, err := tsv.BeginExecute(ctx, &target, q3, bvTx3, nil)
if err != nil {
t.Fatalf("failed to execute query: %s: %s", q3, err)
}

if err := tsv.Commit(ctx, &target, tx3); err != nil {
t.Fatalf("call TabletServer.Commit failed: %v", err)
}
}()

// At this point, all three transactions should be blocked in BeginExecute()
// and therefore count as pending transaction by the Hot Row Protection.
//
// NOTE: We are not doing more sophisticated synchronizations between the
// transactions via db.SetBeforeFunc() for the same reason as mentioned
// in TestSerializeTransactionsSameRow: The MySQL C client does not seem
// to allow more than connection attempt at a time.
if err := waitForTxSerializationPendingQueries(tsv, "test_table where pk = 1 and name = 1", 3); err != nil {
t.Fatal(err)
}
close(allQueriesPending)

wg.Wait()

got, ok := tabletenv.WaitStats.Counts()["TxSerializer"]
want := countStart + 1
if !ok || got != want {
t.Fatalf("One out of the three transactions must have waited: ok? %v got: %v want: %v", ok, got, want)
}
}

func waitForTxSerializationPendingQueries(tsv *TabletServer, key string, i int) error {
start := time.Now()
for {
got, want := tsv.qe.txSerializer.Pending(key), i
Expand All @@ -1607,6 +1735,7 @@ func TestSerializeTransactionsSameRow_TooManyPendingRequests(t *testing.T) {
config := testUtils.newQueryServiceConfig()
config.EnableHotRowProtection = true
config.HotRowProtectionMaxQueueSize = 1
config.HotRowProtectionConcurrentTransactions = 1
tsv := NewTabletServerWithNilTopoServer(config)
dbconfigs := testUtils.newDBConfigs(db)
target := querypb.Target{TabletType: topodatapb.TabletType_MASTER}
Expand Down Expand Up @@ -1693,6 +1822,7 @@ func TestSerializeTransactionsSameRow_RequestCanceled(t *testing.T) {
testUtils := newTestUtils()
config := testUtils.newQueryServiceConfig()
config.EnableHotRowProtection = true
config.HotRowProtectionConcurrentTransactions = 1
tsv := NewTabletServerWithNilTopoServer(config)
dbconfigs := testUtils.newDBConfigs(db)
target := querypb.Target{TabletType: topodatapb.TabletType_MASTER}
Expand Down Expand Up @@ -1774,7 +1904,7 @@ func TestSerializeTransactionsSameRow_RequestCanceled(t *testing.T) {
defer wg.Done()

// Wait until tx1 and tx2 are pending to make the test deterministic.
if err := waitForTxSerializationCount(tsv, "test_table where pk = 1 and name = 1", 2); err != nil {
if err := waitForTxSerializationPendingQueries(tsv, "test_table where pk = 1 and name = 1", 2); err != nil {
t.Fatal(err)
}

Expand All @@ -1789,7 +1919,7 @@ func TestSerializeTransactionsSameRow_RequestCanceled(t *testing.T) {
}()

// Wait until tx1, 2 and 3 are pending.
if err := waitForTxSerializationCount(tsv, "test_table where pk = 1 and name = 1", 3); err != nil {
if err := waitForTxSerializationPendingQueries(tsv, "test_table where pk = 1 and name = 1", 3); err != nil {
t.Fatal(err)
}
// Now unblock tx2 and cancel it.
Expand Down
Loading