diff --git a/config/tablet/default.yaml b/config/tablet/default.yaml index ad88e320871..3321e3f17ff 100644 --- a/config/tablet/default.yaml +++ b/config/tablet/default.yaml @@ -117,6 +117,7 @@ cacheResultFields: true # enable-query-plan-field-caching # enable-tx-throttler # tx-throttler-config # tx-throttler-healthcheck-cells +# tx-throttler-tablet-types # enable_transaction_limit # enable_transaction_limit_dry_run # transaction_limit_per_user diff --git a/doc/ReplicationLagBasedThrottlingOfTransactions.md b/doc/ReplicationLagBasedThrottlingOfTransactions.md index ad1d98b151f..68686d4f72f 100644 --- a/doc/ReplicationLagBasedThrottlingOfTransactions.md +++ b/doc/ReplicationLagBasedThrottlingOfTransactions.md @@ -30,7 +30,13 @@ If this is not specified a [default](https://github.com/vitessio/vitess/tree/mai * *tx-throttler-healthcheck-cells* A comma separated list of datacenter cells. The throttler will only monitor -the non-RDONLY replicas found in these cells for replication lag. +the replicas found in these cells for replication lag. + +* *tx-throttler-tablet-types* + +A comma separated list of tablet types. The throttler will only monitor tablets +with these types. Only `replica` and/or `rdonly` types are supported. The default +is `replica`. # Caveats and Known Issues * The throttler keeps trying to explore the maximum rate possible while keeping @@ -39,4 +45,3 @@ lag limit may occasionally be slightly violated. * Transactions are considered homogeneous. There is currently no support for specifying how `expensive` a transaction is. 
- diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index 0dc87f7d96c..085dafde315 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -93,6 +93,7 @@ Usage of vttablet: --enable-consolidator Synonym to -enable_consolidator (default true) --enable-consolidator-replicas Synonym to -enable_consolidator_replicas --enable-lag-throttler Synonym to -enable_lag_throttler + --enable-per-workload-table-metrics If true, query counts and query error metrics include a label that identifies the workload --enable-tx-throttler Synonym to -enable_tx_throttler --enable_consolidator This option enables the query consolidator. (default true) --enable_consolidator_replicas This option enables the query consolidator only on replicas. @@ -341,9 +342,13 @@ Usage of vttablet: --twopc_abandon_age float time in seconds. Any unresolved transaction older than this time will be sent to the coordinator to be resolved. --twopc_coordinator_address string address of the (VTGate) process(es) that will be used to notify of abandoned transactions. --twopc_enable if the flag is on, 2pc is enabled. Other 2pc flags must be supplied. 
- --tx-throttler-config string Synonym to -tx_throttler_config (default "target_replication_lag_sec: 2\nmax_replication_lag_sec: 10\ninitial_rate: 100\nmax_increase: 1\nemergency_decrease: 0.5\nmin_duration_between_increases_sec: 40\nmax_duration_between_increases_sec: 62\nmin_duration_between_decreases_sec: 20\nspread_backlog_across_sec: 20\nage_bad_rate_after_sec: 180\nbad_rate_increase: 0.1\nmax_rate_approach_threshold: 0.9\n") + --tx-throttler-config string Synonym to -tx_throttler_config (default "target_replication_lag_sec:2 max_replication_lag_sec:10 initial_rate:100 max_increase:1 emergency_decrease:0.5 min_duration_between_increases_sec:40 max_duration_between_increases_sec:62 min_duration_between_decreases_sec:20 spread_backlog_across_sec:20 age_bad_rate_after_sec:180 bad_rate_increase:0.1 max_rate_approach_threshold:0.9") + --tx-throttler-default-priority int Default priority assigned to queries that lack priority information (default 100) + --tx-throttler-dry-run If present, the transaction throttler only records metrics about requests received and throttled, but does not actually throttle any requests. --tx-throttler-healthcheck-cells strings Synonym to -tx_throttler_healthcheck_cells - --tx_throttler_config string The configuration of the transaction throttler as a text formatted throttlerdata.Configuration protocol buffer message (default "target_replication_lag_sec: 2\nmax_replication_lag_sec: 10\ninitial_rate: 100\nmax_increase: 1\nemergency_decrease: 0.5\nmin_duration_between_increases_sec: 40\nmax_duration_between_increases_sec: 62\nmin_duration_between_decreases_sec: 20\nspread_backlog_across_sec: 20\nage_bad_rate_after_sec: 180\nbad_rate_increase: 0.1\nmax_rate_approach_threshold: 0.9\n") + --tx-throttler-tablet-types strings A comma-separated list of tablet types. Only tablets of this type are monitored for replication lag by the transaction throttler. Supported types are replica and/or rdonly. 
(default replica) + --tx-throttler-topo-refresh-interval duration The rate that the transaction throttler will refresh the topology to find cells. (default 5m0s) + --tx_throttler_config string The configuration of the transaction throttler as a text-formatted throttlerdata.Configuration protocol buffer message. (default "target_replication_lag_sec:2 max_replication_lag_sec:10 initial_rate:100 max_increase:1 emergency_decrease:0.5 min_duration_between_increases_sec:40 max_duration_between_increases_sec:62 min_duration_between_decreases_sec:20 spread_backlog_across_sec:20 age_bad_rate_after_sec:180 bad_rate_increase:0.1 max_rate_approach_threshold:0.9") --tx_throttler_healthcheck_cells strings A comma-separated list of cells. Only tabletservers running in these cells will be monitored for replication lag by the transaction throttler. --unhealthy_threshold duration replication lag after which a replica is considered unhealthy (default 2h0m0s) --use_super_read_only Set super_read_only flag when performing planned failover. diff --git a/go/flagutil/flagutil.go b/go/flagutil/flagutil.go index d010ea0bc4f..c9c8973ace7 100644 --- a/go/flagutil/flagutil.go +++ b/go/flagutil/flagutil.go @@ -193,6 +193,17 @@ func DualFormatBoolVar(fs *pflag.FlagSet, p *bool, name string, value bool, usag } } +// DualFormatVar creates a flag which supports both dashes and underscores +func DualFormatVar(fs *pflag.FlagSet, val pflag.Value, name string, usage string) { + dashes := strings.Replace(name, "_", "-", -1) + underscores := strings.Replace(name, "-", "_", -1) + + fs.Var(val, underscores, usage) + if dashes != underscores { + fs.Var(val, dashes, fmt.Sprintf("Synonym to -%s", underscores)) + } +} + // DurationOrIntVar implements pflag.Value for flags that have historically been // of type IntVar (and then converted to seconds or some other unit) but are // now transitioning to a proper DurationVar type. 
diff --git a/go/vt/proto/query/query.pb.go b/go/vt/proto/query/query.pb.go index b6da31e8183..8b4bd0a49ef 100644 --- a/go/vt/proto/query/query.pb.go +++ b/go/vt/proto/query/query.pb.go @@ -1178,6 +1178,12 @@ type ExecuteOptions struct { // if the user has created temp tables, Vitess will not reuse plans created for this session in other sessions. // The current session can still use other sessions cached plans. HasCreatedTempTables bool `protobuf:"varint,12,opt,name=has_created_temp_tables,json=hasCreatedTempTables,proto3" json:"has_created_temp_tables,omitempty"` + // WorkloadName specifies the name of the workload as indicated in query directives. This is used for instrumentation + // in metrics and tracing spans. + WorkloadName string `protobuf:"bytes,15,opt,name=WorkloadName,proto3" json:"WorkloadName,omitempty"` + // priority specifies the priority of the query, between 0 and 100. This is leveraged by the transaction + // throttler to determine whether, under resource contention, a query should or should not be throttled. 
+ Priority string `protobuf:"bytes,16,opt,name=priority,proto3" json:"priority,omitempty"` } func (x *ExecuteOptions) Reset() { @@ -1268,6 +1274,20 @@ func (x *ExecuteOptions) GetHasCreatedTempTables() bool { return false } +func (x *ExecuteOptions) GetWorkloadName() string { + if x != nil { + return x.WorkloadName + } + return "" +} + +func (x *ExecuteOptions) GetPriority() string { + if x != nil { + return x.Priority + } + return "" +} + // Field describes a single column returned by a query type Field struct { state protoimpl.MessageState @@ -5368,7 +5388,7 @@ var file_query_proto_rawDesc = []byte{ 0x12, 0x29, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x42, 0x69, 0x6e, 0x64, 0x56, 0x61, 0x72, 0x69, 0x61, 0x62, 0x6c, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, - 0xc5, 0x07, 0x0a, 0x0e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, + 0x85, 0x08, 0x0a, 0x0e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x4d, 0x0a, 0x0f, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x64, 0x5f, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x24, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, @@ -5400,7 +5420,11 @@ var file_query_proto_rawDesc = []byte{ 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x35, 0x0a, 0x17, 0x68, 0x61, 0x73, 0x5f, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x74, 0x65, 0x6d, 0x70, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x08, 0x52, 0x14, 0x68, 0x61, 0x73, 0x43, 0x72, 0x65, 0x61, - 0x74, 0x65, 0x64, 0x54, 0x65, 0x6d, 0x70, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x22, 0x3b, 0x0a, + 0x74, 0x65, 0x64, 0x54, 0x65, 0x6d, 0x70, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x12, 0x22, 0x0a, + 0x0c, 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x4e, 0x61, 
0x6d, 0x65, 0x18, 0x0f, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x0c, 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x4e, 0x61, 0x6d, + 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x10, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x22, 0x3b, 0x0a, 0x0e, 0x49, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x64, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x12, 0x11, 0x0a, 0x0d, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x41, 0x4e, 0x44, 0x5f, 0x4e, 0x41, 0x4d, 0x45, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x4f, 0x4e, 0x4c, 0x59, 0x10, diff --git a/go/vt/proto/query/query_vtproto.pb.go b/go/vt/proto/query/query_vtproto.pb.go index 7ed228cb100..6f205943dca 100644 --- a/go/vt/proto/query/query_vtproto.pb.go +++ b/go/vt/proto/query/query_vtproto.pb.go @@ -377,6 +377,22 @@ func (m *ExecuteOptions) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if len(m.Priority) > 0 { + i -= len(m.Priority) + copy(dAtA[i:], m.Priority) + i = encodeVarint(dAtA, i, uint64(len(m.Priority))) + i-- + dAtA[i] = 0x1 + i-- + dAtA[i] = 0x82 + } + if len(m.WorkloadName) > 0 { + i -= len(m.WorkloadName) + copy(dAtA[i:], m.WorkloadName) + i = encodeVarint(dAtA, i, uint64(len(m.WorkloadName))) + i-- + dAtA[i] = 0x7a + } if m.HasCreatedTempTables { i-- if m.HasCreatedTempTables { @@ -4275,6 +4291,14 @@ func (m *ExecuteOptions) SizeVT() (n int) { if m.HasCreatedTempTables { n += 2 } + l = len(m.WorkloadName) + if l > 0 { + n += 1 + l + sov(uint64(l)) + } + l = len(m.Priority) + if l > 0 { + n += 2 + l + sov(uint64(l)) + } if m.unknownFields != nil { n += len(m.unknownFields) } @@ -6836,6 +6860,70 @@ func (m *ExecuteOptions) UnmarshalVT(dAtA []byte) error { } } m.HasCreatedTempTables = bool(v != 0) + case 15: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field WorkloadName", wireType) + } + var stringLen uint64 + for shift := uint(0); ; 
shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.WorkloadName = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 16: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Priority", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Priority = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skip(dAtA[iNdEx:]) diff --git a/go/vt/sqlparser/comments.go b/go/vt/sqlparser/comments.go index 0faebe09f4b..413e95d5c40 100644 --- a/go/vt/sqlparser/comments.go +++ b/go/vt/sqlparser/comments.go @@ -20,6 +20,9 @@ import ( "strconv" "strings" "unicode" + + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vterrors" ) const ( @@ -44,8 +47,19 @@ const ( DirectiveQueryPlanner = "PLANNER" // DirectiveVtexplainRunDMLQueries tells explain format = vtexplain that it is okay to also run the query. DirectiveVtexplainRunDMLQueries = "EXECUTE_DML_QUERIES" + // DirectiveWorkloadName specifies the name of the client application workload issuing the query. + DirectiveWorkloadName = "WORKLOAD_NAME" + // DirectivePriority specifies the priority of a workload. 
It should be an integer between 0 and MaxPriorityValue, + // where 0 is the highest priority, and MaxPriorityValue is the lowest one. + DirectivePriority = "PRIORITY" + + // MaxPriorityValue specifies the maximum value allowed for the priority query directive. Valid priority values are + // between zero and MaxPriorityValue. + MaxPriorityValue = 100 ) +var ErrInvalidPriority = vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Invalid priority value specified in query") + func isNonSpace(r rune) bool { return !unicode.IsSpace(r) } @@ -378,3 +392,42 @@ func AllowScatterDirective(stmt Statement) bool { } return comments != nil && comments.Directives().IsSet(DirectiveAllowScatter) } + +// GetPriorityFromStatement gets the priority from the provided Statement, using DirectivePriority +func GetPriorityFromStatement(statement Statement) (string, error) { + commentedStatement, ok := statement.(Commented) + // This would mean that the statement lacks comments, so we can't obtain the priority from it. Hence default to + // empty priority + if !ok { + return "", nil + } + + directives := commentedStatement.GetParsedComments().Directives() + priority, ok := directives.GetString(DirectivePriority, "") + if !ok || priority == "" { + return "", nil + } + + intPriority, err := strconv.Atoi(priority) + if err != nil || intPriority < 0 || intPriority > MaxPriorityValue { + return "", ErrInvalidPriority + } + + return priority, nil +} + +// GetWorkloadNameFromStatement gets the workload name from the provided Statement, using workloadLabel as the name of +// the query directive that specifies it. +func GetWorkloadNameFromStatement(statement Statement) string { + commentedStatement, ok := statement.(Commented) + // This would mean that the statement lacks comments, so we can't obtain the workload from it. 
Hence default to + // empty workload name + if !ok { + return "" + } + + directives := commentedStatement.GetParsedComments().Directives() + workloadName, _ := directives.GetString(DirectiveWorkloadName, "") + + return workloadName +} diff --git a/go/vt/sqlparser/comments_test.go b/go/vt/sqlparser/comments_test.go index 4906b9fbcd7..b839f78df66 100644 --- a/go/vt/sqlparser/comments_test.go +++ b/go/vt/sqlparser/comments_test.go @@ -468,3 +468,68 @@ func TestIgnoreMaxMaxMemoryRowsDirective(t *testing.T) { }) } } + +func TestGetPriorityFromStatement(t *testing.T) { + testCases := []struct { + query string + expectedPriority string + expectedError error + }{ + { + query: "select * from a_table", + expectedPriority: "", + expectedError: nil, + }, + { + query: "select /*vt+ ANOTHER_DIRECTIVE=324 */ * from another_table", + expectedPriority: "", + expectedError: nil, + }, + { + query: "select /*vt+ PRIORITY=33 */ * from another_table", + expectedPriority: "33", + expectedError: nil, + }, + { + query: "select /*vt+ PRIORITY=200 */ * from another_table", + expectedPriority: "", + expectedError: ErrInvalidPriority, + }, + { + query: "select /*vt+ PRIORITY=-1 */ * from another_table", + expectedPriority: "", + expectedError: ErrInvalidPriority, + }, + { + query: "select /*vt+ PRIORITY=some_text */ * from another_table", + expectedPriority: "", + expectedError: ErrInvalidPriority, + }, + { + query: "select /*vt+ PRIORITY=0 */ * from another_table", + expectedPriority: "0", + expectedError: nil, + }, + { + query: "select /*vt+ PRIORITY=100 */ * from another_table", + expectedPriority: "100", + expectedError: nil, + }, + } + + for _, testCase := range testCases { + theThestCase := testCase + t.Run(theThestCase.query, func(t *testing.T) { + t.Parallel() + stmt, err := Parse(theThestCase.query) + assert.NoError(t, err) + actualPriority, actualError := GetPriorityFromStatement(stmt) + if theThestCase.expectedError != nil { + assert.ErrorIs(t, actualError, 
theThestCase.expectedError) + } else { + assert.NoError(t, actualError) + assert.Equal(t, theThestCase.expectedPriority, actualPriority) + } + }) + } +} diff --git a/go/vt/throttler/throttler.go b/go/vt/throttler/throttler.go index 03a20013396..83a1c52225e 100644 --- a/go/vt/throttler/throttler.go +++ b/go/vt/throttler/throttler.go @@ -130,19 +130,31 @@ func NewThrottler(name, unit string, threadCount int, maxRate, maxReplicationLag return newThrottler(GlobalManager, name, unit, threadCount, maxRate, maxReplicationLag, time.Now) } +func NewThrottlerFromConfig(name, unit string, threadCount int, maxRateModuleMaxRate int64, maxReplicationLagModuleConfig MaxReplicationLagModuleConfig, nowFunc func() time.Time) (*Throttler, error) { + return newThrottlerFromConfig(GlobalManager, name, unit, threadCount, maxRateModuleMaxRate, maxReplicationLagModuleConfig, nowFunc) +} + func newThrottler(manager *managerImpl, name, unit string, threadCount int, maxRate, maxReplicationLag int64, nowFunc func() time.Time) (*Throttler, error) { - // Verify input parameters. - if maxRate < 0 { - return nil, fmt.Errorf("maxRate must be >= 0: %v", maxRate) + config := NewMaxReplicationLagModuleConfig(maxReplicationLag) + config.MaxReplicationLagSec = maxReplicationLag + + return newThrottlerFromConfig(manager, name, unit, threadCount, maxRate, config, nowFunc) + +} + +func newThrottlerFromConfig(manager *managerImpl, name, unit string, threadCount int, maxRateModuleMaxRate int64, maxReplicationLagModuleConfig MaxReplicationLagModuleConfig, nowFunc func() time.Time) (*Throttler, error) { + err := maxReplicationLagModuleConfig.Verify() + if err != nil { + return nil, fmt.Errorf("invalid max replication lag config: %w", err) } - if maxReplicationLag < 0 { - return nil, fmt.Errorf("maxReplicationLag must be >= 0: %v", maxReplicationLag) + if maxRateModuleMaxRate < 0 { + return nil, fmt.Errorf("maxRate must be >= 0: %v", maxRateModuleMaxRate) } // Enable the configured modules. 
- maxRateModule := NewMaxRateModule(maxRate) + maxRateModule := NewMaxRateModule(maxRateModuleMaxRate) actualRateHistory := newAggregatedIntervalHistory(1024, 1*time.Second, threadCount) - maxReplicationLagModule, err := NewMaxReplicationLagModule(NewMaxReplicationLagModuleConfig(maxReplicationLag), actualRateHistory, nowFunc) + maxReplicationLagModule, err := NewMaxReplicationLagModule(maxReplicationLagModuleConfig, actualRateHistory, nowFunc) if err != nil { return nil, err } diff --git a/go/vt/vtgate/engine/fake_vcursor_test.go b/go/vt/vtgate/engine/fake_vcursor_test.go index c452c44e210..90cb611bbea 100644 --- a/go/vt/vtgate/engine/fake_vcursor_test.go +++ b/go/vt/vtgate/engine/fake_vcursor_test.go @@ -248,10 +248,18 @@ func (t *noopVCursor) SetWorkload(querypb.ExecuteOptions_Workload) { panic("implement me") } +func (t *noopVCursor) SetWorkloadName(string) { + panic("implement me") +} + func (t *noopVCursor) SetPlannerVersion(querypb.ExecuteOptions_PlannerVersion) { panic("implement me") } +func (t *noopVCursor) SetPriority(string) { + panic("implement me") +} + func (t *noopVCursor) SetTarget(string) error { panic("implement me") } @@ -669,10 +677,18 @@ func (f *loggingVCursor) SetWorkload(querypb.ExecuteOptions_Workload) { panic("implement me") } +func (f *loggingVCursor) SetWorkloadName(string) { + panic("implement me") +} + func (f *loggingVCursor) SetPlannerVersion(querypb.ExecuteOptions_PlannerVersion) { panic("implement me") } +func (f *loggingVCursor) SetPriority(string) { + panic("implement me") +} + func (f *loggingVCursor) FindRoutedTable(tbl sqlparser.TableName) (*vindexes.Table, error) { f.log = append(f.log, fmt.Sprintf("FindTable(%s)", sqlparser.String(tbl))) return f.tableRoutes.tbl, nil diff --git a/go/vt/vtgate/engine/primitive.go b/go/vt/vtgate/engine/primitive.go index dd1aa219ed4..0dfd0e30ff0 100644 --- a/go/vt/vtgate/engine/primitive.go +++ b/go/vt/vtgate/engine/primitive.go @@ -144,6 +144,8 @@ type ( 
SetTransactionMode(vtgatepb.TransactionMode) SetWorkload(querypb.ExecuteOptions_Workload) SetPlannerVersion(querypb.ExecuteOptions_PlannerVersion) + SetWorkloadName(string) + SetPriority(string) SetFoundRows(uint64) SetDDLStrategy(string) diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 8214361da4c..7c0e4eceed2 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -985,6 +985,12 @@ func (e *Executor) getPlan(ctx context.Context, vcursor *vcursorImpl, sql string } ignoreMaxMemoryRows := sqlparser.IgnoreMaxMaxMemoryRowsDirective(stmt) vcursor.SetIgnoreMaxMemoryRows(ignoreMaxMemoryRows) + vcursor.SetWorkloadName(sqlparser.GetWorkloadNameFromStatement(stmt)) + priority, err := sqlparser.GetPriorityFromStatement(stmt) + if err != nil { + return nil, err + } + vcursor.SetPriority(priority) setVarComment, err := prepareSetVarComment(vcursor, stmt) if err != nil { diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index 6e80f3841aa..5ec116d8503 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -1826,6 +1826,48 @@ func TestGetPlanNormalized(t *testing.T) { assertCacheContains(t, r, want) } +func TestGetPlanPriority(t *testing.T) { + + testCases := []struct { + name string + sql string + expectedPriority string + expectedError error + }{ + {name: "Invalid priority", sql: "select /*vt+ PRIORITY=something */ * from music_user_map", expectedPriority: "", expectedError: sqlparser.ErrInvalidPriority}, + {name: "Valid priority", sql: "select /*vt+ PRIORITY=33 */ * from music_user_map", expectedPriority: "33", expectedError: nil}, + {name: "empty priority", sql: "select * from music_user_map", expectedPriority: "", expectedError: nil}, + } + + session := NewSafeSession(&vtgatepb.Session{TargetString: "@unknown", Options: &querypb.ExecuteOptions{}}) + + for _, aTestCase := range testCases { + testCase := aTestCase + + t.Run(testCase.name, func(t *testing.T) { + r, _, _, _ := 
createExecutorEnv() + r.normalize = true + logStats := logstats.NewLogStats(ctx, "Test", "", "", nil) + vCursor, err := newVCursorImpl(session, makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) + assert.NoError(t, err) + + stmt, err := sqlparser.Parse(testCase.sql) + assert.NoError(t, err) + crticalityFromStatement, _ := sqlparser.GetPriorityFromStatement(stmt) + + _, err = r.getPlan(context.Background(), vCursor, testCase.sql, makeComments("/* some comment */"), map[string]*querypb.BindVariable{}, nil, logStats) + if testCase.expectedError != nil { + assert.ErrorIs(t, err, testCase.expectedError) + } else { + assert.NoError(t, err) + assert.Equal(t, testCase.expectedPriority, crticalityFromStatement) + assert.Equal(t, testCase.expectedPriority, vCursor.safeSession.Options.Priority) + } + }) + } + +} + func TestPassthroughDDL(t *testing.T) { executor, sbc1, sbc2, _ := createExecutorEnv() primarySession.TargetString = "TestExecutor" diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index 25632cdd319..2f225db410b 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -786,6 +786,21 @@ func (vc *vcursorImpl) SetPlannerVersion(v plancontext.PlannerVersion) { vc.safeSession.GetOrCreateOptions().PlannerVersion = v } +func (vc *vcursorImpl) SetPriority(priority string) { + if priority != "" { + vc.safeSession.GetOrCreateOptions().Priority = priority + } else if vc.safeSession.Options != nil && vc.safeSession.Options.Priority != "" { + vc.safeSession.Options.Priority = "" + } + +} + +func (vc *vcursorImpl) SetWorkloadName(workloadName string) { + if workloadName != "" { + vc.safeSession.GetOrCreateOptions().WorkloadName = workloadName + } +} + // SetFoundRows implements the SessionActions interface func (vc *vcursorImpl) SetFoundRows(foundRows uint64) { vc.safeSession.FoundRows = foundRows diff --git a/go/vt/vttablet/tabletserver/query_engine.go b/go/vt/vttablet/tabletserver/query_engine.go 
index 4de993dcfb8..65c9960ce69 100644 --- a/go/vt/vttablet/tabletserver/query_engine.go +++ b/go/vt/vttablet/tabletserver/query_engine.go @@ -173,6 +173,9 @@ type QueryEngine struct { // stats queryCounts, queryTimes, queryRowCounts, queryErrorCounts, queryRowsAffected, queryRowsReturned *stats.CountersWithMultiLabels + // stats flags + enablePerWorkloadTableMetrics bool + // Loggers accessCheckerLogger *logutil.ThrottledLogger } @@ -189,11 +192,12 @@ func NewQueryEngine(env tabletenv.Env, se *schema.Engine) *QueryEngine { } qe := &QueryEngine{ - env: env, - se: se, - tables: make(map[string]*schema.Table), - plans: cache.NewDefaultCacheImpl(cacheCfg), - queryRuleSources: rules.NewMap(), + env: env, + se: se, + tables: make(map[string]*schema.Table), + plans: cache.NewDefaultCacheImpl(cacheCfg), + queryRuleSources: rules.NewMap(), + enablePerWorkloadTableMetrics: config.EnablePerWorkloadTableMetrics, } qe.conns = connpool.NewPool(env, "ConnPool", config.OltpReadPool) @@ -246,12 +250,19 @@ func NewQueryEngine(env tabletenv.Env, se *schema.Engine) *QueryEngine { env.Exporter().NewGaugeFunc("QueryCacheSize", "Query engine query cache size", qe.plans.UsedCapacity) env.Exporter().NewGaugeFunc("QueryCacheCapacity", "Query engine query cache capacity", qe.plans.MaxCapacity) env.Exporter().NewCounterFunc("QueryCacheEvictions", "Query engine query cache evictions", qe.plans.Evictions) - qe.queryCounts = env.Exporter().NewCountersWithMultiLabels("QueryCounts", "query counts", []string{"Table", "Plan"}) - qe.queryTimes = env.Exporter().NewCountersWithMultiLabels("QueryTimesNs", "query times in ns", []string{"Table", "Plan"}) - qe.queryRowCounts = env.Exporter().NewCountersWithMultiLabels("QueryRowCounts", "(DEPRECATED - use QueryRowsAffected and QueryRowsReturned instead) query row counts", []string{"Table", "Plan"}) - qe.queryRowsAffected = env.Exporter().NewCountersWithMultiLabels("QueryRowsAffected", "query rows affected", []string{"Table", "Plan"}) - qe.queryRowsReturned 
= env.Exporter().NewCountersWithMultiLabels("QueryRowsReturned", "query rows returned", []string{"Table", "Plan"}) - qe.queryErrorCounts = env.Exporter().NewCountersWithMultiLabels("QueryErrorCounts", "query error counts", []string{"Table", "Plan"}) + + labels := []string{"Table", "Plan"} + if config.EnablePerWorkloadTableMetrics { + labels = []string{"Table", "Plan", "Workload"} + } + + qe.queryCounts = env.Exporter().NewCountersWithMultiLabels("QueryCounts", "query counts", labels) + qe.queryTimes = env.Exporter().NewCountersWithMultiLabels("QueryTimesNs", "query times in ns", labels) + qe.queryRowCounts = env.Exporter().NewCountersWithMultiLabels("QueryRowCounts", "(DEPRECATED - use QueryRowsAffected and QueryRowsReturned instead) query row counts", labels) + + qe.queryRowsAffected = env.Exporter().NewCountersWithMultiLabels("QueryRowsAffected", "query rows affected", labels) + qe.queryRowsReturned = env.Exporter().NewCountersWithMultiLabels("QueryRowsReturned", "query rows returned", labels) + qe.queryErrorCounts = env.Exporter().NewCountersWithMultiLabels("QueryErrorCounts", "query error counts", labels) env.Exporter().HandleFunc("/debug/hotrows", qe.txSerializer.ServeHTTP) env.Exporter().HandleFunc("/debug/tablet_plans", qe.handleHTTPQueryPlans) @@ -479,9 +490,13 @@ func (qe *QueryEngine) QueryPlanCacheLen() int { } // AddStats adds the given stats for the planName.tableName -func (qe *QueryEngine) AddStats(planType planbuilder.PlanType, tableName string, queryCount int64, duration, mysqlTime time.Duration, rowsAffected, rowsReturned, errorCount int64) { +func (qe *QueryEngine) AddStats(planType planbuilder.PlanType, tableName, workload string, queryCount int64, duration, mysqlTime time.Duration, rowsAffected, rowsReturned, errorCount int64) { // table names can contain "." characters, replace them! keys := []string{tableName, planType.String()} + // Only use the workload as a label if that's enabled in the configuration. 
+ if qe.enablePerWorkloadTableMetrics { + keys = append(keys, workload) + } qe.queryCounts.Add(keys, queryCount) qe.queryTimes.Add(keys, int64(duration)) qe.queryRowCounts.Add(keys, rowsAffected) diff --git a/go/vt/vttablet/tabletserver/query_engine_test.go b/go/vt/vttablet/tabletserver/query_engine_test.go index c03af9186b3..10cb0d382cd 100644 --- a/go/vt/vttablet/tabletserver/query_engine_test.go +++ b/go/vt/vttablet/tabletserver/query_engine_test.go @@ -574,82 +574,160 @@ func TestPlanCachePollution(t *testing.T) { func TestAddQueryStats(t *testing.T) { testcases := []struct { - name string - planType planbuilder.PlanType - tableName string - queryCount int64 - duration time.Duration - mysqlTime time.Duration - rowsAffected int64 - rowsReturned int64 - errorCount int64 - expectedQueryCounts string - expectedQueryTimes string - expectedQueryRowsAffected string - expectedQueryRowsReturned string - expectedQueryRowCounts string - expectedQueryErrorCounts string + name string + planType planbuilder.PlanType + tableName string + queryCount int64 + duration time.Duration + mysqlTime time.Duration + rowsAffected int64 + rowsReturned int64 + errorCount int64 + enablePerWorkloadTableMetrics bool + workload string + expectedQueryCounts string + expectedQueryTimes string + expectedQueryRowsAffected string + expectedQueryRowsReturned string + expectedQueryRowCounts string + expectedQueryErrorCounts string }{ { - name: "select query", - planType: planbuilder.PlanSelect, - tableName: "A", - queryCount: 1, - duration: 10, - rowsAffected: 0, - rowsReturned: 15, - errorCount: 0, - expectedQueryCounts: `{"A.Select": 1}`, - expectedQueryTimes: `{"A.Select": 10}`, - expectedQueryRowsAffected: `{}`, - expectedQueryRowsReturned: `{"A.Select": 15}`, - expectedQueryRowCounts: `{"A.Select": 0}`, - expectedQueryErrorCounts: `{"A.Select": 0}`, + name: "select query", + planType: planbuilder.PlanSelect, + tableName: "A", + queryCount: 1, + duration: 10, + rowsAffected: 0, + rowsReturned: 
15, + errorCount: 0, + enablePerWorkloadTableMetrics: false, + workload: "some-workload", + expectedQueryCounts: `{"A.Select": 1}`, + expectedQueryTimes: `{"A.Select": 10}`, + expectedQueryRowsAffected: `{}`, + expectedQueryRowsReturned: `{"A.Select": 15}`, + expectedQueryRowCounts: `{"A.Select": 0}`, + expectedQueryErrorCounts: `{"A.Select": 0}`, }, { - name: "select into query", - planType: planbuilder.PlanSelect, - tableName: "A", - queryCount: 1, - duration: 10, - rowsAffected: 15, - rowsReturned: 0, - errorCount: 0, - expectedQueryCounts: `{"A.Select": 1}`, - expectedQueryTimes: `{"A.Select": 10}`, - expectedQueryRowsAffected: `{"A.Select": 15}`, - expectedQueryRowsReturned: `{"A.Select": 0}`, - expectedQueryRowCounts: `{"A.Select": 15}`, - expectedQueryErrorCounts: `{"A.Select": 0}`, + name: "select into query", + planType: planbuilder.PlanSelect, + tableName: "A", + queryCount: 1, + duration: 10, + rowsAffected: 15, + rowsReturned: 0, + errorCount: 0, + enablePerWorkloadTableMetrics: false, + workload: "some-workload", + expectedQueryCounts: `{"A.Select": 1}`, + expectedQueryTimes: `{"A.Select": 10}`, + expectedQueryRowsAffected: `{"A.Select": 15}`, + expectedQueryRowsReturned: `{"A.Select": 0}`, + expectedQueryRowCounts: `{"A.Select": 15}`, + expectedQueryErrorCounts: `{"A.Select": 0}`, }, { - name: "error", - planType: planbuilder.PlanSelect, - tableName: "A", - queryCount: 1, - duration: 10, - rowsAffected: 0, - rowsReturned: 0, - errorCount: 1, - expectedQueryCounts: `{"A.Select": 1}`, - expectedQueryTimes: `{"A.Select": 10}`, - expectedQueryRowsAffected: `{}`, - expectedQueryRowsReturned: `{"A.Select": 0}`, - expectedQueryRowCounts: `{"A.Select": 0}`, - expectedQueryErrorCounts: `{"A.Select": 1}`, + name: "error", + planType: planbuilder.PlanSelect, + tableName: "A", + queryCount: 1, + duration: 10, + rowsAffected: 0, + rowsReturned: 0, + errorCount: 1, + enablePerWorkloadTableMetrics: false, + workload: "some-workload", + expectedQueryCounts: 
`{"A.Select": 1}`, + expectedQueryTimes: `{"A.Select": 10}`, + expectedQueryRowsAffected: `{}`, + expectedQueryRowsReturned: `{"A.Select": 0}`, + expectedQueryRowCounts: `{"A.Select": 0}`, + expectedQueryErrorCounts: `{"A.Select": 1}`, }, { - name: "insert query", - planType: planbuilder.PlanInsert, - tableName: "A", - queryCount: 1, - duration: 10, - rowsAffected: 15, - rowsReturned: 0, - errorCount: 0, - expectedQueryCounts: `{"A.Insert": 1}`, - expectedQueryTimes: `{"A.Insert": 10}`, - expectedQueryRowsAffected: `{"A.Insert": 15}`, - expectedQueryRowsReturned: `{}`, - expectedQueryRowCounts: `{"A.Insert": 15}`, - expectedQueryErrorCounts: `{"A.Insert": 0}`, + name: "insert query", + planType: planbuilder.PlanInsert, + tableName: "A", + queryCount: 1, + duration: 10, + rowsAffected: 15, + rowsReturned: 0, + errorCount: 0, + enablePerWorkloadTableMetrics: false, + workload: "some-workload", + expectedQueryCounts: `{"A.Insert": 1}`, + expectedQueryTimes: `{"A.Insert": 10}`, + expectedQueryRowsAffected: `{"A.Insert": 15}`, + expectedQueryRowsReturned: `{}`, + expectedQueryRowCounts: `{"A.Insert": 15}`, + expectedQueryErrorCounts: `{"A.Insert": 0}`, + }, { + name: "select query with per workload metrics", + planType: planbuilder.PlanSelect, + tableName: "A", + queryCount: 1, + duration: 10, + rowsAffected: 0, + rowsReturned: 15, + errorCount: 0, + enablePerWorkloadTableMetrics: true, + workload: "some-workload", + expectedQueryCounts: `{"A.Select.some-workload": 1}`, + expectedQueryTimes: `{"A.Select.some-workload": 10}`, + expectedQueryRowsAffected: `{}`, + expectedQueryRowsReturned: `{"A.Select.some-workload": 15}`, + expectedQueryRowCounts: ``, + expectedQueryErrorCounts: `{"A.Select.some-workload": 0}`, + }, { + name: "select into query with per workload metrics", + planType: planbuilder.PlanSelect, + tableName: "A", + queryCount: 1, + duration: 10, + rowsAffected: 15, + rowsReturned: 0, + errorCount: 0, + enablePerWorkloadTableMetrics: true, + workload: 
"some-workload", + expectedQueryCounts: `{"A.Select.some-workload": 1}`, + expectedQueryTimes: `{"A.Select.some-workload": 10}`, + expectedQueryRowsAffected: `{"A.Select.some-workload": 15}`, + expectedQueryRowsReturned: `{"A.Select.some-workload": 0}`, + expectedQueryRowCounts: `{"A.Select.some-workload": 15}`, + expectedQueryErrorCounts: `{"A.Select.some-workload": 0}`, + }, { + name: "error with per workload metrics", + planType: planbuilder.PlanSelect, + tableName: "A", + queryCount: 1, + duration: 10, + rowsAffected: 0, + rowsReturned: 0, + errorCount: 1, + enablePerWorkloadTableMetrics: true, + workload: "some-workload", + expectedQueryCounts: `{"A.Select.some-workload": 1}`, + expectedQueryTimes: `{"A.Select.some-workload": 10}`, + expectedQueryRowsAffected: `{}`, + expectedQueryRowsReturned: `{"A.Select.some-workload": 0}`, + expectedQueryRowCounts: `{}`, + expectedQueryErrorCounts: `{"A.Select.some-workload": 1}`, + }, { + name: "insert query with per workload metrics", + planType: planbuilder.PlanInsert, + tableName: "A", + queryCount: 1, + duration: 10, + rowsAffected: 15, + rowsReturned: 0, + errorCount: 0, + enablePerWorkloadTableMetrics: true, + workload: "some-workload", + expectedQueryCounts: `{"A.Insert.some-workload": 1}`, + expectedQueryTimes: `{"A.Insert.some-workload": 10}`, + expectedQueryRowsAffected: `{"A.Insert.some-workload": 15}`, + expectedQueryRowsReturned: `{}`, + expectedQueryRowCounts: `{"A.Insert.some-workload": 15}`, + expectedQueryErrorCounts: `{"A.Insert.some-workload": 0}`, }, } @@ -658,10 +736,11 @@ func TestAddQueryStats(t *testing.T) { t.Run(testcase.name, func(t *testing.T) { config := tabletenv.NewDefaultConfig() config.DB = newDBConfigs(fakesqldb.New(t)) + config.EnablePerWorkloadTableMetrics = testcase.enablePerWorkloadTableMetrics env := tabletenv.NewEnv(config, "TestAddQueryStats_"+testcase.name) se := schema.NewEngine(env) qe := NewQueryEngine(env, se) - qe.AddStats(testcase.planType, testcase.tableName, 
testcase.queryCount, testcase.duration, testcase.mysqlTime, testcase.rowsAffected, testcase.rowsReturned, testcase.errorCount) + qe.AddStats(testcase.planType, testcase.tableName, testcase.workload, testcase.queryCount, testcase.duration, testcase.mysqlTime, testcase.rowsAffected, testcase.rowsReturned, testcase.errorCount) assert.Equal(t, testcase.expectedQueryCounts, qe.queryCounts.String()) assert.Equal(t, testcase.expectedQueryTimes, qe.queryTimes.String()) assert.Equal(t, testcase.expectedQueryRowsAffected, qe.queryRowsAffected.String()) diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 4c85c947bac..8e730475c43 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -62,6 +62,7 @@ type QueryExecutor struct { tsv *TabletServer tabletType topodatapb.TabletType setting *pools.Setting + workload string } const ( @@ -75,6 +76,8 @@ var streamResultPool = sync.Pool{New: func() any { } }} +var errTxThrottled = vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "Transaction throttled") + func returnStreamResult(result *sqltypes.Result) error { // only return large results slices to the pool if cap(result.Rows) >= streamRowsSize { @@ -118,11 +121,11 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) { } if reply == nil { - qre.tsv.qe.AddStats(qre.plan.PlanID, tableName, 1, duration, mysqlTime, 0, 0, 1) + qre.tsv.qe.AddStats(qre.plan.PlanID, tableName, qre.options.GetWorkloadName(), 1, duration, mysqlTime, 0, 0, 1) qre.plan.AddStats(1, duration, mysqlTime, 0, 0, 1) return } - qre.tsv.qe.AddStats(qre.plan.PlanID, tableName, 1, duration, mysqlTime, int64(reply.RowsAffected), int64(len(reply.Rows)), 0) + qre.tsv.qe.AddStats(qre.plan.PlanID, tableName, qre.options.GetWorkloadName(), 1, duration, mysqlTime, int64(reply.RowsAffected), int64(len(reply.Rows)), 0) qre.plan.AddStats(1, duration, mysqlTime, reply.RowsAffected, 
uint64(len(reply.Rows)), 0) qre.logStats.RowsAffected = int(reply.RowsAffected) qre.logStats.Rows = reply.Rows @@ -202,6 +205,10 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt } qre.options.TransactionIsolation = querypb.ExecuteOptions_AUTOCOMMIT + if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options), qre.options.GetWorkloadName()) { + return nil, errTxThrottled + } + conn, _, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, nil, qre.setting) if err != nil { @@ -213,6 +220,9 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt } func (qre *QueryExecutor) execAsTransaction(f func(conn *StatefulConnection) (*sqltypes.Result, error)) (*sqltypes.Result, error) { + if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options), qre.options.GetWorkloadName()) { + return nil, errTxThrottled + } conn, beginSQL, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, nil, qre.setting) if err != nil { return nil, err diff --git a/go/vt/vttablet/tabletserver/query_executor_test.go b/go/vt/vttablet/tabletserver/query_executor_test.go index 526419c4f49..934d933ecac 100644 --- a/go/vt/vttablet/tabletserver/query_executor_test.go +++ b/go/vt/vttablet/tabletserver/query_executor_test.go @@ -17,6 +17,7 @@ limitations under the License. 
package tabletserver import ( + "context" "fmt" "io" "math/rand" @@ -24,10 +25,6 @@ import ( "strings" "testing" - "vitess.io/vitess/go/vt/vttablet/tabletserver/tx" - - "context" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -44,6 +41,8 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/planbuilder" "vitess.io/vitess/go/vt/vttablet/tabletserver/rules" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tx" + "vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler" querypb "vitess.io/vitess/go/vt/proto/query" tableaclpb "vitess.io/vitess/go/vt/proto/tableacl" @@ -81,6 +80,10 @@ func TestQueryExecutorPlans(t *testing.T) { // inTxWant is the query log we expect if we're in a transation. // If empty, then we should expect the same as logWant. inTxWant string + // errorWant is the error we expect to get, if any, and should be nil if no error should be returned + errorWant error + // TxThrottler allows the test case to override the transaction throttler + txThrottler txthrottler.TxThrottler }{{ input: "select * from t", dbResponses: []dbResponse{{ @@ -267,7 +270,25 @@ func TestQueryExecutorPlans(t *testing.T) { resultWant: emptyResult, planWant: "Show", logWant: "show create table mysql.`user`", - }} + }, { + input: "update test_table set a=1", + dbResponses: []dbResponse{{ + query: "update test_table set a = 1 limit 10001", + result: dmlResult, + }}, + errorWant: errTxThrottled, + txThrottler: &mockTxThrottler{true}, + }, { + input: "update test_table set a=1", + passThrough: true, + dbResponses: []dbResponse{{ + query: "update test_table set a = 1 limit 10001", + result: dmlResult, + }}, + errorWant: errTxThrottled, + txThrottler: &mockTxThrottler{true}, + }, + } for _, tcase := range testcases { t.Run(tcase.input, func(t *testing.T) { db := setUpQueryExecutorTest(t) @@ -277,6 +298,9 @@ func TestQueryExecutorPlans(t *testing.T) { } ctx := context.Background() tsv := 
newTestTabletServer(ctx, noFlags, db) + if tcase.txThrottler != nil { + tsv.txThrottler = tcase.txThrottler + } tsv.config.DB.DBName = "ks" defer tsv.StopService() @@ -285,32 +309,39 @@ func TestQueryExecutorPlans(t *testing.T) { // Test outside a transaction. qre := newTestQueryExecutor(ctx, tsv, tcase.input, 0) got, err := qre.Execute() - require.NoError(t, err, tcase.input) - assert.Equal(t, tcase.resultWant, got, tcase.input) - assert.Equal(t, tcase.planWant, qre.logStats.PlanType, tcase.input) - assert.Equal(t, tcase.logWant, qre.logStats.RewrittenSQL(), tcase.input) - + if tcase.errorWant == nil { + require.NoError(t, err, tcase.input) + assert.Equal(t, tcase.resultWant, got, tcase.input) + assert.Equal(t, tcase.planWant, qre.logStats.PlanType, tcase.input) + assert.Equal(t, tcase.logWant, qre.logStats.RewrittenSQL(), tcase.input) + } else { + assert.True(t, vterrors.Equals(err, tcase.errorWant)) + } // Wait for the existing query to be processed by the cache tsv.QueryPlanCacheWait() // Test inside a transaction. 
target := tsv.sm.Target() state, err := tsv.Begin(ctx, target, nil) - require.NoError(t, err) - require.NotNil(t, state.TabletAlias, "alias should not be nil") - assert.Equal(t, tsv.alias, state.TabletAlias, "Wrong alias returned by Begin") - defer tsv.Commit(ctx, target, state.TransactionID) - - qre = newTestQueryExecutor(ctx, tsv, tcase.input, state.TransactionID) - got, err = qre.Execute() - require.NoError(t, err, tcase.input) - assert.Equal(t, tcase.resultWant, got, "in tx: %v", tcase.input) - assert.Equal(t, tcase.planWant, qre.logStats.PlanType, "in tx: %v", tcase.input) - want := tcase.logWant - if tcase.inTxWant != "" { - want = tcase.inTxWant + if tcase.errorWant == nil { + require.NoError(t, err) + require.NotNil(t, state.TabletAlias, "alias should not be nil") + assert.Equal(t, tsv.alias, state.TabletAlias, "Wrong alias returned by Begin") + defer tsv.Commit(ctx, target, state.TransactionID) + + qre = newTestQueryExecutor(ctx, tsv, tcase.input, state.TransactionID) + got, err = qre.Execute() + require.NoError(t, err, tcase.input) + assert.Equal(t, tcase.resultWant, got, "in tx: %v", tcase.input) + assert.Equal(t, tcase.planWant, qre.logStats.PlanType, "in tx: %v", tcase.input) + want := tcase.logWant + if tcase.inTxWant != "" { + want = tcase.inTxWant + } + assert.Equal(t, want, qre.logStats.RewrittenSQL(), "in tx: %v", tcase.input) + } else { + assert.True(t, vterrors.Equals(err, tcase.errorWant)) } - assert.Equal(t, want, qre.logStats.RewrittenSQL(), "in tx: %v", tcase.input) }) } } @@ -1540,3 +1571,22 @@ func addQueryExecutorSupportedQueries(db *fakesqldb.DB) { }}, }) } + +type mockTxThrottler struct { + throttle bool +} + +func (m mockTxThrottler) InitDBConfig(target *querypb.Target) { + panic("implement me") +} + +func (m mockTxThrottler) Open() (err error) { + return nil +} + +func (m mockTxThrottler) Close() { +} + +func (m mockTxThrottler) Throttle(priority int, workload string) (result bool) { + return m.throttle +} diff --git 
a/go/vt/vttablet/tabletserver/tabletenv/config.go b/go/vt/vttablet/tabletserver/tabletenv/config.go index df2ab5f23e5..fc622538e58 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config.go @@ -30,9 +30,16 @@ import ( "vitess.io/vitess/go/streamlog" "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/log" - querypb "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/servenv" + "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/throttler" + "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vterrors" + + querypb "vitess.io/vitess/go/vt/proto/query" + throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) // These constants represent values for various config parameters. @@ -83,6 +90,24 @@ var ( txLogHandler = "/debug/txlog" ) +type TxThrottlerConfigFlag struct { + *throttlerdatapb.Configuration +} + +func NewTxThrottlerConfigFlag() *TxThrottlerConfigFlag { + return &TxThrottlerConfigFlag{&throttlerdatapb.Configuration{}} +} + +func (t *TxThrottlerConfigFlag) Get() *throttlerdatapb.Configuration { + return t.Configuration +} + +func (t *TxThrottlerConfigFlag) Set(arg string) error { + return prototext.Unmarshal([]byte(arg), t.Configuration) +} + +func (t *TxThrottlerConfigFlag) Type() string { return "string" } + // RegisterTabletEnvFlags is a public API to register tabletenv flags for use by test cases that expect // some flags to be set with default values func RegisterTabletEnvFlags(fs *pflag.FlagSet) { @@ -136,9 +161,14 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) { fs.BoolVar(¤tConfig.TwoPCEnable, "twopc_enable", defaultConfig.TwoPCEnable, "if the flag is on, 2pc is enabled. 
Other 2pc flags must be supplied.") fs.StringVar(¤tConfig.TwoPCCoordinatorAddress, "twopc_coordinator_address", defaultConfig.TwoPCCoordinatorAddress, "address of the (VTGate) process(es) that will be used to notify of abandoned transactions.") SecondsVar(fs, ¤tConfig.TwoPCAbandonAge, "twopc_abandon_age", defaultConfig.TwoPCAbandonAge, "time in seconds. Any unresolved transaction older than this time will be sent to the coordinator to be resolved.") + // Tx throttler config flagutil.DualFormatBoolVar(fs, ¤tConfig.EnableTxThrottler, "enable_tx_throttler", defaultConfig.EnableTxThrottler, "If true replication-lag-based throttling on transactions will be enabled.") - flagutil.DualFormatStringVar(fs, ¤tConfig.TxThrottlerConfig, "tx_throttler_config", defaultConfig.TxThrottlerConfig, "The configuration of the transaction throttler as a text formatted throttlerdata.Configuration protocol buffer message") + flagutil.DualFormatVar(fs, currentConfig.TxThrottlerConfig, "tx_throttler_config", "The configuration of the transaction throttler as a text-formatted throttlerdata.Configuration protocol buffer message.") flagutil.DualFormatStringListVar(fs, ¤tConfig.TxThrottlerHealthCheckCells, "tx_throttler_healthcheck_cells", defaultConfig.TxThrottlerHealthCheckCells, "A comma-separated list of cells. Only tabletservers running in these cells will be monitored for replication lag by the transaction throttler.") + fs.IntVar(¤tConfig.TxThrottlerDefaultPriority, "tx-throttler-default-priority", defaultConfig.TxThrottlerDefaultPriority, "Default priority assigned to queries that lack priority information") + fs.Var(currentConfig.TxThrottlerTabletTypes, "tx-throttler-tablet-types", "A comma-separated list of tablet types. Only tablets of this type are monitored for replication lag by the transaction throttler. 
Supported types are replica and/or rdonly.") + fs.BoolVar(¤tConfig.TxThrottlerDryRun, "tx-throttler-dry-run", defaultConfig.TxThrottlerDryRun, "If present, the transaction throttler only records metrics about requests received and throttled, but does not actually throttle any requests.") + fs.DurationVar(¤tConfig.TxThrottlerTopoRefreshInterval, "tx-throttler-topo-refresh-interval", time.Minute*5, "The rate that the transaction throttler will refresh the topology to find cells.") fs.BoolVar(&enableHotRowProtection, "enable_hot_row_protection", false, "If true, incoming transactions for the same row (range) will be queued and cannot consume all txpool slots.") fs.BoolVar(&enableHotRowProtectionDryRun, "enable_hot_row_protection_dry_run", false, "If true, hot row protection is not enforced but logs if transactions would have been queued.") @@ -180,6 +210,8 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) { fs.Int64Var(¤tConfig.RowStreamer.MaxInnoDBTrxHistLen, "vreplication_copy_phase_max_innodb_history_list_length", 1000000, "The maximum InnoDB transaction history that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet.") fs.Int64Var(¤tConfig.RowStreamer.MaxMySQLReplLagSecs, "vreplication_copy_phase_max_mysql_replication_lag", 43200, "The maximum MySQL replication lag (in seconds) that can exist on a vstreamer (source) before starting another round of copying rows. 
This helps to limit the impact on the source tablet.") + + fs.BoolVar(¤tConfig.EnablePerWorkloadTableMetrics, "enable-per-workload-table-metrics", defaultConfig.EnablePerWorkloadTableMetrics, "If true, query counts and query error metrics include a label that identifies the workload") } var ( @@ -305,9 +337,13 @@ type TabletConfig struct { TwoPCCoordinatorAddress string `json:"-"` TwoPCAbandonAge Seconds `json:"-"` - EnableTxThrottler bool `json:"-"` - TxThrottlerConfig string `json:"-"` - TxThrottlerHealthCheckCells []string `json:"-"` + EnableTxThrottler bool `json:"-"` + TxThrottlerConfig *TxThrottlerConfigFlag `json:"-"` + TxThrottlerHealthCheckCells []string `json:"-"` + TxThrottlerDefaultPriority int `json:"-"` + TxThrottlerTabletTypes *topoproto.TabletTypeListFlag `json:"-"` + TxThrottlerTopoRefreshInterval time.Duration `json:"-"` + TxThrottlerDryRun bool `json:"-"` EnableLagThrottler bool `json:"-"` @@ -318,6 +354,8 @@ type TabletConfig struct { EnableSettingsPool bool `json:"-"` RowStreamer RowStreamerConfig `json:"rowStreamer,omitempty"` + + EnablePerWorkloadTableMetrics bool `json:"-"` } // ConnPoolConfig contains the config for a conn pool. @@ -441,6 +479,9 @@ func (c *TabletConfig) Verify() error { if err := c.verifyTransactionLimitConfig(); err != nil { return err } + if err := c.verifyTxThrottlerConfig(); err != nil { + return err + } if v := c.HotRowProtection.MaxQueueSize; v <= 0 { return fmt.Errorf("-hot_row_protection_max_queue_size must be > 0 (specified value: %v)", v) } @@ -486,6 +527,36 @@ func (c *TabletConfig) verifyTransactionLimitConfig() error { return nil } +// verifyTxThrottlerConfig checks the TxThrottler related config for sanity. 
+func (c *TabletConfig) verifyTxThrottlerConfig() error {
+	if !c.EnableTxThrottler {
+		return nil
+	}
+
+	err := throttler.MaxReplicationLagModuleConfig{Configuration: c.TxThrottlerConfig.Get()}.Verify()
+	if err != nil {
+		return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "failed to parse throttlerdatapb.Configuration config: %v", err)
+	}
+
+	if v := c.TxThrottlerDefaultPriority; v > sqlparser.MaxPriorityValue || v < 0 {
+		return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "--tx-throttler-default-priority must be >= 0 and <= %d (specified value: %d)", sqlparser.MaxPriorityValue, v)
+	}
+
+	if c.TxThrottlerTabletTypes == nil || len(*c.TxThrottlerTabletTypes) == 0 {
+		return vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "--tx-throttler-tablet-types must be defined when transaction throttler is enabled")
+	}
+	for _, tabletType := range *c.TxThrottlerTabletTypes {
+		switch tabletType {
+		case topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY:
+			continue
+		default:
+			return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unsupported tablet type %q", tabletType)
+		}
+	}
+
+	return nil
+}
+
 // Some of these values are for documentation purposes.
 // They actually get overwritten during Init.
var defaultConfig = TabletConfig{ @@ -549,9 +620,13 @@ var defaultConfig = TabletConfig{ DeprecatedCacheResultFields: true, SignalWhenSchemaChange: true, - EnableTxThrottler: false, - TxThrottlerConfig: defaultTxThrottlerConfig(), - TxThrottlerHealthCheckCells: []string{}, + EnableTxThrottler: false, + TxThrottlerConfig: defaultTxThrottlerConfig(), + TxThrottlerHealthCheckCells: []string{}, + TxThrottlerDefaultPriority: sqlparser.MaxPriorityValue, // This leads to all queries being candidates to throttle + TxThrottlerTabletTypes: &topoproto.TabletTypeListFlag{topodatapb.TabletType_REPLICA}, + TxThrottlerDryRun: false, + TxThrottlerTopoRefreshInterval: time.Minute * 5, EnableLagThrottler: false, // Feature flag; to switch to 'true' at some stage in the future @@ -564,19 +639,20 @@ var defaultConfig = TabletConfig{ MaxInnoDBTrxHistLen: 1000000, MaxMySQLReplLagSecs: 43200, }, + + EnablePerWorkloadTableMetrics: false, } -// defaultTxThrottlerConfig formats the default throttlerdata.Configuration -// object in text format. It uses the object returned by -// throttler.DefaultMaxReplicationLagModuleConfig().Configuration and overrides some of its -// fields. It panics on error. -func defaultTxThrottlerConfig() string { +// defaultTxThrottlerConfig returns the default TxThrottlerConfigFlag object based on +// a throttler.DefaultMaxReplicationLagModuleConfig().Configuration and overrides some of +// its fields. It panics on error. +func defaultTxThrottlerConfig() *TxThrottlerConfigFlag { // Take throttler.DefaultMaxReplicationLagModuleConfig and override some fields. config := throttler.DefaultMaxReplicationLagModuleConfig().Configuration // TODO(erez): Make DefaultMaxReplicationLagModuleConfig() return a MaxReplicationLagSec of 10 // and remove this line. 
config.MaxReplicationLagSec = 10 - return prototext.Format(config) + return &TxThrottlerConfigFlag{config} } func defaultTransactionLimitConfig() TransactionLimitConfig { diff --git a/go/vt/vttablet/tabletserver/tabletenv/config_test.go b/go/vt/vttablet/tabletserver/tabletenv/config_test.go index 3235cf31fd2..1eae5218d2a 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config_test.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config_test.go @@ -26,7 +26,13 @@ import ( "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/vt/dbconfigs" + "vitess.io/vitess/go/vt/throttler" + "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/yaml2" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) func TestConfigParse(t *testing.T) { @@ -320,3 +326,127 @@ func TestFlags(t *testing.T) { want.SanitizeLogMessages = true assert.Equal(t, want, currentConfig) } + +func TestTxThrottlerConfigFlag(t *testing.T) { + f := NewTxThrottlerConfigFlag() + defaultMaxReplicationLagModuleConfig := throttler.DefaultMaxReplicationLagModuleConfig().Configuration + + { + assert.Nil(t, f.Set(defaultMaxReplicationLagModuleConfig.String())) + assert.Equal(t, defaultMaxReplicationLagModuleConfig.String(), f.String()) + assert.Equal(t, "string", f.Type()) + } + { + defaultMaxReplicationLagModuleConfig.TargetReplicationLagSec = 5 + assert.Nil(t, f.Set(defaultMaxReplicationLagModuleConfig.String())) + assert.NotNil(t, f.Get()) + assert.Equal(t, int64(5), f.Get().TargetReplicationLagSec) + } + { + assert.NotNil(t, f.Set("should not parse")) + } +} + +func TestVerifyTxThrottlerConfig(t *testing.T) { + defaultMaxReplicationLagModuleConfig := throttler.DefaultMaxReplicationLagModuleConfig().Configuration + invalidMaxReplicationLagModuleConfig := throttler.DefaultMaxReplicationLagModuleConfig().Configuration + invalidMaxReplicationLagModuleConfig.TargetReplicationLagSec = -1 + + type testConfig struct { + Name string 
+ ExpectedErrorCode vtrpcpb.Code + // + EnableTxThrottler bool + TxThrottlerConfig *TxThrottlerConfigFlag + TxThrottlerHealthCheckCells []string + TxThrottlerTabletTypes *topoproto.TabletTypeListFlag + TxThrottlerDefaultPriority int + } + + tests := []testConfig{ + { + // default (disabled) + Name: "default", + EnableTxThrottler: false, + }, + { + // enabled with invalid throttler config + Name: "enabled invalid config", + ExpectedErrorCode: vtrpcpb.Code_INVALID_ARGUMENT, + EnableTxThrottler: true, + TxThrottlerConfig: &TxThrottlerConfigFlag{invalidMaxReplicationLagModuleConfig}, + }, + { + // enabled with good config (default/replica tablet type) + Name: "enabled", + EnableTxThrottler: true, + TxThrottlerConfig: &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig}, + TxThrottlerHealthCheckCells: []string{"cell1"}, + }, + { + // enabled + replica and rdonly tablet types + Name: "enabled plus rdonly", + EnableTxThrottler: true, + TxThrottlerConfig: &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig}, + TxThrottlerHealthCheckCells: []string{"cell1"}, + TxThrottlerTabletTypes: &topoproto.TabletTypeListFlag{ + topodatapb.TabletType_REPLICA, + topodatapb.TabletType_RDONLY, + }, + }, + { + // enabled without tablet types + Name: "enabled without tablet types", + ExpectedErrorCode: vtrpcpb.Code_FAILED_PRECONDITION, + EnableTxThrottler: true, + TxThrottlerConfig: &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig}, + TxThrottlerHealthCheckCells: []string{"cell1"}, + TxThrottlerTabletTypes: &topoproto.TabletTypeListFlag{}, + }, + { + // enabled + disallowed tablet type + Name: "enabled disallowed tablet type", + ExpectedErrorCode: vtrpcpb.Code_INVALID_ARGUMENT, + EnableTxThrottler: true, + TxThrottlerConfig: &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig}, + TxThrottlerHealthCheckCells: []string{"cell1"}, + TxThrottlerTabletTypes: &topoproto.TabletTypeListFlag{topodatapb.TabletType_DRAINED}, + }, + { + // enabled + disallowed priority 
+ Name: "enabled disallowed priority", + ExpectedErrorCode: vtrpcpb.Code_INVALID_ARGUMENT, + EnableTxThrottler: true, + TxThrottlerConfig: &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig}, + TxThrottlerDefaultPriority: 12345, + TxThrottlerHealthCheckCells: []string{"cell1"}, + }, + } + + for _, test := range tests { + test := test + t.Run(test.Name, func(t *testing.T) { + t.Parallel() + + config := defaultConfig + config.EnableTxThrottler = test.EnableTxThrottler + if test.TxThrottlerConfig == nil { + test.TxThrottlerConfig = NewTxThrottlerConfigFlag() + } + config.TxThrottlerConfig = test.TxThrottlerConfig + config.TxThrottlerHealthCheckCells = test.TxThrottlerHealthCheckCells + config.TxThrottlerDefaultPriority = test.TxThrottlerDefaultPriority + if test.TxThrottlerTabletTypes != nil { + config.TxThrottlerTabletTypes = test.TxThrottlerTabletTypes + } + + err := config.verifyTxThrottlerConfig() + if test.ExpectedErrorCode == vtrpcpb.Code_OK { + assert.Nil(t, err) + } else { + assert.NotNil(t, err) + assert.Equal(t, test.ExpectedErrorCode, vterrors.Code(err)) + } + }) + } +} diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index c8959d5db7e..cc3160fe1a9 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -112,7 +112,7 @@ type TabletServer struct { tracker *schema.Tracker watcher *BinlogWatcher qe *QueryEngine - txThrottler *txthrottler.TxThrottler + txThrottler txthrottler.TxThrottler te *TxEngine messager *messager.Engine hs *healthStreamer @@ -179,7 +179,7 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to tsv.tracker = schema.NewTracker(tsv, tsv.vstreamer, tsv.se) tsv.watcher = NewBinlogWatcher(tsv, tsv.vstreamer, tsv.config) tsv.qe = NewQueryEngine(tsv, tsv.se) - tsv.txThrottler = txthrottler.NewTxThrottler(tsv.config, topoServer) + tsv.txThrottler = txthrottler.NewTxThrottler(tsv, topoServer) tsv.te = 
NewTxEngine(tsv)
 	tsv.messager = messager.NewEngine(tsv, tsv.se, tsv.vstreamer)
@@ -488,8 +488,8 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, save
 		target, options, false, /* allowOnShutdown */
 		func(ctx context.Context, logStats *tabletenv.LogStats) error {
 			startTime := time.Now()
-			if tsv.txThrottler.Throttle() {
-				return vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "Transaction throttled")
+			if tsv.txThrottler.Throttle(tsv.getPriorityFromOptions(options), options.GetWorkloadName()) {
+				return errTxThrottled
 			}
 			var connSetting *pools.Setting
 			if len(settings) > 0 {
@@ -520,6 +520,30 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, save
 	return state, err
 }
 
+func (tsv *TabletServer) getPriorityFromOptions(options *querypb.ExecuteOptions) int {
+	priority := tsv.config.TxThrottlerDefaultPriority
+	if options == nil {
+		return priority
+	}
+	if options.Priority == "" {
+		return priority
+	}
+
+	optionsPriority, err := strconv.Atoi(options.Priority)
+	// This should never error out, as the value for Priority has been validated in the vtgate already.
+	// Still, handle it just to make sure.
+	if err != nil {
+		log.Errorf(
+			"The value of the %s query directive could not be converted to integer, using the "+
+				"default value %d. Error was: %v",
+			sqlparser.DirectivePriority, priority, err)
+
+		return priority
+	}
+
+	return optionsPriority
+}
+
 // Commit commits the specified transaction.
func (tsv *TabletServer) Commit(ctx context.Context, target *querypb.Target, transactionID int64) (newReservedID int64, err error) { err = tsv.execRequest( @@ -746,6 +770,7 @@ func (tsv *TabletServer) execute(ctx context.Context, target *querypb.Target, sq bindVariables = make(map[string]*querypb.BindVariable) } query, comments := sqlparser.SplitMarginComments(sql) + plan, err := tsv.qe.GetPlan(ctx, logStats, query, skipQueryPlanCache(options)) if err != nil { return err @@ -1403,6 +1428,7 @@ func (tsv *TabletServer) execRequest( span, ctx := trace.NewSpan(ctx, "TabletServer."+requestName) if options != nil { span.Annotate("isolation-level", options.TransactionIsolation) + span.Annotate("workload_name", options.WorkloadName) } trace.AnnotateSQL(span, sqlparser.Preview(sql)) if target != nil { @@ -1410,6 +1436,7 @@ func (tsv *TabletServer) execRequest( span.Annotate("shard", target.Shard) span.Annotate("keyspace", target.Keyspace) } + defer span.Finish() logStats := tabletenv.NewLogStats(ctx, requestName) diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 5b724ca97cf..923c860d184 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -17,19 +17,19 @@ limitations under the License. package txthrottler import ( - "fmt" + "context" + "math/rand" + "reflect" "strings" "sync" "time" "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/encoding/prototext" - - "context" - + "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/throttler" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" @@ -39,7 +39,81 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) -// TxThrottler throttles transactions based on replication lag. 
+// These vars store the functions used to create the topo server, healthcheck, +// topology watchers and go/vt/throttler. These are provided here so that they can be overridden +// in tests to generate mocks. +type healthCheckFactoryFunc func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck +type topologyWatcherFactoryFunc func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface +type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) + +var ( + healthCheckFactory healthCheckFactoryFunc + topologyWatcherFactory topologyWatcherFactoryFunc + throttlerFactory throttlerFactoryFunc +) + +func resetTxThrottlerFactories() { + healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck { + return discovery.NewHealthCheck(context.Background(), discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ",")) + } + topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface { + return discovery.NewCellTabletsWatcher(context.Background(), topoServer, hc, discovery.NewFilterByKeyspace([]string{keyspace}), cell, refreshInterval, true, topoReadConcurrency) + } + throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) { + return throttler.NewThrottlerFromConfig(name, unit, threadCount, maxRate, maxReplicationLagConfig, time.Now) + } +} + +func init() { + resetTxThrottlerFactories() +} + +// TxThrottler defines the interface for the transaction throttler. 
+type TxThrottler interface { + InitDBConfig(target *querypb.Target) + Open() (err error) + Close() + Throttle(priority int, workload string) (result bool) +} + +// ThrottlerInterface defines the public interface that is implemented by go/vt/throttler.Throttler +// It is only used here to allow mocking out a throttler object. +type ThrottlerInterface interface { + Throttle(threadID int) time.Duration + ThreadFinished(threadID int) + Close() + MaxRate() int64 + SetMaxRate(rate int64) + RecordReplicationLag(time time.Time, th *discovery.TabletHealth) + GetConfiguration() *throttlerdatapb.Configuration + UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error + ResetConfiguration() +} + +// TopologyWatcherInterface defines the public interface that is implemented by +// discovery.LegacyTopologyWatcher. It is only used here to allow mocking out +// go/vt/discovery.LegacyTopologyWatcher. +type TopologyWatcherInterface interface { + Start() + Stop() +} + +// TxThrottlerName is the name the wrapped go/vt/throttler object will be registered with +// go/vt/throttler.GlobalManager. +const TxThrottlerName = "TransactionThrottler" + +// fetchKnownCells gathers a list of known cells from the topology. On error, +// the cell of the local tablet will be used and an error is logged. +func fetchKnownCells(ctx context.Context, topoServer *topo.Server, target *querypb.Target) []string { + cells, err := topoServer.GetKnownCells(ctx) + if err != nil { + log.Errorf("txThrottler: falling back to local cell due to error fetching cells from topology: %+v", err) + cells = []string{target.Cell} + } + return cells +} + +// txThrottler implements TxThrottle for throttling transactions based on replication lag. // It's a thin wrapper around the throttler found in vitess/go/vt/throttler. // It uses a discovery.HealthCheck to send replication-lag updates to the wrapped throttler. 
// @@ -63,187 +137,109 @@ import ( // // To release the resources used by the throttler the caller should call Close(). // t.Close() // -// A TxThrottler object is generally not thread-safe: at any given time at most one goroutine should +// A txThrottler object is generally not thread-safe: at any given time at most one goroutine should // be executing a method. The only exception is the 'Throttle' method where multiple goroutines are // allowed to execute it concurrently. -type TxThrottler struct { - // config stores the transaction throttler's configuration. - // It is populated in NewTxThrottler and is not modified - // since. - config *txThrottlerConfig +type txThrottler struct { + config *tabletenv.TabletConfig // state holds an open transaction throttler state. It is nil // if the TransactionThrottler is closed. - state *txThrottlerState - - target *querypb.Target -} - -// NewTxThrottler tries to construct a TxThrottler from the -// relevant fields in the tabletenv.Config object. It returns a disabled TxThrottler if -// any error occurs. -// This function calls tryCreateTxThrottler that does the actual creation work -// and returns an error if one occurred. -func NewTxThrottler(config *tabletenv.TabletConfig, topoServer *topo.Server) *TxThrottler { - txThrottler, err := tryCreateTxThrottler(config, topoServer) - if err != nil { - log.Errorf("Error creating transaction throttler. Transaction throttling will"+ - " be disabled. Error: %v", err) - txThrottler, err = newTxThrottler(&txThrottlerConfig{enabled: false}) - if err != nil { - panic("BUG: Can't create a disabled transaction throttler") - } - } else { - log.Infof("Initialized transaction throttler with config: %+v", txThrottler.config) - } - return txThrottler -} - -// InitDBConfig initializes the target parameters for the throttler. 
-func (t *TxThrottler) InitDBConfig(target *querypb.Target) { - t.target = proto.Clone(target).(*querypb.Target) -} - -func tryCreateTxThrottler(config *tabletenv.TabletConfig, topoServer *topo.Server) (*TxThrottler, error) { - if !config.EnableTxThrottler { - return newTxThrottler(&txThrottlerConfig{enabled: false}) - } - - var throttlerConfig throttlerdatapb.Configuration - if err := prototext.Unmarshal([]byte(config.TxThrottlerConfig), &throttlerConfig); err != nil { - return nil, err - } - - // Clone tsv.TxThrottlerHealthCheckCells so that we don't assume tsv.TxThrottlerHealthCheckCells - // is immutable. - healthCheckCells := make([]string, len(config.TxThrottlerHealthCheckCells)) - copy(healthCheckCells, config.TxThrottlerHealthCheckCells) - - return newTxThrottler(&txThrottlerConfig{ - enabled: true, - topoServer: topoServer, - throttlerConfig: &throttlerConfig, - healthCheckCells: healthCheckCells, - }) + state txThrottlerState + + target *querypb.Target + topoServer *topo.Server + + // stats + throttlerRunning *stats.Gauge + topoWatchers *stats.GaugesWithSingleLabel + healthChecksReadTotal *stats.CountersWithMultiLabels + healthChecksRecordedTotal *stats.CountersWithMultiLabels + requestsTotal *stats.CountersWithSingleLabel + requestsThrottled *stats.CountersWithSingleLabel } -// txThrottlerConfig holds the parameters that need to be -// passed when constructing a TxThrottler object. -type txThrottlerConfig struct { - // enabled is true if the transaction throttler is enabled. All methods - // of a disabled transaction throttler do nothing and Throttle() always - // returns false. - enabled bool - - topoServer *topo.Server - throttlerConfig *throttlerdatapb.Configuration - // healthCheckCells stores the cell names in which running vttablets will be monitored for - // replication lag. 
- healthCheckCells []string -} - -// ThrottlerInterface defines the public interface that is implemented by go/vt/throttler.Throttler -// It is only used here to allow mocking out a throttler object. -type ThrottlerInterface interface { - Throttle(threadID int) time.Duration - ThreadFinished(threadID int) - Close() - MaxRate() int64 - SetMaxRate(rate int64) - RecordReplicationLag(time time.Time, th *discovery.TabletHealth) - GetConfiguration() *throttlerdatapb.Configuration - UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error - ResetConfiguration() +type txThrottlerState interface { + deallocateResources() + StatsUpdate(tabletStats *discovery.TabletHealth) + throttle() bool } -// TopologyWatcherInterface defines the public interface that is implemented by -// discovery.LegacyTopologyWatcher. It is only used here to allow mocking out -// go/vt/discovery.LegacyTopologyWatcher. -type TopologyWatcherInterface interface { - Start() - Stop() -} +// txThrottlerStateImpl holds the state of an open TxThrottler object. +type txThrottlerStateImpl struct { + config *tabletenv.TabletConfig + txThrottler *txThrottler -// txThrottlerState holds the state of an open TxThrottler object. -type txThrottlerState struct { // throttleMu serializes calls to throttler.Throttler.Throttle(threadId). // That method is required to be called in serial for each threadId. - throttleMu sync.Mutex - throttler ThrottlerInterface - stopHealthCheck context.CancelFunc + throttleMu sync.Mutex + throttler ThrottlerInterface + stopHealthCheck context.CancelFunc + topologyWatchers map[string]TopologyWatcherInterface healthCheck discovery.HealthCheck - topologyWatchers []TopologyWatcherInterface -} - -// These vars store the functions used to create the topo server, healthcheck, -// topology watchers and go/vt/throttler. These are provided here so that they can be overridden -// in tests to generate mocks. 
-type healthCheckFactoryFunc func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck -type topologyWatcherFactoryFunc func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface -type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate, maxReplicationLag int64) (ThrottlerInterface, error) - -var ( - healthCheckFactory healthCheckFactoryFunc - topologyWatcherFactory topologyWatcherFactoryFunc - throttlerFactory throttlerFactoryFunc -) + healthCheckChan chan *discovery.TabletHealth + healthCheckCells []string + cellsFromTopo bool -func init() { - resetTxThrottlerFactories() + // tabletTypes stores the tablet types for throttling + tabletTypes map[topodatapb.TabletType]bool } -func resetTxThrottlerFactories() { - healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck { - return discovery.NewHealthCheck(context.Background(), discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ",")) - } - topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface { - return discovery.NewCellTabletsWatcher(context.Background(), topoServer, hc, discovery.NewFilterByKeyspace([]string{keyspace}), cell, refreshInterval, true, topoReadConcurrency) +// NewTxThrottler tries to construct a txThrottler from the relevant +// fields in the tabletenv.Env and topo.Server objects. 
+func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler { + config := env.Config() + if config.EnableTxThrottler { + if len(config.TxThrottlerHealthCheckCells) == 0 { + defer log.Infof("Initialized transaction throttler using tabletTypes: %+v, cellsFromTopo: true, topoRefreshInterval: %s, throttlerConfig: %q", + config.TxThrottlerTabletTypes, config.TxThrottlerTopoRefreshInterval, config.TxThrottlerConfig.Get(), + ) + } else { + defer log.Infof("Initialized transaction throttler using tabletTypes: %+v, healthCheckCells: %+v, throttlerConfig: %q", + config.TxThrottlerTabletTypes, config.TxThrottlerHealthCheckCells, config.TxThrottlerConfig.Get(), + ) + } } - throttlerFactory = func(name, unit string, threadCount int, maxRate, maxReplicationLag int64) (ThrottlerInterface, error) { - return throttler.NewThrottler(name, unit, threadCount, maxRate, maxReplicationLag) + + return &txThrottler{ + config: config, + topoServer: topoServer, + throttlerRunning: env.Exporter().NewGauge(TxThrottlerName+"Running", "transaction throttler running state"), + topoWatchers: env.Exporter().NewGaugesWithSingleLabel(TxThrottlerName+"TopoWatchers", "transaction throttler topology watchers", "cell"), + healthChecksReadTotal: env.Exporter().NewCountersWithMultiLabels(TxThrottlerName+"HealthchecksRead", "transaction throttler healthchecks read", + []string{"cell", "DbType"}), + healthChecksRecordedTotal: env.Exporter().NewCountersWithMultiLabels(TxThrottlerName+"HealthchecksRecorded", "transaction throttler healthchecks recorded", + []string{"cell", "DbType"}), + requestsTotal: env.Exporter().NewCountersWithSingleLabel(TxThrottlerName+"Requests", "transaction throttler requests", "workload"), + requestsThrottled: env.Exporter().NewCountersWithSingleLabel(TxThrottlerName+"Throttled", "transaction throttler requests throttled", "workload"), } } -// TxThrottlerName is the name the wrapped go/vt/throttler object will be registered with -// go/vt/throttler.GlobalManager. 
-const TxThrottlerName = "TransactionThrottler" - -func newTxThrottler(config *txThrottlerConfig) (*TxThrottler, error) { - if config.enabled { - // Verify config. - err := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig}.Verify() - if err != nil { - return nil, err - } - if len(config.healthCheckCells) == 0 { - return nil, fmt.Errorf("empty healthCheckCells given. %+v", config) - } - } - return &TxThrottler{ - config: config, - }, nil +// InitDBConfig initializes the target parameters for the throttler. +func (t *txThrottler) InitDBConfig(target *querypb.Target) { + t.target = proto.Clone(target).(*querypb.Target) } // Open opens the transaction throttler. It must be called prior to 'Throttle'. -func (t *TxThrottler) Open() error { - if !t.config.enabled { +func (t *txThrottler) Open() (err error) { + if !t.config.EnableTxThrottler { return nil } if t.state != nil { return nil } - log.Info("TxThrottler: opening") - var err error - t.state, err = newTxThrottlerState(t.config, t.target.Keyspace, t.target.Shard, t.target.Cell) + log.Info("txThrottler: opening") + t.throttlerRunning.Set(1) + t.state, err = newTxThrottlerState(t, t.config, t.target) return err } -// Close closes the TxThrottler object and releases resources. +// Close closes the txThrottler object and releases resources. // It should be called after the throttler is no longer needed. // It's ok to call this method on a closed throttler--in which case the method does nothing. -func (t *TxThrottler) Close() { - if !t.config.enabled { +func (t *txThrottler) Close() { + if !t.config.EnableTxThrottler { return } if t.state == nil { @@ -251,79 +247,150 @@ func (t *TxThrottler) Close() { } t.state.deallocateResources() t.state = nil - log.Info("TxThrottler: closed") + t.throttlerRunning.Set(0) + log.Info("txThrottler: closed") } // Throttle should be called before a new transaction is started. 
// It returns true if the transaction should not proceed (the caller // should back off). Throttle requires that Open() was previously called // successfully. -func (t *TxThrottler) Throttle() (result bool) { - if !t.config.enabled { +func (t *txThrottler) Throttle(priority int, workload string) (result bool) { + if !t.config.EnableTxThrottler { return false } if t.state == nil { - panic("BUG: Throttle() called on a closed TxThrottler") + return false } - return t.state.throttle() + + // Throttle according to both what the throttler state says and the priority. Workloads with lower priority value + // are less likely to be throttled. + result = t.state.throttle() && rand.Intn(sqlparser.MaxPriorityValue) < priority + + t.requestsTotal.Add(workload, 1) + if result { + t.requestsThrottled.Add(workload, 1) + } + + return result && !t.config.TxThrottlerDryRun } -func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard, cell string) (*txThrottlerState, error) { +func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfig, target *querypb.Target) (txThrottlerState, error) { + maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: config.TxThrottlerConfig.Get()} + t, err := throttlerFactory( TxThrottlerName, "TPS", /* unit */ 1, /* threadCount */ throttler.MaxRateModuleDisabled, /* maxRate */ - config.throttlerConfig.MaxReplicationLagSec /* maxReplicationLag */) + maxReplicationLagModuleConfig, + ) if err != nil { return nil, err } - if err := t.UpdateConfiguration(config.throttlerConfig, true /* copyZeroValues */); err != nil { + if err := t.UpdateConfiguration(config.TxThrottlerConfig.Get(), true /* copyZeroValues */); err != nil { t.Close() return nil, err } - result := &txThrottlerState{ - throttler: t, + + tabletTypes := make(map[topodatapb.TabletType]bool, len(*config.TxThrottlerTabletTypes)) + for _, tabletType := range *config.TxThrottlerTabletTypes { + tabletTypes[tabletType] = true + } + + state := 
&txThrottlerStateImpl{ + config: config, + healthCheckCells: config.TxThrottlerHealthCheckCells, + tabletTypes: tabletTypes, + throttler: t, + txThrottler: txThrottler, } - createTxThrottlerHealthCheck(config, result, cell) - - result.topologyWatchers = make( - []TopologyWatcherInterface, 0, len(config.healthCheckCells)) - for _, cell := range config.healthCheckCells { - result.topologyWatchers = append( - result.topologyWatchers, - topologyWatcherFactory( - config.topoServer, - result.healthCheck, - cell, - keyspace, - shard, - discovery.DefaultTopologyWatcherRefreshInterval, - discovery.DefaultTopoReadConcurrency)) + + // get cells from topo if none defined in tabletenv config + if len(state.healthCheckCells) == 0 { + ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) + defer cancel() + state.healthCheckCells = fetchKnownCells(ctx, txThrottler.topoServer, target) + state.cellsFromTopo = true } - return result, nil -} -func createTxThrottlerHealthCheck(config *txThrottlerConfig, result *txThrottlerState, cell string) { ctx, cancel := context.WithCancel(context.Background()) - result.stopHealthCheck = cancel - result.healthCheck = healthCheckFactory(config.topoServer, cell, config.healthCheckCells) - ch := result.healthCheck.Subscribe() - go func(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case th := <-ch: - result.StatsUpdate(th) - } + state.stopHealthCheck = cancel + state.initHealthCheckStream(txThrottler.topoServer, target) + go state.healthChecksProcessor(ctx, txThrottler.topoServer, target) + + return state, nil +} + +func (ts *txThrottlerStateImpl) initHealthCheckStream(topoServer *topo.Server, target *querypb.Target) { + ts.healthCheck = healthCheckFactory(topoServer, target.Cell, ts.healthCheckCells) + ts.healthCheckChan = ts.healthCheck.Subscribe() + + ts.topologyWatchers = make( + map[string]TopologyWatcherInterface, len(ts.healthCheckCells)) + for _, cell := range ts.healthCheckCells { + 
ts.topologyWatchers[cell] = topologyWatcherFactory( + topoServer, + ts.healthCheck, + cell, + target.Keyspace, + target.Shard, + discovery.DefaultTopologyWatcherRefreshInterval, + discovery.DefaultTopoReadConcurrency, + ) + ts.txThrottler.topoWatchers.Add(cell, 1) + } +} + +func (ts *txThrottlerStateImpl) closeHealthCheckStream() { + if ts.healthCheck == nil { + return + } + for cell, watcher := range ts.topologyWatchers { + watcher.Stop() + ts.txThrottler.topoWatchers.Reset(cell) + } + ts.topologyWatchers = nil + ts.stopHealthCheck() + ts.healthCheck.Close() +} + +func (ts *txThrottlerStateImpl) updateHealthCheckCells(ctx context.Context, topoServer *topo.Server, target *querypb.Target) { + fetchCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) + defer cancel() + + knownCells := fetchKnownCells(fetchCtx, topoServer, target) + if !reflect.DeepEqual(knownCells, ts.healthCheckCells) { + log.Info("txThrottler: restarting healthcheck stream due to topology cells update") + ts.healthCheckCells = knownCells + ts.closeHealthCheckStream() + ts.initHealthCheckStream(topoServer, target) + } +} + +func (ts *txThrottlerStateImpl) healthChecksProcessor(ctx context.Context, topoServer *topo.Server, target *querypb.Target) { + var cellsUpdateTicks <-chan time.Time + if ts.cellsFromTopo { + ticker := time.NewTicker(ts.config.TxThrottlerTopoRefreshInterval) + cellsUpdateTicks = ticker.C + defer ticker.Stop() + } + for { + select { + case <-ctx.Done(): + return + case <-cellsUpdateTicks: + ts.updateHealthCheckCells(ctx, topoServer, target) + case th := <-ts.healthCheckChan: + ts.StatsUpdate(th) } - }(ctx) + } } -func (ts *txThrottlerState) throttle() bool { +func (ts *txThrottlerStateImpl) throttle() bool { if ts.throttler == nil { - panic("BUG: throttle called after deallocateResources was called.") + log.Error("txThrottler: throttle called after deallocateResources was called") + return false } // Serialize calls to ts.throttle.Throttle() ts.throttleMu.Lock() @@ 
-331,34 +398,31 @@ func (ts *txThrottlerState) throttle() bool { return ts.throttler.Throttle(0 /* threadId */) > 0 } -func (ts *txThrottlerState) deallocateResources() { - // We don't really need to nil out the fields here - // as deallocateResources is not expected to be called - // more than once, but it doesn't hurt to do so. - for _, watcher := range ts.topologyWatchers { - watcher.Stop() - } - ts.topologyWatchers = nil - - ts.healthCheck.Close() +func (ts *txThrottlerStateImpl) deallocateResources() { + // Close healthcheck and topo watchers + ts.closeHealthCheckStream() ts.healthCheck = nil - // After ts.healthCheck is closed txThrottlerState.StatsUpdate() is guaranteed not + // After ts.healthCheck is closed txThrottlerStateImpl.StatsUpdate() is guaranteed not // to be executing, so we can safely close the throttler. ts.throttler.Close() ts.throttler = nil } // StatsUpdate updates the health of a tablet with the given healthcheck. -func (ts *txThrottlerState) StatsUpdate(tabletStats *discovery.TabletHealth) { - // Ignore PRIMARY and RDONLY stats. - // We currently do not monitor RDONLY tablets for replication lag. RDONLY tablets are not - // candidates for becoming primary during failover, and it's acceptable to serve somewhat - // stale date from these. - // TODO(erez): If this becomes necessary, we can add a configuration option that would - // determine whether we consider RDONLY tablets here, as well. - if tabletStats.Target.TabletType != topodatapb.TabletType_REPLICA { +func (ts *txThrottlerStateImpl) StatsUpdate(tabletStats *discovery.TabletHealth) { + if len(ts.tabletTypes) == 0 { return } - ts.throttler.RecordReplicationLag(time.Now(), tabletStats) + + tabletType := tabletStats.Target.TabletType + metricLabels := []string{tabletStats.Target.Cell, tabletType.String()} + ts.txThrottler.healthChecksReadTotal.Add(metricLabels, 1) + + // Monitor tablets for replication lag if they have a tablet + // type specified by the --tx-throttler-tablet-types flag. 
+ if ts.tabletTypes[tabletType] { + ts.throttler.RecordReplicationLag(time.Now(), tabletStats) + ts.txThrottler.healthChecksRecordedTotal.Add(metricLabels, 1) + } } diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index 1606fa2cf4c..62a4d7a4abb 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -22,14 +22,18 @@ package txthrottler //go:generate mockgen -destination mock_topology_watcher_test.go -package txthrottler vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler TopologyWatcherInterface import ( + "context" "testing" "time" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" "vitess.io/vitess/go/vt/discovery" + "vitess.io/vitess/go/vt/throttler" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" querypb "vitess.io/vitess/go/vt/proto/query" @@ -39,17 +43,16 @@ import ( func TestDisabledThrottler(t *testing.T) { config := tabletenv.NewDefaultConfig() config.EnableTxThrottler = false - throttler := NewTxThrottler(config, nil) + env := tabletenv.NewEnv(config, t.Name()) + throttler := NewTxThrottler(env, nil) throttler.InitDBConfig(&querypb.Target{ Keyspace: "keyspace", Shard: "shard", }) - if err := throttler.Open(); err != nil { - t.Fatalf("want: nil, got: %v", err) - } - if result := throttler.Throttle(); result != false { - t.Errorf("want: false, got: %v", result) - } + assert.Nil(t, throttler.Open()) + assert.False(t, throttler.Throttle(0, "some_workload")) + throttlerImpl, _ := throttler.(*txThrottler) + assert.Zero(t, throttlerImpl.throttlerRunning.Get()) throttler.Close() } @@ -70,28 +73,18 @@ func TestEnabledThrottler(t *testing.T) { } topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard 
string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface { - if ts != topoServer { - t.Errorf("want: %v, got: %v", ts, topoServer) - } - if cell != "cell1" && cell != "cell2" { - t.Errorf("want: cell1 or cell2, got: %v", cell) - } - if keyspace != "keyspace" { - t.Errorf("want: keyspace, got: %v", keyspace) - } - if shard != "shard" { - t.Errorf("want: shard, got: %v", shard) - } + assert.Equal(t, ts, topoServer) + assert.Contains(t, []string{"cell1", "cell2"}, cell) + assert.Equal(t, "keyspace", keyspace) + assert.Equal(t, "shard", shard) result := NewMockTopologyWatcherInterface(mockCtrl) result.EXPECT().Stop() return result } mockThrottler := NewMockThrottlerInterface(mockCtrl) - throttlerFactory = func(name, unit string, threadCount int, maxRate, maxReplicationLag int64) (ThrottlerInterface, error) { - if threadCount != 1 { - t.Errorf("want: 1, got: %v", threadCount) - } + throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) { + assert.Equal(t, 1, threadCount) return mockThrottler, nil } @@ -100,47 +93,136 @@ func TestEnabledThrottler(t *testing.T) { call1.Return(0 * time.Second) tabletStats := &discovery.TabletHealth{ Target: &querypb.Target{ + Cell: "cell1", TabletType: topodatapb.TabletType_REPLICA, }, } call2 := mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats) call3 := mockThrottler.EXPECT().Throttle(0) call3.Return(1 * time.Second) - call4 := mockThrottler.EXPECT().Close() + + call4 := mockThrottler.EXPECT().Throttle(0) + call4.Return(1 * time.Second) + calllast := mockThrottler.EXPECT().Close() + call1.After(call0) call2.After(call1) call3.After(call2) call4.After(call3) + calllast.After(call4) config := tabletenv.NewDefaultConfig() config.EnableTxThrottler = true - config.TxThrottlerHealthCheckCells = []string{"cell1", "cell2"} + config.TxThrottlerTabletTypes = 
&topoproto.TabletTypeListFlag{topodatapb.TabletType_REPLICA} - throttler, err := tryCreateTxThrottler(config, ts) - if err != nil { - t.Fatalf("want: nil, got: %v", err) - } + env := tabletenv.NewEnv(config, t.Name()) + throttler := NewTxThrottler(env, ts) + throttlerImpl, _ := throttler.(*txThrottler) + assert.NotNil(t, throttlerImpl) throttler.InitDBConfig(&querypb.Target{ + Cell: "cell1", Keyspace: "keyspace", Shard: "shard", }) - if err := throttler.Open(); err != nil { - t.Fatalf("want: nil, got: %v", err) - } - if result := throttler.Throttle(); result != false { - t.Errorf("want: false, got: %v", result) - } - throttler.state.StatsUpdate(tabletStats) + + assert.Nil(t, throttlerImpl.Open()) + throttlerStateImpl := throttlerImpl.state.(*txThrottlerStateImpl) + assert.Equal(t, map[topodatapb.TabletType]bool{topodatapb.TabletType_REPLICA: true}, throttlerStateImpl.tabletTypes) + assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get()) + assert.Equal(t, map[string]int64{"cell1": 1, "cell2": 1}, throttlerImpl.topoWatchers.Counts()) + + assert.False(t, throttlerImpl.Throttle(100, "some_workload")) + assert.Equal(t, int64(1), throttlerImpl.requestsTotal.Counts()["some_workload"]) + assert.Zero(t, throttlerImpl.requestsThrottled.Counts()["some_workload"]) + + throttlerImpl.state.StatsUpdate(tabletStats) // This calls replication lag thing + assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksReadTotal.Counts()) + assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksRecordedTotal.Counts()) rdonlyTabletStats := &discovery.TabletHealth{ Target: &querypb.Target{ + Cell: "cell2", TabletType: topodatapb.TabletType_RDONLY, }, } - // This call should not be forwarded to the go/vt/throttler.Throttler object. - throttler.state.StatsUpdate(rdonlyTabletStats) + // This call should not be forwarded to the go/vt/throttlerImpl.Throttler object. 
+ throttlerImpl.state.StatsUpdate(rdonlyTabletStats) + assert.Equal(t, map[string]int64{"cell1.REPLICA": 1, "cell2.RDONLY": 1}, throttlerImpl.healthChecksReadTotal.Counts()) + assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksRecordedTotal.Counts()) + // The second throttle call should reject. - if result := throttler.Throttle(); result != true { - t.Errorf("want: true, got: %v", result) + assert.True(t, throttlerImpl.Throttle(100, "some_workload")) + assert.Equal(t, int64(2), throttlerImpl.requestsTotal.Counts()["some_workload"]) + assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some_workload"]) + + // This call should not throttle due to priority. Check that's the case and counters agree. + assert.False(t, throttlerImpl.Throttle(0, "some_workload")) + assert.Equal(t, int64(3), throttlerImpl.requestsTotal.Counts()["some_workload"]) + assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some_workload"]) + throttlerImpl.Close() + assert.Zero(t, throttlerImpl.throttlerRunning.Get()) + assert.Equal(t, map[string]int64{"cell1": 0, "cell2": 0}, throttlerImpl.topoWatchers.Counts()) +} + +func TestFetchKnownCells(t *testing.T) { + { + ts := memorytopo.NewServer("cell1", "cell2") + cells := fetchKnownCells(context.Background(), ts, &querypb.Target{Cell: "cell1"}) + assert.Equal(t, []string{"cell1", "cell2"}, cells) } - throttler.Close() + { + ts := memorytopo.NewServer() + cells := fetchKnownCells(context.Background(), ts, &querypb.Target{Cell: "cell1"}) + assert.Equal(t, []string{"cell1"}, cells) + } +} + +func TestDryRunThrottler(t *testing.T) { + config := tabletenv.NewDefaultConfig() + env := tabletenv.NewEnv(config, t.Name()) + + testCases := []struct { + Name string + txThrottlerStateShouldThrottle bool + throttlerDryRun bool + expectedResult bool + }{ + {Name: "Real run throttles when txThrottlerStateImpl says it should", txThrottlerStateShouldThrottle: true, throttlerDryRun: false, expectedResult: 
true}, + {Name: "Real run does not throttle when txThrottlerStateImpl says it should not", txThrottlerStateShouldThrottle: false, throttlerDryRun: false, expectedResult: false}, + {Name: "Dry run does not throttle when txThrottlerStateImpl says it should", txThrottlerStateShouldThrottle: true, throttlerDryRun: true, expectedResult: false}, + {Name: "Dry run does not throttle when txThrottlerStateImpl says it should not", txThrottlerStateShouldThrottle: false, throttlerDryRun: true, expectedResult: false}, + } + + for _, aTestCase := range testCases { + theTestCase := aTestCase + + t.Run(theTestCase.Name, func(t *testing.T) { + aTxThrottler := &txThrottler{ + config: &tabletenv.TabletConfig{ + EnableTxThrottler: true, + TxThrottlerDryRun: theTestCase.throttlerDryRun, + }, + state: &mockTxThrottlerState{shouldThrottle: theTestCase.txThrottlerStateShouldThrottle}, + throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"), + requestsTotal: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerRequests", "transaction throttler requests", "workload"), + requestsThrottled: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerThrottled", "transaction throttler requests throttled", "workload"), + } + + assert.Equal(t, theTestCase.expectedResult, aTxThrottler.Throttle(100, "some-workload")) + }) + } +} + +type mockTxThrottlerState struct { + shouldThrottle bool +} + +func (t *mockTxThrottlerState) deallocateResources() { + +} +func (t *mockTxThrottlerState) StatsUpdate(tabletStats *discovery.TabletHealth) { + +} + +func (t *mockTxThrottlerState) throttle() bool { + return t.shouldThrottle } diff --git a/proto/query.proto b/proto/query.proto index 1e7e533cf44..fb376ed9cc9 100644 --- a/proto/query.proto +++ b/proto/query.proto @@ -325,6 +325,14 @@ message ExecuteOptions { // if the user has created temp tables, Vitess will not reuse plans created for this session in other sessions. 
// The current session can still use other sessions cached plans. bool has_created_temp_tables = 12; + + // WorkloadName specifies the name of the workload as indicated in query directives. This is used for instrumentation + // in metrics and tracing spans. + string WorkloadName = 15; + + // priority specifies the priority of the query, between 0 and 100. This is leveraged by the transaction + // throttler to determine whether, under resource contention, a query should or should not be throttled. + string priority = 16; } // Field describes a single column returned by a query diff --git a/test/templates/unit_test_self_hosted.tpl b/test/templates/unit_test_self_hosted.tpl index 68a620c1428..96a267e372c 100644 --- a/test/templates/unit_test_self_hosted.tpl +++ b/test/templates/unit_test_self_hosted.tpl @@ -55,8 +55,7 @@ jobs: timeout_minutes: 30 max_attempts: 3 retry_on: error - command: | - docker run --name "{{.ImageName}}_$GITHUB_SHA" {{.ImageName}}:$GITHUB_SHA /bin/bash -c 'make unit_test' + command: docker run --name "{{.ImageName}}_$GITHUB_SHA" {{.ImageName}}:$GITHUB_SHA /bin/bash -c 'make unit_test' - name: Print Volume Used if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.unit_tests == 'true' diff --git a/web/vtadmin/src/proto/vtadmin.d.ts b/web/vtadmin/src/proto/vtadmin.d.ts index 75454360f09..4ebcf2fb977 100644 --- a/web/vtadmin/src/proto/vtadmin.d.ts +++ b/web/vtadmin/src/proto/vtadmin.d.ts @@ -24991,6 +24991,12 @@ export namespace query { /** ExecuteOptions has_created_temp_tables */ has_created_temp_tables?: (boolean|null); + + /** ExecuteOptions WorkloadName */ + WorkloadName?: (string|null); + + /** ExecuteOptions priority */ + priority?: (string|null); } /** Represents an ExecuteOptions. */ @@ -25026,6 +25032,12 @@ export namespace query { /** ExecuteOptions has_created_temp_tables. */ public has_created_temp_tables: boolean; + /** ExecuteOptions WorkloadName. 
*/ + public WorkloadName: string; + + /** ExecuteOptions priority. */ + public priority: string; + /** * Creates a new ExecuteOptions instance using the specified properties. * @param [properties] Properties to set diff --git a/web/vtadmin/src/proto/vtadmin.js b/web/vtadmin/src/proto/vtadmin.js index 39cf0d0068d..8bb4ceb2c83 100644 --- a/web/vtadmin/src/proto/vtadmin.js +++ b/web/vtadmin/src/proto/vtadmin.js @@ -57353,6 +57353,8 @@ $root.query = (function() { * @property {boolean|null} [skip_query_plan_cache] ExecuteOptions skip_query_plan_cache * @property {query.ExecuteOptions.PlannerVersion|null} [planner_version] ExecuteOptions planner_version * @property {boolean|null} [has_created_temp_tables] ExecuteOptions has_created_temp_tables + * @property {string|null} [WorkloadName] ExecuteOptions WorkloadName + * @property {string|null} [priority] ExecuteOptions priority */ /** @@ -57434,6 +57436,22 @@ $root.query = (function() { */ ExecuteOptions.prototype.has_created_temp_tables = false; + /** + * ExecuteOptions WorkloadName. + * @member {string} WorkloadName + * @memberof query.ExecuteOptions + * @instance + */ + ExecuteOptions.prototype.WorkloadName = ""; + + /** + * ExecuteOptions priority. + * @member {string} priority + * @memberof query.ExecuteOptions + * @instance + */ + ExecuteOptions.prototype.priority = ""; + /** * Creates a new ExecuteOptions instance using the specified properties. 
* @function create @@ -57474,6 +57492,10 @@ $root.query = (function() { writer.uint32(/* id 11, wireType 0 =*/88).int32(message.planner_version); if (message.has_created_temp_tables != null && Object.hasOwnProperty.call(message, "has_created_temp_tables")) writer.uint32(/* id 12, wireType 0 =*/96).bool(message.has_created_temp_tables); + if (message.WorkloadName != null && Object.hasOwnProperty.call(message, "WorkloadName")) + writer.uint32(/* id 15, wireType 2 =*/122).string(message.WorkloadName); + if (message.priority != null && Object.hasOwnProperty.call(message, "priority")) + writer.uint32(/* id 16, wireType 2 =*/130).string(message.priority); return writer; }; @@ -57532,6 +57554,12 @@ $root.query = (function() { case 12: message.has_created_temp_tables = reader.bool(); break; + case 15: + message.WorkloadName = reader.string(); + break; + case 16: + message.priority = reader.string(); + break; default: reader.skipType(tag & 7); break; @@ -57624,6 +57652,12 @@ $root.query = (function() { if (message.has_created_temp_tables != null && message.hasOwnProperty("has_created_temp_tables")) if (typeof message.has_created_temp_tables !== "boolean") return "has_created_temp_tables: boolean expected"; + if (message.WorkloadName != null && message.hasOwnProperty("WorkloadName")) + if (!$util.isString(message.WorkloadName)) + return "WorkloadName: string expected"; + if (message.priority != null && message.hasOwnProperty("priority")) + if (!$util.isString(message.priority)) + return "priority: string expected"; return null; }; @@ -57746,6 +57780,10 @@ $root.query = (function() { } if (object.has_created_temp_tables != null) message.has_created_temp_tables = Boolean(object.has_created_temp_tables); + if (object.WorkloadName != null) + message.WorkloadName = String(object.WorkloadName); + if (object.priority != null) + message.priority = String(object.priority); return message; }; @@ -57775,6 +57813,8 @@ $root.query = (function() { object.skip_query_plan_cache = false; 
object.planner_version = options.enums === String ? "DEFAULT_PLANNER" : 0; object.has_created_temp_tables = false; + object.WorkloadName = ""; + object.priority = ""; } if (message.included_fields != null && message.hasOwnProperty("included_fields")) object.included_fields = options.enums === String ? $root.query.ExecuteOptions.IncludedFields[message.included_fields] : message.included_fields; @@ -57795,6 +57835,10 @@ $root.query = (function() { object.planner_version = options.enums === String ? $root.query.ExecuteOptions.PlannerVersion[message.planner_version] : message.planner_version; if (message.has_created_temp_tables != null && message.hasOwnProperty("has_created_temp_tables")) object.has_created_temp_tables = message.has_created_temp_tables; + if (message.WorkloadName != null && message.hasOwnProperty("WorkloadName")) + object.WorkloadName = message.WorkloadName; + if (message.priority != null && message.hasOwnProperty("priority")) + object.priority = message.priority; return object; };