Skip to content

Commit

Permalink
rac2: add AdmittedTracker interface for use by RangeController
Browse files Browse the repository at this point in the history
The AdmittedTracker provides the latest AdmittedVector, and will be
implemented by processorImpl.

Informs #129508

Epic: CRDB-37515

Release note: None
  • Loading branch information
sumeerbhola committed Sep 4, 2024
1 parent 4e28f28 commit 4cab332
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 26 deletions.
43 changes: 21 additions & 22 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,27 +78,17 @@ type RangeController interface {
// TODO(pav-kv): This interface a placeholder for the interface containing raft
// methods. Replace this as part of #128019.
type RaftInterface interface {
// FollowerState returns the current state of a follower. The value of
// Match, Next, Admitted are populated iff in StateReplicate. All entries >=
// FollowerStateRaftMuLocked returns the current state of a follower. The
// value of Match, Next are populated iff in StateReplicate. All entries >=
// Next have not had MsgApps constructed during the lifetime of this
// StateReplicate (they may have been constructed previously).
//
// When a follower transitions from {StateProbe,StateSnapshot} =>
// StateReplicate, we start trying to send MsgApps. We should
// notice such transitions both in HandleRaftEvent and SetReplicasLocked.
//
// RACv1 also cared about three other cases where the follower behaved as if
// it were disconnected (a) paused follower, (b) follower is behind, (c)
// follower is inactive (see
// replicaFlowControlIntegrationImpl.notActivelyReplicatingTo). (b) and (c)
// were needed since it paced at rate of slowest replica, while for regular
// work we will in v2 pace at slowest in quorum (and we don't care about
// elastic experiencing a hiccup, given it paces at rate of slowest). For
// (a), we plan to remove follower pausing. So the v2 code will be
// simplified.
//
// Requires Replica.raftMu to be held, Replica.mu is not held.
FollowerState(replicaID roachpb.ReplicaID) FollowerStateInfo
FollowerStateRaftMuLocked(replicaID roachpb.ReplicaID) FollowerStateInfo
}

type FollowerStateInfo struct {
Expand All @@ -108,10 +98,17 @@ type FollowerStateInfo struct {
// (Match, Next) is in-flight.
Match uint64
Next uint64
// TODO(kvoli): Find a better home for this, we need it for token return.
Term uint64
// Invariant: Admitted[i] <= Match.
Admitted [raftpb.NumPriorities]uint64
}

// AdmittedTracker is used to retrieve the latest admitted vector for a
// replica (including the leader).
type AdmittedTracker interface {
// GetAdmitted returns the latest AdmittedVector for replicaID. It returns
// an empty struct if the replicaID is not known. NB: the
// AdmittedVector.Admitted[i] value can transiently advance past
// FollowerStateInfo.Match, since the admitted tracking subsystem is
// separate from Raft.
GetAdmitted(replicaID roachpb.ReplicaID) AdmittedVector
}

// RaftEvent carries a RACv2-relevant subset of raft state sent to storage.
Expand Down Expand Up @@ -203,6 +200,7 @@ type RangeControllerOptions struct {
RaftInterface RaftInterface
Clock *hlc.Clock
CloseTimerScheduler ProbeToCloseTimerScheduler
AdmittedTracker AdmittedTracker
EvalWaitMetrics *EvalWaitMetrics
}

Expand Down Expand Up @@ -348,7 +346,7 @@ retry:
func (rc *rangeController) HandleRaftEventRaftMuLocked(ctx context.Context, e RaftEvent) error {
shouldWaitChange := false
for r, rs := range rc.replicaMap {
info := rc.opts.RaftInterface.FollowerState(r)
info := rc.opts.RaftInterface.FollowerStateRaftMuLocked(r)
shouldWaitChange = rs.handleReadyState(ctx, info) || shouldWaitChange
}
// If there was a quorum change, update the voter sets, triggering the
Expand Down Expand Up @@ -504,7 +502,7 @@ func NewReplicaState(
sendTokenCounter: parent.opts.SSTokenCounter.Send(stream),
desc: desc,
}
state := parent.opts.RaftInterface.FollowerState(desc.ReplicaID)
state := parent.opts.RaftInterface.FollowerStateRaftMuLocked(desc.ReplicaID)
if state.State == tracker.StateReplicate {
rs.createReplicaSendStream()
}
Expand Down Expand Up @@ -655,7 +653,7 @@ func (rs *replicaState) handleReadyState(
rs.createReplicaSendStream()
shouldWaitChange = true
} else {
shouldWaitChange = rs.sendStream.makeConsistentInStateReplicate(ctx, info)
shouldWaitChange = rs.sendStream.makeConsistentInStateReplicate(ctx)
}

case tracker.StateSnapshot:
Expand Down Expand Up @@ -692,11 +690,12 @@ func (rss *replicaState) closeSendStream(ctx context.Context) {
}

func (rss *replicaSendStream) makeConsistentInStateReplicate(
ctx context.Context, info FollowerStateInfo,
ctx context.Context,
) (shouldWaitChange bool) {
av := rss.parent.parent.opts.AdmittedTracker.GetAdmitted(rss.parent.desc.ReplicaID)
rss.mu.Lock()
defer rss.mu.Unlock()
defer rss.returnTokens(ctx, rss.mu.tracker.Untrack(info.Term, info.Admitted))
defer rss.returnTokens(ctx, rss.mu.tracker.Untrack(av.Term, av.Admitted))

// The leader is always in state replicate.
if rss.parent.parent.opts.LocalReplicaID == rss.parent.desc.ReplicaID {
Expand Down
25 changes: 22 additions & 3 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ func (s *testingRCState) getOrInitRange(r testingRange) *testingRCRange {
RaftInterface: testRC,
Clock: s.clock,
CloseTimerScheduler: s.probeToCloseScheduler,
AdmittedTracker: testRC,
EvalWaitMetrics: s.evalMetrics,
}

Expand Down Expand Up @@ -272,7 +273,7 @@ type testingRCRange struct {
}
}

func (r *testingRCRange) FollowerState(replicaID roachpb.ReplicaID) FollowerStateInfo {
func (r *testingRCRange) FollowerStateRaftMuLocked(replicaID roachpb.ReplicaID) FollowerStateInfo {
r.mu.Lock()
defer r.mu.Unlock()

Expand All @@ -283,6 +284,17 @@ func (r *testingRCRange) FollowerState(replicaID roachpb.ReplicaID) FollowerStat
return replica.info
}

func (r *testingRCRange) GetAdmitted(replicaID roachpb.ReplicaID) AdmittedVector {
r.mu.Lock()
defer r.mu.Unlock()

replica, ok := r.mu.r.replicaSet[replicaID]
if !ok {
return AdmittedVector{}
}
return replica.av
}

func (r *testingRCRange) startWaitForEval(name string, pri admissionpb.WorkPriority) {
r.mu.Lock()
defer r.mu.Unlock()
Expand Down Expand Up @@ -320,8 +332,8 @@ func (r *testingRCRange) admit(
for _, replica := range r.mu.r.replicaSet {
if replica.desc.StoreID == storeID {
replica := replica
replica.info.Admitted[AdmissionToRaftPriority(pri)] = toIndex
replica.info.Term = term
replica.av.Admitted[AdmissionToRaftPriority(pri)] = toIndex
replica.av.Term = term
r.mu.r.replicaSet[replica.desc.ReplicaID] = replica
break
}
Expand Down Expand Up @@ -352,6 +364,7 @@ const invalidTrackerState = tracker.StateSnapshot + 1
type testingReplica struct {
desc roachpb.ReplicaDescriptor
info FollowerStateInfo
av AdmittedVector
}

func scanRanges(t *testing.T, input string) []testingRange {
Expand Down Expand Up @@ -796,6 +809,12 @@ func TestRangeController(t *testing.T) {
term, err = strconv.Atoi(parts[1])
require.NoError(t, err)

// TODO(sumeer): the test input only specifies an
// incremental change to the admitted vector, for a
// single priority. However, in practice, the whole
// vector will be updated, which also cleanly handles
// the case of an advancing term. Consider changing
// this to accept a non-incremental update.
parts[2] = strings.TrimSpace(parts[2])
require.True(t, strings.HasPrefix(parts[2], "to_index="))
parts[2] = strings.TrimPrefix(strings.TrimSpace(parts[2]), "to_index=")
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/kvflowcontrol/rac2/token_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type tracked struct {

func (dt *Tracker) Init(stream kvflowcontrol.Stream) {
*dt = Tracker{
tracked: [int(raftpb.NumPriorities)][]tracked{},
tracked: [raftpb.NumPriorities][]tracked{},
stream: stream,
}
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,8 @@ type processorImpl struct {

var _ Processor = &processorImpl{}

var _ rac2.AdmittedTracker = &processorImpl{}

func NewProcessor(opts ProcessorOptions) Processor {
p := &processorImpl{opts: opts}
p.mu.enabledWhenLeader = opts.EnabledWhenLeaderLevel
Expand Down Expand Up @@ -963,6 +965,12 @@ func admittedIncreased(prev, next [raftpb.NumPriorities]uint64) bool {
return false
}

// GetAdmitted implements rac2.AdmittedTracker.
func (p *processorImpl) GetAdmitted(replicaID roachpb.ReplicaID) rac2.AdmittedVector {
// TODO(pav-kv): implement
return rac2.AdmittedVector{}
}

// RangeControllerFactoryImpl implements RangeControllerFactory.
//
// TODO(sumeer): replace with real implementation once RangeController impl is
Expand Down

0 comments on commit 4cab332

Please sign in to comment.