diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt index 8628469fdc5..ea43f16c9df 100644 --- a/go/flags/endtoend/vtcombo.txt +++ b/go/flags/endtoend/vtcombo.txt @@ -52,6 +52,7 @@ Flags: --config-persistence-min-interval duration minimum interval between persisting dynamic config changes back to disk (if no change has occurred, nothing is done). (default 1s) --config-type string Config file type (omit to infer config type from file extension). --consolidator-query-waiter-cap int Configure the maximum number of clients allowed to wait on the consolidator. + --consolidator-query-waiter-cap-method string Configure the method when consolidator waiter cap is exceeded. Options: fallthrough, reject. (default "fallthrough") --consolidator-stream-query-size int Configure the stream consolidator query size in bytes. Setting to 0 disables the stream consolidator. (default 2097152) --consolidator-stream-total-size int Configure the stream consolidator total size in bytes. Setting to 0 disables the stream consolidator. (default 134217728) --consul_auth_static_file string JSON File to read the topos/tokens from. diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index 29cdde0d4a1..72c8edbd9d4 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -86,6 +86,7 @@ Flags: --config-persistence-min-interval duration minimum interval between persisting dynamic config changes back to disk (if no change has occurred, nothing is done). (default 1s) --config-type string Config file type (omit to infer config type from file extension). --consolidator-query-waiter-cap int Configure the maximum number of clients allowed to wait on the consolidator. + --consolidator-query-waiter-cap-method string Configure the method when consolidator waiter cap is exceeded. Options: fallthrough, reject. (default "fallthrough") --consolidator-stream-query-size int Configure the stream consolidator query size in bytes. Setting to 0 disables the stream consolidator. (default 2097152) --consolidator-stream-total-size int Configure the stream consolidator total size in bytes. Setting to 0 disables the stream consolidator. (default 134217728) --consul_auth_static_file string JSON File to read the topos/tokens from. diff --git a/go/sync2/fake_consolidator.go b/go/sync2/fake_consolidator.go index aadee1d37ce..272e3b8b6f2 100644 --- a/go/sync2/fake_consolidator.go +++ b/go/sync2/fake_consolidator.go @@ -51,8 +51,12 @@ type FakePendingResult struct { BroadcastCalls int // WaitCalls can be used to inspect Wait calls. WaitCalls int - err error - result *sqltypes.Result + // AddWaiterCounterCalls can be used to inspect AddWaiterCounter calls. + AddWaiterCounterCalls []int64 + // WaiterCount simulates the current waiter count + WaiterCount int64 + err error + result *sqltypes.Result } var ( @@ -113,7 +117,9 @@ func (fr *FakePendingResult) Wait() { fr.WaitCalls++ } -// AddWaiterCounter is currently a no-op. -func (fr *FakePendingResult) AddWaiterCounter(int64) *int64 { - return new(int64) +// AddWaiterCounter records the call and simulates waiter count changes. +func (fr *FakePendingResult) AddWaiterCounter(delta int64) *int64 { + fr.AddWaiterCounterCalls = append(fr.AddWaiterCounterCalls, delta) + fr.WaiterCount += delta + return &fr.WaiterCount } diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 6b4c0557e7f..5aa73c19d90 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -698,6 +698,8 @@ func (qre *QueryExecutor) execSelect() (*sqltypes.Result, error) { // Check tablet type. if qre.shouldConsolidate() { q, original := qre.tsv.qe.consolidator.Create(sqlWithoutComments) + waiterCapExceeded := false + if original { defer q.Broadcast() conn, err := qre.getConn() @@ -717,13 +719,26 @@ func (qre *QueryExecutor) execSelect() (*sqltypes.Result, error) { startTime := time.Now() q.Wait() qre.tsv.stats.WaitTimings.Record("Consolidations", startTime) + q.AddWaiterCounter(-1) + } else { + // Waiter cap exceeded, handle based on configured method + q.AddWaiterCounter(-1) + if qre.tsv.config.ConsolidatorQueryWaiterCapMethod == "reject" { + return nil, vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "consolidator waiter cap (%d) exceeded", waiterCap) + } + // Default to fallback to independent query execution + waiterCapExceeded = true } - q.AddWaiterCounter(-1) } - if q.Err() != nil { - return nil, q.Err() + + // Return consolidation results unless waiter cap was exceeded + if !waiterCapExceeded { + if q.Err() != nil { + return nil, q.Err() + } + return q.Result(), nil } - return q.Result(), nil + // If waiter cap exceeded, fall through to independent execution } conn, err := qre.getConn() if err != nil { diff --git a/go/vt/vttablet/tabletserver/query_executor_test.go b/go/vt/vttablet/tabletserver/query_executor_test.go index 84dd00b8eb1..4918d256ca0 100644 --- a/go/vt/vttablet/tabletserver/query_executor_test.go +++ b/go/vt/vttablet/tabletserver/query_executor_test.go @@ -1437,6 +1437,150 @@ func TestQueryExecutorShouldConsolidate(t *testing.T) { } } +func TestQueryExecutorConsolidatorWaiterCapFallback(t *testing.T) { + // Test that when the consolidator waiter cap is reached, queries fall back + // to independent execution instead of returning empty results. + + db := setUpQueryExecutorTest(t) + defer db.Close() + + ctx := context.Background() + tsv := newTestTabletServer(ctx, enableConsolidator, db) + defer tsv.StopService() + + // Set a waiter cap of 1 + tsv.config.ConsolidatorQueryWaiterCap = 1 + + fakeConsolidator := sync2.NewFakeConsolidator() + tsv.qe.consolidator = fakeConsolidator + + input := "select * from t limit 10001" + result := &sqltypes.Result{ + Fields: getTestTableFields(), + Rows: [][]sqltypes.Value{{ + sqltypes.NewInt32(1), // pk + sqltypes.NewInt32(100), // name + sqltypes.NewInt32(200), // addr + }}, + } + + // Set up consolidator to simulate an identical query already running (Created=false) + fakePendingResult := &sync2.FakePendingResult{} + fakePendingResult.SetResult(result) + // Start with waiter count above the cap (2 > 1), so the condition fails + fakePendingResult.WaiterCount = 2 + + fakeConsolidator.CreateReturn = &sync2.FakeConsolidatorCreateReturn{ + Created: false, // Simulate identical query already running + PendingResult: fakePendingResult, + } + + // Set up database query/response for fallback execution + db.AddQuery(input, result) + + qre := newTestQueryExecutor(context.Background(), tsv, input, 0) + qre.options = &querypb.ExecuteOptions{Consolidator: querypb.ExecuteOptions_CONSOLIDATOR_ENABLED} + + // Execute query + actualResult, err := qre.Execute() + require.NoError(t, err) + require.NotNil(t, actualResult) + + // Verify we got the correct result (not empty) + require.Equal(t, result.Fields, actualResult.Fields) + require.Equal(t, result.Rows, actualResult.Rows) + + // Verify consolidator was attempted + require.Len(t, fakeConsolidator.CreateCalls, 1) + + // Verify we did NOT wait (because waiter cap was exceeded) + require.Equal(t, 0, fakePendingResult.WaitCalls) + + // Verify we did NOT broadcast (because we're not the original) + require.Equal(t, 0, fakePendingResult.BroadcastCalls) + + // Verify AddWaiterCounter was called: once with 0 (to check count), once with -1 (cleanup) + require.Len(t, fakePendingResult.AddWaiterCounterCalls, 2) + require.Equal(t, int64(0), fakePendingResult.AddWaiterCounterCalls[0]) // Check current count + require.Equal(t, int64(-1), fakePendingResult.AddWaiterCounterCalls[1]) // Decrement + + // Verify fallback executed the query independently + require.Equal(t, 1, db.GetQueryCalledNum(input)) + + db.VerifyAllExecutedOrFail() +} + +func TestQueryExecutorConsolidatorWaiterCapReject(t *testing.T) { + // Test that when the consolidator waiter cap is reached and method is "reject", + // queries are rejected with RESOURCE_EXHAUSTED error. + + db := setUpQueryExecutorTest(t) + defer db.Close() + + ctx := context.Background() + tsv := newTestTabletServer(ctx, enableConsolidator, db) + defer tsv.StopService() + + // Set waiter cap of 1 and method to "reject" + tsv.config.ConsolidatorQueryWaiterCap = 1 + tsv.config.ConsolidatorQueryWaiterCapMethod = "reject" + + fakeConsolidator := sync2.NewFakeConsolidator() + tsv.qe.consolidator = fakeConsolidator + + input := "select * from t limit 10001" + result := &sqltypes.Result{ + Fields: getTestTableFields(), + Rows: [][]sqltypes.Value{{ + sqltypes.NewInt32(1), // pk + sqltypes.NewInt32(100), // name + sqltypes.NewInt32(200), // addr + }}, + } + + // Set up consolidator to simulate an identical query already running (Created=false) + fakePendingResult := &sync2.FakePendingResult{} + fakePendingResult.SetResult(result) + + // Start with waiter count above the cap (2 > 1), so the condition fails + fakePendingResult.WaiterCount = 2 + + fakeConsolidator.CreateReturn = &sync2.FakeConsolidatorCreateReturn{ + Created: false, // Simulate identical query already running + PendingResult: fakePendingResult, + } + + qre := newTestQueryExecutor(context.Background(), tsv, input, 0) + qre.options = &querypb.ExecuteOptions{Consolidator: querypb.ExecuteOptions_CONSOLIDATOR_ENABLED} + + // Execute query - should get RESOURCE_EXHAUSTED error + actualResult, err := qre.Execute() + require.Error(t, err) + require.Nil(t, actualResult) + + // Verify error is RESOURCE_EXHAUSTED + require.Contains(t, err.Error(), "consolidator waiter cap") + require.Contains(t, err.Error(), "exceeded") + require.Equal(t, vtrpcpb.Code_RESOURCE_EXHAUSTED, vterrors.Code(err)) + + // Verify consolidator was attempted + require.Len(t, fakeConsolidator.CreateCalls, 1) + + // Verify we did NOT wait (because waiter cap was exceeded and method is reject) + require.Equal(t, 0, fakePendingResult.WaitCalls) + + // Verify we did NOT broadcast (because we're not the original) + require.Equal(t, 0, fakePendingResult.BroadcastCalls) + + // Verify AddWaiterCounter was called: once with 0 (to check count), once with -1 (cleanup) + require.Len(t, fakePendingResult.AddWaiterCounterCalls, 2) + require.Equal(t, int64(0), fakePendingResult.AddWaiterCounterCalls[0]) // Check current count + require.Equal(t, int64(-1), fakePendingResult.AddWaiterCounterCalls[1]) // Decrement + + // Verify no database query was executed (rejected before fallback) + require.Equal(t, 0, db.GetQueryCalledNum(input)) +} + func TestGetConnectionLogStats(t *testing.T) { db := setUpQueryExecutorTest(t) defer db.Close() diff --git a/go/vt/vttablet/tabletserver/tabletenv/config.go b/go/vt/vttablet/tabletserver/tabletenv/config.go index e960ac20f6f..7e0f09d74e0 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config.go @@ -198,6 +198,7 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) { fs.Int64Var(¤tConfig.ConsolidatorStreamTotalSize, "consolidator-stream-total-size", defaultConfig.ConsolidatorStreamTotalSize, "Configure the stream consolidator total size in bytes. Setting to 0 disables the stream consolidator.") fs.Int64Var(¤tConfig.ConsolidatorQueryWaiterCap, "consolidator-query-waiter-cap", 0, "Configure the maximum number of clients allowed to wait on the consolidator.") + fs.StringVar(¤tConfig.ConsolidatorQueryWaiterCapMethod, "consolidator-query-waiter-cap-method", "fallthrough", "Configure the method when consolidator waiter cap is exceeded. Options: fallthrough, reject.") fs.DurationVar(&healthCheckInterval, "health_check_interval", defaultConfig.Healthcheck.Interval, "Interval between health checks") fs.DurationVar(°radedThreshold, "degraded_threshold", defaultConfig.Healthcheck.DegradedThreshold, "replication lag after which a replica is considered degraded") fs.DurationVar(&unhealthyThreshold, "unhealthy_threshold", defaultConfig.Healthcheck.UnhealthyThreshold, "replication lag after which a replica is considered unhealthy") @@ -249,6 +250,16 @@ func Init() { currentConfig.Consolidator = Disable } + switch currentConfig.ConsolidatorQueryWaiterCapMethod { + case "fallthrough", "reject": + // Valid options + case "": + // Empty string defaults to fallthrough + currentConfig.ConsolidatorQueryWaiterCapMethod = "fallthrough" + default: + log.Exitf("Invalid consolidator-query-waiter-cap-method value %v: must be either 'fallthrough' or 'reject'", currentConfig.ConsolidatorQueryWaiterCapMethod) + } + if heartbeatInterval == 0 { heartbeatInterval = defaultConfig.ReplicationTracker.HeartbeatInterval } @@ -319,6 +330,7 @@ type TabletConfig struct { ConsolidatorStreamTotalSize int64 `json:"consolidatorStreamTotalSize,omitempty"` ConsolidatorStreamQuerySize int64 `json:"consolidatorStreamQuerySize,omitempty"` ConsolidatorQueryWaiterCap int64 `json:"consolidatorMaxQueryWait,omitempty"` + ConsolidatorQueryWaiterCapMethod string `json:"consolidatorQueryWaiterCapMethod,omitempty"` QueryCacheMemory int64 `json:"queryCacheMemory,omitempty"` QueryCacheDoorkeeper bool `json:"queryCacheDoorkeeper,omitempty"` SchemaReloadInterval time.Duration `json:"schemaReloadIntervalSeconds,omitempty"` @@ -982,9 +994,10 @@ var defaultConfig = TabletConfig{ // of them ready in MySQL and profit from a pipelining effect. MaxConcurrency: 5, }, - Consolidator: Enable, - ConsolidatorStreamTotalSize: 128 * 1024 * 1024, - ConsolidatorStreamQuerySize: 2 * 1024 * 1024, + Consolidator: Enable, + ConsolidatorStreamTotalSize: 128 * 1024 * 1024, + ConsolidatorStreamQuerySize: 2 * 1024 * 1024, + ConsolidatorQueryWaiterCapMethod: "fallthrough", // The value for StreamBufferSize was chosen after trying out a few of // them. Too small buffers force too many packets to be sent. Too big // buffers force the clients to read them in multiple chunks and make diff --git a/go/vt/vttablet/tabletserver/tabletenv/config_test.go b/go/vt/vttablet/tabletserver/tabletenv/config_test.go index 6311384162d..12d95bc0bbb 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config_test.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config_test.go @@ -125,6 +125,7 @@ func TestDefaultConfig(t *testing.T) { gotBytes, err := yaml2.Marshal(NewDefaultConfig()) require.NoError(t, err) want := `consolidator: enable +consolidatorQueryWaiterCapMethod: fallthrough consolidatorStreamQuerySize: 2097152 consolidatorStreamTotalSize: 134217728 gracePeriods: {}