diff --git a/go/vt/throttler/throttler.go b/go/vt/throttler/throttler.go index 03a20013396..9f37887caf7 100644 --- a/go/vt/throttler/throttler.go +++ b/go/vt/throttler/throttler.go @@ -35,6 +35,7 @@ import ( "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/proto/topodata" throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata" ) @@ -212,6 +213,28 @@ func (t *Throttler) Throttle(threadID int) time.Duration { return t.threadThrottlers[threadID].throttle(t.nowFunc()) } +// MaxLag returns the max of all the last replication lag values seen across all tablets of +// the provided type, excluding ignored tablets. +func (t *Throttler) MaxLag(tabletType topodata.TabletType) uint32 { + cache := t.maxReplicationLagModule.lagCacheByType(tabletType) + + var maxLag uint32 + cacheEntries := cache.entries + + for key := range cacheEntries { + if cache.isIgnored(key) { + continue + } + + lag := cache.latest(key).Stats.ReplicationLagSeconds + if lag > maxLag { + maxLag = lag + } + } + + return maxLag +} + // ThreadFinished marks threadID as finished and redistributes the thread's // rate allotment across the other threads. // After ThreadFinished() is called, Throttle() must not be called anymore. diff --git a/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go index 53b827d591a..33b0a0c26ba 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go @@ -12,6 +12,7 @@ import ( discovery "vitess.io/vitess/go/vt/discovery" throttlerdata "vitess.io/vitess/go/vt/proto/throttlerdata" + topodata "vitess.io/vitess/go/vt/proto/topodata" ) // MockThrottlerInterface is a mock of ThrottlerInterface interface. @@ -63,6 +64,20 @@ func (mr *MockThrottlerInterfaceMockRecorder) GetConfiguration() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetConfiguration", reflect.TypeOf((*MockThrottlerInterface)(nil).GetConfiguration)) } +// MaxLag mocks base method. +func (m *MockThrottlerInterface) MaxLag(tabletType topodata.TabletType) uint32 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MaxLag", tabletType) + ret0, _ := ret[0].(uint32) + return ret0 +} + +// MaxLag indicates an expected call of LastMaxLagNotIgnoredForTabletType. +func (mr *MockThrottlerInterfaceMockRecorder) MaxLag(tabletType interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxLag", reflect.TypeOf((*MockThrottlerInterface)(nil).MaxLag), tabletType) +} + // MaxRate mocks base method. func (m *MockThrottlerInterface) MaxRate() int64 { m.ctrl.T.Helper() diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 5b724ca97cf..952d1631f32 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -20,6 +20,7 @@ import ( "fmt" "strings" "sync" + "sync/atomic" "time" "google.golang.org/protobuf/proto" @@ -39,7 +40,73 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) +<<<<<<< HEAD // TxThrottler throttles transactions based on replication lag. +======= +// These vars store the functions used to create the topo server, healthcheck, +// and go/vt/throttler. These are provided here so that they can be overridden +// in tests to generate mocks. +type healthCheckFactoryFunc func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck +type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) + +var ( + healthCheckFactory healthCheckFactoryFunc + throttlerFactory throttlerFactoryFunc +) + +func resetTxThrottlerFactories() { + healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck { + return discovery.NewHealthCheck(context.Background(), discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ",")) + } + throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) { + return throttler.NewThrottlerFromConfig(name, unit, threadCount, maxRate, maxReplicationLagConfig, time.Now) + } +} + +func init() { + resetTxThrottlerFactories() +} + +// TxThrottler defines the interface for the transaction throttler. +type TxThrottler interface { + InitDBConfig(target *querypb.Target) + Open() (err error) + Close() + Throttle(priority int, workload string) (result bool) +} + +// ThrottlerInterface defines the public interface that is implemented by go/vt/throttler.Throttler +// It is only used here to allow mocking out a throttler object. +type ThrottlerInterface interface { + Throttle(threadID int) time.Duration + ThreadFinished(threadID int) + Close() + MaxRate() int64 + SetMaxRate(rate int64) + RecordReplicationLag(time time.Time, th *discovery.TabletHealth) + GetConfiguration() *throttlerdatapb.Configuration + UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error + ResetConfiguration() + MaxLag(tabletType topodatapb.TabletType) uint32 +} + +// TxThrottlerName is the name the wrapped go/vt/throttler object will be registered with +// go/vt/throttler.GlobalManager. +const TxThrottlerName = "TransactionThrottler" + +// fetchKnownCells gathers a list of known cells from the topology. On error, +// the cell of the local tablet will be used and an error is logged. +func fetchKnownCells(ctx context.Context, topoServer *topo.Server, target *querypb.Target) []string { + cells, err := topoServer.GetKnownCells(ctx) + if err != nil { + log.Errorf("txThrottler: falling back to local cell due to error fetching cells from topology: %+v", err) + cells = []string{target.Cell} + } + return cells +} + +// txThrottler implements TxThrottle for throttling transactions based on replication lag. +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) // It's a thin wrapper around the throttler found in vitess/go/vt/throttler. // It uses a discovery.HealthCheck to send replication-lag updates to the wrapped throttler. // @@ -173,7 +240,23 @@ type txThrottlerState struct { stopHealthCheck context.CancelFunc healthCheck discovery.HealthCheck +<<<<<<< HEAD topologyWatchers []TopologyWatcherInterface +======= + healthCheckChan chan *discovery.TabletHealth + healthCheckCells []string + cellsFromTopo bool + + // tabletTypes stores the tablet types for throttling + tabletTypes map[topodatapb.TabletType]bool + + maxLag int64 + done chan bool + waitForTermination sync.WaitGroup +<<<<<<< HEAD +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) +======= +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) } // These vars store the functions used to create the topo server, healthcheck, @@ -265,7 +348,21 @@ func (t *TxThrottler) Throttle() (result bool) { if t.state == nil { panic("BUG: Throttle() called on a closed TxThrottler") } +<<<<<<< HEAD return t.state.throttle() +======= + + // Throttle according to both what the throttler state says and the priority. Workloads with lower priority value + // are less likely to be throttled. + result = rand.Intn(sqlparser.MaxPriorityValue) < priority && t.state.throttle() + + t.requestsTotal.Add(workload, 1) + if result { + t.requestsThrottled.Add(workload, 1) + } + + return result && !t.config.TxThrottlerDryRun +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) } func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard, cell string) (*txThrottlerState, error) { @@ -287,6 +384,7 @@ func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard, cell string } createTxThrottlerHealthCheck(config, result, cell) +<<<<<<< HEAD result.topologyWatchers = make( []TopologyWatcherInterface, 0, len(config.healthCheckCells)) for _, cell := range config.healthCheckCells { @@ -300,12 +398,30 @@ func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard, cell string shard, discovery.DefaultTopologyWatcherRefreshInterval, discovery.DefaultTopoReadConcurrency)) +======= + state := &txThrottlerStateImpl{ + config: config, + healthCheckCells: config.TxThrottlerHealthCheckCells, + tabletTypes: tabletTypes, + throttler: t, + txThrottler: txThrottler, + done: make(chan bool, 1), + } + + // get cells from topo if none defined in tabletenv config + if len(state.healthCheckCells) == 0 { + ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) + defer cancel() + state.healthCheckCells = fetchKnownCells(ctx, txThrottler.topoServer, target) + state.cellsFromTopo = true +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) } return result, nil } func createTxThrottlerHealthCheck(config *txThrottlerConfig, result *txThrottlerState, cell string) { ctx, cancel := context.WithCancel(context.Background()) +<<<<<<< HEAD result.stopHealthCheck = cancel result.healthCheck = healthCheckFactory(config.topoServer, cell, config.healthCheckCells) ch := result.healthCheck.Subscribe() @@ -317,6 +433,59 @@ func createTxThrottlerHealthCheck(config *txThrottlerConfig, result *txThrottler case th := <-ch: result.StatsUpdate(th) } +======= + state.stopHealthCheck = cancel + state.initHealthCheckStream(txThrottler.topoServer, target) + go state.healthChecksProcessor(ctx, txThrottler.topoServer, target) + state.waitForTermination.Add(1) + go state.updateMaxLag() + + return state, nil +} + +func (ts *txThrottlerStateImpl) initHealthCheckStream(topoServer *topo.Server, target *querypb.Target) { + ts.healthCheck = healthCheckFactory(topoServer, target.Cell, ts.healthCheckCells) + ts.healthCheckChan = ts.healthCheck.Subscribe() + +} + +func (ts *txThrottlerStateImpl) closeHealthCheckStream() { + if ts.healthCheck == nil { + return + } + ts.stopHealthCheck() + ts.healthCheck.Close() +} + +func (ts *txThrottlerStateImpl) updateHealthCheckCells(ctx context.Context, topoServer *topo.Server, target *querypb.Target) { + fetchCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) + defer cancel() + + knownCells := fetchKnownCells(fetchCtx, topoServer, target) + if !reflect.DeepEqual(knownCells, ts.healthCheckCells) { + log.Info("txThrottler: restarting healthcheck stream due to topology cells update") + ts.healthCheckCells = knownCells + ts.closeHealthCheckStream() + ts.initHealthCheckStream(topoServer, target) + } +} + +func (ts *txThrottlerStateImpl) healthChecksProcessor(ctx context.Context, topoServer *topo.Server, target *querypb.Target) { + var cellsUpdateTicks <-chan time.Time + if ts.cellsFromTopo { + ticker := time.NewTicker(ts.config.TxThrottlerTopoRefreshInterval) + cellsUpdateTicks = ticker.C + defer ticker.Stop() + } + for { + select { + case <-ctx.Done(): + return + case <-cellsUpdateTicks: + ts.updateHealthCheckCells(ctx, topoServer, target) + case th := <-ts.healthCheckChan: + ts.StatsUpdate(th) +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) } }(ctx) } @@ -328,7 +497,62 @@ func (ts *txThrottlerState) throttle() bool { // Serialize calls to ts.throttle.Throttle() ts.throttleMu.Lock() defer ts.throttleMu.Unlock() - return ts.throttler.Throttle(0 /* threadId */) > 0 + + maxLag := atomic.LoadInt64(&ts.maxLag) + + return maxLag > ts.config.TxThrottlerConfig.TargetReplicationLagSec && + ts.throttler.Throttle(0 /* threadId */) > 0 +<<<<<<< HEAD +======= +} + +func (ts *txThrottlerStateImpl) updateMaxLag() { + defer ts.waitForTermination.Done() + // We use half of the target lag to ensure we have enough resolution to see changes in lag below that value + ticker := time.NewTicker(time.Duration(ts.config.TxThrottlerConfig.TargetReplicationLagSec/2) * time.Second) + defer ticker.Stop() +outerloop: + for { + select { + case <-ticker.C: + var maxLag uint32 + + for tabletType := range ts.tabletTypes { + maxLagPerTabletType := ts.throttler.MaxLag(tabletType) + if maxLagPerTabletType > maxLag { + maxLag = maxLagPerTabletType + } + } + atomic.StoreInt64(&ts.maxLag, int64(maxLag)) + case <-ts.done: + break outerloop + } + } +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) +} + +func (ts *txThrottlerStateImpl) updateMaxLag() { + defer ts.waitForTermination.Done() + // We use half of the target lag to ensure we have enough resolution to see changes in lag below that value + ticker := time.NewTicker(time.Duration(ts.config.TxThrottlerConfig.TargetReplicationLagSec/2) * time.Second) + defer ticker.Stop() +outerloop: + for { + select { + case <-ticker.C: + var maxLag uint32 + + for tabletType := range ts.tabletTypes { + maxLagPerTabletType := ts.throttler.MaxLag(tabletType) + if maxLagPerTabletType > maxLag { + maxLag = maxLagPerTabletType + } + } + atomic.StoreInt64(&ts.maxLag, int64(maxLag)) + case <-ts.done: + break outerloop + } + } } func (ts *txThrottlerState) deallocateResources() { @@ -343,7 +567,16 @@ func (ts *txThrottlerState) deallocateResources() { ts.healthCheck.Close() ts.healthCheck = nil +<<<<<<< HEAD +<<<<<<< HEAD // After ts.healthCheck is closed txThrottlerState.StatsUpdate() is guaranteed not +======= +======= +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) + ts.done <- true + ts.waitForTermination.Wait() + // After ts.healthCheck is closed txThrottlerStateImpl.StatsUpdate() is guaranteed not +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) // to be executing, so we can safely close the throttler. ts.throttler.Close() ts.throttler = nil diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index 1606fa2cf4c..0344bb84f2b 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -22,6 +22,14 @@ package txthrottler //go:generate mockgen -destination mock_topology_watcher_test.go -package txthrottler vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler TopologyWatcherInterface import ( +<<<<<<< HEAD +======= + "context" + "sync/atomic" +<<<<<<< HEAD +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) +======= +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) "testing" "time" @@ -44,12 +52,19 @@ func TestDisabledThrottler(t *testing.T) { Keyspace: "keyspace", Shard: "shard", }) +<<<<<<< HEAD if err := throttler.Open(); err != nil { t.Fatalf("want: nil, got: %v", err) } if result := throttler.Throttle(); result != false { t.Errorf("want: false, got: %v", result) } +======= + assert.Nil(t, throttler.Open()) + assert.False(t, throttler.Throttle(0, "some-workload")) + throttlerImpl, _ := throttler.(*txThrottler) + assert.Zero(t, throttlerImpl.throttlerRunning.Get()) +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) throttler.Close() } @@ -95,14 +110,23 @@ func TestEnabledThrottler(t *testing.T) { return mockThrottler, nil } - call0 := mockThrottler.EXPECT().UpdateConfiguration(gomock.Any(), true /* copyZeroValues */) - call1 := mockThrottler.EXPECT().Throttle(0) - call1.Return(0 * time.Second) + var calls []*gomock.Call + + call := mockThrottler.EXPECT().UpdateConfiguration(gomock.Any(), true /* copyZeroValues */) + calls = append(calls, call) + + // 1 + call = mockThrottler.EXPECT().Throttle(0) + call.Return(0 * time.Second) + calls = append(calls, call) + tabletStats := &discovery.TabletHealth{ Target: &querypb.Target{ TabletType: topodatapb.TabletType_REPLICA, }, } +<<<<<<< HEAD +<<<<<<< HEAD call2 := mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats) call3 := mockThrottler.EXPECT().Throttle(0) call3.Return(1 * time.Second) @@ -111,6 +135,56 @@ func TestEnabledThrottler(t *testing.T) { call2.After(call1) call3.After(call2) call4.After(call3) +======= +======= + + call = mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats) + calls = append(calls, call) + + // 2 + call = mockThrottler.EXPECT().Throttle(0) + call.Return(1 * time.Second) + calls = append(calls, call) + + // 3 + // Nothing gets mocked here because the order of evaluation in txThrottler.Throttle() evaluates first + // whether the priority allows for throttling or not, so no need to mock calls in mockThrottler.Throttle() + + // 4 + // Nothing gets mocked here because the order of evaluation in txThrottlerStateImpl.Throttle() evaluates first + // whether there is lag or not, so no call to the underlying mockThrottler is issued. + + call = mockThrottler.EXPECT().Close() + calls = append(calls, call) + + for i := 1; i < len(calls); i++ { + calls[i].After(calls[i-1]) + } +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) + + call = mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats) + calls = append(calls, call) + + // 2 + call = mockThrottler.EXPECT().Throttle(0) + call.Return(1 * time.Second) + calls = append(calls, call) + + // 3 + // Nothing gets mocked here because the order of evaluation in txThrottler.Throttle() evaluates first + // whether the priority allows for throttling or not, so no need to mock calls in mockThrottler.Throttle() + + // 4 + // Nothing gets mocked here because the order of evaluation in txThrottlerStateImpl.Throttle() evaluates first + // whether there is lag or not, so no call to the underlying mockThrottler is issued. + + call = mockThrottler.EXPECT().Close() + calls = append(calls, call) + + for i := 1; i < len(calls); i++ { + calls[i].After(calls[i-1]) + } +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) config := tabletenv.NewDefaultConfig() config.EnableTxThrottler = true @@ -124,6 +198,7 @@ func TestEnabledThrottler(t *testing.T) { Keyspace: "keyspace", Shard: "shard", }) +<<<<<<< HEAD if err := throttler.Open(); err != nil { t.Fatalf("want: nil, got: %v", err) } @@ -131,16 +206,78 @@ func TestEnabledThrottler(t *testing.T) { t.Errorf("want: false, got: %v", result) } throttler.state.StatsUpdate(tabletStats) +======= + + assert.Nil(t, throttlerImpl.Open()) + throttlerStateImpl, ok := throttlerImpl.state.(*txThrottlerStateImpl) + assert.True(t, ok) + assert.Equal(t, map[topodatapb.TabletType]bool{topodatapb.TabletType_REPLICA: true}, throttlerStateImpl.tabletTypes) + assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get()) + + // Stop the go routine that keeps updating the cached shard's max lag to prevent it from changing the value in a + // way that will interfere with how we manipulate that value in our tests to evaluate different cases: + throttlerStateImpl.done <- true + + // 1 should not throttle due to return value of underlying Throttle(), despite high lag + atomic.StoreInt64(&throttlerStateImpl.maxLag, 20) + assert.False(t, throttlerImpl.Throttle(100, "some-workload")) + assert.Equal(t, int64(1), throttlerImpl.requestsTotal.Counts()["some-workload"]) + assert.Zero(t, throttlerImpl.requestsThrottled.Counts()["some-workload"]) + + throttlerImpl.state.StatsUpdate(tabletStats) // This calls replication lag thing + assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksReadTotal.Counts()) + assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksRecordedTotal.Counts()) +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) rdonlyTabletStats := &discovery.TabletHealth{ Target: &querypb.Target{ TabletType: topodatapb.TabletType_RDONLY, }, } +<<<<<<< HEAD // This call should not be forwarded to the go/vt/throttler.Throttler object. throttler.state.StatsUpdate(rdonlyTabletStats) // The second throttle call should reject. if result := throttler.Throttle(); result != true { t.Errorf("want: true, got: %v", result) +======= + // This call should not be forwarded to the go/vt/throttlerImpl.Throttler object. + throttlerImpl.state.StatsUpdate(rdonlyTabletStats) + assert.Equal(t, map[string]int64{"cell1.REPLICA": 1, "cell2.RDONLY": 1}, throttlerImpl.healthChecksReadTotal.Counts()) + assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksRecordedTotal.Counts()) + + // 2 should throttle due to return value of underlying Throttle(), high lag & priority = 100 + assert.True(t, throttlerImpl.Throttle(100, "some-workload")) + assert.Equal(t, int64(2), throttlerImpl.requestsTotal.Counts()["some-workload"]) + assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some-workload"]) + + // 3 should not throttle despite return value of underlying Throttle() and high lag, due to priority = 0 + assert.False(t, throttlerImpl.Throttle(0, "some-workload")) + assert.Equal(t, int64(3), throttlerImpl.requestsTotal.Counts()["some-workload"]) + assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some-workload"]) + + // 4 should not throttle despite return value of underlying Throttle() and priority = 100, due to low lag + atomic.StoreInt64(&throttlerStateImpl.maxLag, 1) + assert.False(t, throttler.Throttle(100, "some-workload")) + assert.Equal(t, int64(4), throttlerImpl.requestsTotal.Counts()["some-workload"]) + assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some-workload"]) + + throttler.Close() + assert.Zero(t, throttlerImpl.throttlerRunning.Get()) +} + +func TestFetchKnownCells(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + { + ts := memorytopo.NewServer(ctx, "cell1", "cell2") + cells := fetchKnownCells(context.Background(), ts, &querypb.Target{Cell: "cell1"}) + assert.Equal(t, []string{"cell1", "cell2"}, cells) + } + { + ts := memorytopo.NewServer(ctx) + cells := fetchKnownCells(context.Background(), ts, &querypb.Target{Cell: "cell1"}) + assert.Equal(t, []string{"cell1"}, cells) +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) } throttler.Close() }