diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go index b498f2443779..f8ee06268f02 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go @@ -77,8 +77,8 @@ type RangeController interface { // TODO(pav-kv): This interface a placeholder for the interface containing raft // methods. Replace this as part of #128019. type RaftInterface interface { - // FollowerState returns the current state of a follower. The value of - // Match, Next, Admitted are populated iff in StateReplicate. All entries >= + // FollowerStateRaftMuLocked returns the current state of a follower. The + // values of Match and Next are populated iff in StateReplicate. All entries >= // Next have not had MsgApps constructed during the lifetime of this // StateReplicate (they may have been constructed previously). // @@ -86,18 +86,8 @@ type RaftInterface interface { // StateReplicate, we start trying to send MsgApps. We should // notice such transitions both in HandleRaftEvent and SetReplicasLocked. // - // RACv1 also cared about three other cases where the follower behaved as if - // it were disconnected (a) paused follower, (b) follower is behind, (c) - // follower is inactive (see - // replicaFlowControlIntegrationImpl.notActivelyReplicatingTo). (b) and (c) - // were needed since it paced at rate of slowest replica, while for regular - // work we will in v2 pace at slowest in quorum (and we don't care about - // elastic experiencing a hiccup, given it paces at rate of slowest). For - // (a), we plan to remove follower pausing. So the v2 code will be - // simplified. - // // Requires Replica.raftMu to be held, Replica.mu is not held. 
- FollowerState(replicaID roachpb.ReplicaID) FollowerStateInfo + FollowerStateRaftMuLocked(replicaID roachpb.ReplicaID) FollowerStateInfo } type FollowerStateInfo struct { @@ -107,10 +97,17 @@ type FollowerStateInfo struct { // (Match, Next) is in-flight. Match uint64 Next uint64 - // TODO(kvoli): Find a better home for this, we need it for token return. - Term uint64 - // Invariant: Admitted[i] <= Match. - Admitted [raftpb.NumPriorities]uint64 +} + +// AdmittedTracker is used to retrieve the latest admitted vector for a +// replica (including the leader). +type AdmittedTracker interface { + // GetAdmitted returns the latest AdmittedVector for replicaID. It returns + // an empty struct if the replicaID is not known. NB: the + // AdmittedVector.Admitted[i] value can transiently advance past + // FollowerStateInfo.Match, since the admitted tracking subsystem is + // separate from Raft. + GetAdmitted(replicaID roachpb.ReplicaID) AdmittedVector } // RaftEvent carries a RACv2-relevant subset of raft state sent to storage. @@ -182,6 +179,7 @@ type RangeControllerOptions struct { // needed (keyed by (tenantID, storeID)). 
SSTokenCounter *StreamTokenCounterProvider RaftInterface RaftInterface + AdmittedTracker AdmittedTracker Clock *hlc.Clock EvalWaitMetrics *EvalWaitMetrics } @@ -328,7 +326,7 @@ retry: func (rc *rangeController) HandleRaftEventRaftMuLocked(ctx context.Context, e RaftEvent) error { shouldWaitChange := false for r, rs := range rc.replicaMap { - info := rc.opts.RaftInterface.FollowerState(r) + info := rc.opts.RaftInterface.FollowerStateRaftMuLocked(r) shouldWaitChange = shouldWaitChange || rs.handleReadyState(ctx, info) } // If there was a quorum change, update the voter sets, triggering the @@ -484,7 +482,7 @@ func NewReplicaState( sendTokenCounter: parent.opts.SSTokenCounter.Send(stream), desc: desc, } - state := parent.opts.RaftInterface.FollowerState(desc.ReplicaID) + state := parent.opts.RaftInterface.FollowerStateRaftMuLocked(desc.ReplicaID) if state.State == tracker.StateReplicate { rs.createReplicaSendStream() } @@ -592,7 +590,7 @@ func (rs *replicaState) handleReadyState( rs.createReplicaSendStream() shouldWaitChange = true } else { - shouldWaitChange = rs.sendStream.makeConsistentInStateReplicate(ctx, info) + shouldWaitChange = rs.sendStream.makeConsistentInStateReplicate(ctx) } case tracker.StateSnapshot: if rs.sendStream != nil { @@ -628,11 +626,12 @@ func (rss *replicaState) closeSendStream(ctx context.Context) { } func (rss *replicaSendStream) makeConsistentInStateReplicate( - ctx context.Context, info FollowerStateInfo, + ctx context.Context, ) (shouldWaitChange bool) { + av := rss.parent.parent.opts.AdmittedTracker.GetAdmitted(rss.parent.desc.ReplicaID) rss.mu.Lock() defer rss.mu.Unlock() - defer rss.returnTokens(ctx, rss.mu.tracker.Untrack(info.Term, info.Admitted)) + defer rss.returnTokens(ctx, rss.mu.tracker.Untrack(av.Term, av.Admitted)) // The leader is always in state replicate. 
if rss.parent.parent.opts.LocalReplicaID == rss.parent.desc.ReplicaID { diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go index c18e31624d58..3c80834759c6 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go @@ -59,7 +59,7 @@ type testingRCRange struct { } } -func (r *testingRCRange) FollowerState(replicaID roachpb.ReplicaID) FollowerStateInfo { +func (r *testingRCRange) FollowerStateRaftMuLocked(replicaID roachpb.ReplicaID) FollowerStateInfo { r.mu.Lock() defer r.mu.Unlock() @@ -70,6 +70,17 @@ func (r *testingRCRange) FollowerState(replicaID roachpb.ReplicaID) FollowerStat return replica.info } +func (r *testingRCRange) GetAdmitted(replicaID roachpb.ReplicaID) AdmittedVector { + r.mu.Lock() + defer r.mu.Unlock() + + replica, ok := r.mu.r.replicaSet[replicaID] + if !ok { + return AdmittedVector{} + } + return replica.av +} + func (r *testingRCRange) startWaitForEval(name string, pri admissionpb.WorkPriority) { r.mu.Lock() defer r.mu.Unlock() @@ -107,8 +118,8 @@ func (r *testingRCRange) admit( for _, replica := range r.mu.r.replicaSet { if replica.desc.StoreID == storeID { replica := replica - replica.info.Admitted[AdmissionToRaftPriority(pri)] = toIndex - replica.info.Term = term + replica.av.Admitted[AdmissionToRaftPriority(pri)] = toIndex + replica.av.Term = term r.mu.r.replicaSet[replica.desc.ReplicaID] = replica break } @@ -139,6 +150,7 @@ const invalidTrackerState = tracker.StateSnapshot + 1 type testingReplica struct { desc roachpb.ReplicaDescriptor info FollowerStateInfo + av AdmittedVector } func scanRanges(t *testing.T, input string) []testingRange { @@ -539,6 +551,7 @@ func TestRangeController(t *testing.T) { LocalReplicaID: r.localReplicaID, SSTokenCounter: ssTokenCounter, RaftInterface: testRC, + AdmittedTracker: testRC, Clock: clock, EvalWaitMetrics: evalMetrics, } diff --git 
a/pkg/kv/kvserver/kvflowcontrol/rac2/token_tracker.go b/pkg/kv/kvserver/kvflowcontrol/rac2/token_tracker.go index f66789fac666..dd7f855a036b 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/token_tracker.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/token_tracker.go @@ -38,7 +38,7 @@ type tracked struct { func (dt *Tracker) Init(stream kvflowcontrol.Stream) { *dt = Tracker{ - tracked: [int(raftpb.NumPriorities)][]tracked{}, + tracked: [raftpb.NumPriorities][]tracked{}, stream: stream, } } diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go index 9feac78bfea6..b437df2f5e6d 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go @@ -473,6 +473,8 @@ type processorImpl struct { var _ Processor = &processorImpl{} +var _ rac2.AdmittedTracker = &processorImpl{} + func NewProcessor(opts ProcessorOptions) Processor { p := &processorImpl{opts: opts} p.mu.enabledWhenLeader = opts.EnabledWhenLeaderLevel @@ -965,6 +967,12 @@ func admittedIncreased(prev, next [raftpb.NumPriorities]uint64) bool { return false } +// GetAdmitted implements rac2.AdmittedTracker. +func (p *processorImpl) GetAdmitted(replicaID roachpb.ReplicaID) rac2.AdmittedVector { + // TODO(pav-kv): implement + return rac2.AdmittedVector{} +} + // RangeControllerFactoryImpl implements RangeControllerFactory. // // TODO(sumeer): replace with real implementation once RangeController impl is