Skip to content

Commit

Permalink
Merge #130427
Browse files Browse the repository at this point in the history
130427: replica_rac2: integrate admitted tracking on leader r=sumeerbhola,kvoli a=pav-kv

This PR plumbs the admitted vectors into the RACv2 `Processor`. Admitted vectors are applied to `replicaSendStream` and cause releasing the tokens held by the leader.

The admitted vectors are plumbed to `replicaSendStream` via 3 paths:
- The leader's own admitted vector is applied from `HandleRaftReadyRaftMuLocked`, calling into `RangeController.AdmitRaftMuLocked` directly.
- The followers' admitted vectors in most cases are received via annotated `RaftMessageRequest.AdmittedState`, which is dispatched from `stepRaftGroupRaftMuLocked` into the `Processor` via `Processor.AdmitRaftMuLocked` method.
- The followers' piggybacked admitted vectors from `RaftMessageRequestBatch` are queued on the `Processor` via `EnqueuePiggybackedAdmittedAtLeader` method, and later applied from `HandleRaftReadyRaftMuLocked`.

Part of #129508

Co-authored-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
craig[bot] and pav-kv committed Sep 12, 2024
2 parents 31794d8 + 49e476c commit 1978cde
Show file tree
Hide file tree
Showing 13 changed files with 203 additions and 191 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/flow_control_raft_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,6 @@ func TestFlowControlRaftTransportV2(t *testing.T) {
type noopPiggybackedAdmittedResponseScheduler struct{}

func (s noopPiggybackedAdmittedResponseScheduler) ScheduleAdmittedResponseForRangeRACv2(
ctx context.Context, msgs []kvflowcontrolpb.AdmittedResponseForRange,
context.Context, []kvflowcontrolpb.PiggybackedAdmittedState,
) {
}
12 changes: 6 additions & 6 deletions pkg/kv/kvserver/flow_control_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ type StoresForRACv2 interface {
// processing.
type PiggybackedAdmittedResponseScheduler interface {
ScheduleAdmittedResponseForRangeRACv2(
ctx context.Context, msgs []kvflowcontrolpb.AdmittedResponseForRange)
ctx context.Context, msgs []kvflowcontrolpb.PiggybackedAdmittedState)
}

func MakeStoresForRACv2(stores *Stores) StoresForRACv2 {
Expand Down Expand Up @@ -332,20 +332,20 @@ func (ss *storesForRACv2) lookup(

// ScheduleAdmittedResponseForRangeRACv2 implements PiggybackedAdmittedResponseScheduler.
func (ss *storesForRACv2) ScheduleAdmittedResponseForRangeRACv2(
ctx context.Context, msgs []kvflowcontrolpb.AdmittedResponseForRange,
ctx context.Context, msgs []kvflowcontrolpb.PiggybackedAdmittedState,
) {
ls := (*Stores)(ss)
for _, m := range msgs {
s, err := ls.GetStore(m.LeaderStoreID)
s, err := ls.GetStore(m.ToStoreID)
if err != nil {
log.Errorf(ctx, "store %s not found", m.LeaderStoreID)
log.Errorf(ctx, "store %s not found", m.ToStoreID)
continue
}
repl := s.GetReplicaIfExists(m.RangeID)
if repl == nil {
if repl == nil || repl.replicaID != m.ToReplicaID {
continue
}
repl.flowControlV2.EnqueuePiggybackedAdmittedAtLeader(m.Msg)
repl.flowControlV2.EnqueuePiggybackedAdmittedAtLeader(m.FromReplicaID, m.Admitted)
s.scheduler.EnqueueRACv2PiggybackAdmitted(m.RangeID)
}
}
Expand Down
17 changes: 17 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,23 @@ type AdmittedVector struct {
Admitted [raftpb.NumPriorities]uint64
}

// Merge merges two admitted vectors into one. A higher-term vector always wins.
// If the terms match, the admitted indices are computed as max of the two, for
// each priority.
func (av AdmittedVector) Merge(other AdmittedVector) AdmittedVector {
if other.Term > av.Term {
return other
} else if other.Term < av.Term {
return av
}
for pri, my := range av.Admitted {
if their := other.Admitted[pri]; their > my {
av.Admitted[pri] = their
}
}
return av
}

// LogTracker tracks the durable and logically admitted state of a raft log.
//
// Writes to a raft log are ordered by LogMark (term, index) where term is the
Expand Down
23 changes: 23 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,29 @@ func (l *LogTracker) check(t *testing.T) {
}
}

func TestAdmittedVectorMerge(t *testing.T) {
av := func(term uint64, indices ...uint64) AdmittedVector {
av := AdmittedVector{Term: term}
require.Len(t, indices, len(av.Admitted))
copy(av.Admitted[:], indices)
return av
}
for _, tt := range [][3]AdmittedVector{
// Different terms. Higher term wins. Merge is symmetric.
{av(3, 10, 11, 12, 12), av(4, 10, 10, 20, 20), av(4, 10, 10, 20, 20)},
{av(4, 10, 10, 20, 20), av(3, 10, 11, 12, 12), av(4, 10, 10, 20, 20)},
// Same term. Highest index wins at each priority.
{av(3, 10, 10, 10, 10), av(3, 20, 20, 20, 20), av(3, 20, 20, 20, 20)},
{av(3, 20, 20, 20, 20), av(3, 10, 10, 10, 10), av(3, 20, 20, 20, 20)},
{av(3, 10, 11, 12, 12), av(3, 8, 9, 20, 20), av(3, 10, 11, 20, 20)},
{av(3, 5, 10, 5, 10), av(3, 10, 5, 10, 5), av(3, 10, 10, 10, 10)},
} {
t.Run("", func(t *testing.T) {
require.Equal(t, tt[2], tt[0].Merge(tt[1]))
})
}
}

func TestLogTrackerAppend(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
54 changes: 34 additions & 20 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ type RangeController interface {
//
// Requires replica.raftMu to be held.
HandleSchedulerEventRaftMuLocked(ctx context.Context) error
// AdmitRaftMuLocked handles the notification about the given replica's
// admitted vector change. No-op if the replica is not known, or the admitted
// vector is stale (either in Term, or the indices).
//
// Requires replica.raftMu to be held.
AdmitRaftMuLocked(context.Context, roachpb.ReplicaID, AdmittedVector)
// SetReplicasRaftMuLocked sets the replicas of the range. The caller will
// never mutate replicas, and neither should the callee.
//
Expand Down Expand Up @@ -100,17 +106,6 @@ type FollowerStateInfo struct {
Next uint64
}

// AdmittedTracker is used to retrieve the latest admitted vector for a
// replica (including the leader).
type AdmittedTracker interface {
// GetAdmitted returns the latest AdmittedVector for replicaID. It returns
// an empty struct if the replicaID is not known. NB: the
// AdmittedVector.Admitted[i] value can transiently advance past
// FollowerStateInfo.Match, since the admitted tracking subsystem is
// separate from Raft.
GetAdmitted(replicaID roachpb.ReplicaID) AdmittedVector
}

// RaftEvent carries a RACv2-relevant subset of raft state sent to storage.
type RaftEvent struct {
// Term is the leader term on whose behalf the entries or snapshot are
Expand Down Expand Up @@ -200,7 +195,6 @@ type RangeControllerOptions struct {
RaftInterface RaftInterface
Clock *hlc.Clock
CloseTimerScheduler ProbeToCloseTimerScheduler
AdmittedTracker AdmittedTracker
EvalWaitMetrics *EvalWaitMetrics
}

Expand Down Expand Up @@ -411,6 +405,19 @@ func (rc *rangeController) HandleSchedulerEventRaftMuLocked(ctx context.Context)
panic("unimplemented")
}

// AdmitRaftMuLocked handles the notification about the given replica's
// admitted vector change. No-op if the replica is not known, or the admitted
// vector is stale (either in Term, or the indices).
//
// Requires replica.raftMu to be held.
func (rc *rangeController) AdmitRaftMuLocked(
ctx context.Context, replicaID roachpb.ReplicaID, av AdmittedVector,
) {
if rs, ok := rc.replicaMap[replicaID]; ok {
rs.admit(ctx, av)
}
}

// SetReplicasRaftMuLocked sets the replicas of the range. The caller will
// never mutate replicas, and neither should the callee.
//
Expand Down Expand Up @@ -578,6 +585,12 @@ func (rss *replicaSendStream) changeConnectedStateLocked(state connectedState, n
rss.mu.connectedStateStart = now
}

func (rss *replicaSendStream) admit(ctx context.Context, av AdmittedVector) {
rss.mu.Lock()
defer rss.mu.Unlock()
rss.returnTokens(ctx, rss.mu.tracker.Untrack(av.Term, av.Admitted))
}

func (rs *replicaState) createReplicaSendStream() {
// Must be in StateReplicate on creation.
rs.sendStream = &replicaSendStream{
Expand Down Expand Up @@ -724,6 +737,12 @@ func (rs *replicaState) handleReadyState(
return shouldWaitChange
}

func (rs *replicaState) admit(ctx context.Context, av AdmittedVector) {
if rss := rs.sendStream; rss != nil {
rss.admit(ctx, av)
}
}

func (rss *replicaState) closeSendStream(ctx context.Context) {
rss.sendStream.mu.Lock()
defer rss.sendStream.mu.Unlock()
Expand All @@ -740,11 +759,6 @@ func (rss *replicaState) closeSendStream(ctx context.Context) {
func (rss *replicaSendStream) makeConsistentInStateReplicate(
ctx context.Context,
) (shouldWaitChange bool) {
av := rss.parent.parent.opts.AdmittedTracker.GetAdmitted(rss.parent.desc.ReplicaID)
rss.mu.Lock()
defer rss.mu.Unlock()
defer rss.returnTokens(ctx, rss.mu.tracker.Untrack(av.Term, av.Admitted))

// The leader is always in state replicate.
if rss.parent.parent.opts.LocalReplicaID == rss.parent.desc.ReplicaID {
if rss.mu.connectedState != replicate {
Expand Down Expand Up @@ -798,10 +812,10 @@ func (rss *replicaSendStream) returnTokens(
ctx context.Context, returned [raftpb.NumPriorities]kvflowcontrol.Tokens,
) {
for pri, tokens := range returned {
pri := raftpb.Priority(pri)
if tokens > 0 {
rss.parent.evalTokenCounter.Return(ctx, WorkClassFromRaftPriority(pri), tokens)
rss.parent.sendTokenCounter.Return(ctx, WorkClassFromRaftPriority(pri), tokens)
pri := WorkClassFromRaftPriority(raftpb.Priority(pri))
rss.parent.evalTokenCounter.Return(ctx, pri, tokens)
rss.parent.sendTokenCounter.Return(ctx, pri, tokens)
}
}
}
Expand Down
40 changes: 8 additions & 32 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ func (s *testingRCState) getOrInitRange(r testingRange) *testingRCRange {
RaftInterface: testRC,
Clock: s.clock,
CloseTimerScheduler: s.probeToCloseScheduler,
AdmittedTracker: testRC,
EvalWaitMetrics: s.evalMetrics,
}

Expand Down Expand Up @@ -284,17 +283,6 @@ func (r *testingRCRange) FollowerStateRaftMuLocked(replicaID roachpb.ReplicaID)
return replica.info
}

func (r *testingRCRange) GetAdmitted(replicaID roachpb.ReplicaID) AdmittedVector {
r.mu.Lock()
defer r.mu.Unlock()

replica, ok := r.mu.r.replicaSet[replicaID]
if !ok {
return AdmittedVector{}
}
return replica.av
}

func (r *testingRCRange) startWaitForEval(name string, pri admissionpb.WorkPriority) {
r.mu.Lock()
defer r.mu.Unlock()
Expand All @@ -319,29 +307,15 @@ func (r *testingRCRange) startWaitForEval(name string, pri admissionpb.WorkPrior
}()
}

func (r *testingRCRange) admit(
ctx context.Context,
t *testing.T,
storeID roachpb.StoreID,
term uint64,
toIndex uint64,
pri admissionpb.WorkPriority,
) {
func (r *testingRCRange) admit(ctx context.Context, storeID roachpb.StoreID, av AdmittedVector) {
r.mu.Lock()

defer r.mu.Unlock()
for _, replica := range r.mu.r.replicaSet {
if replica.desc.StoreID == storeID {
replica := replica
replica.av.Admitted[AdmissionToRaftPriority(pri)] = toIndex
replica.av.Term = term
r.mu.r.replicaSet[replica.desc.ReplicaID] = replica
break
r.rc.AdmitRaftMuLocked(ctx, replica.desc.ReplicaID, av)
return
}
}

r.mu.Unlock()
// Send an empty raft event in order to trigger potential token return.
require.NoError(t, r.rc.HandleRaftEventRaftMuLocked(ctx, RaftEvent{}))
}

type testingRange struct {
Expand All @@ -364,7 +338,6 @@ const invalidTrackerState = tracker.StateSnapshot + 1
type testingReplica struct {
desc roachpb.ReplicaDescriptor
info FollowerStateInfo
av AdmittedVector
}

func scanRanges(t *testing.T, input string) []testingRange {
Expand Down Expand Up @@ -825,7 +798,10 @@ func TestRangeController(t *testing.T) {
require.True(t, strings.HasPrefix(parts[3], "pri="))
parts[3] = strings.TrimPrefix(strings.TrimSpace(parts[3]), "pri=")
pri := parsePriority(t, parts[3])
state.ranges[lastRangeID].admit(ctx, t, roachpb.StoreID(storeID), uint64(term), uint64(to_index), pri)

av := AdmittedVector{Term: uint64(term)}
av.Admitted[AdmissionToRaftPriority(pri)] = uint64(to_index)
state.ranges[lastRangeID].admit(ctx, roachpb.StoreID(storeID), av)
}
}
return state.tokenCountsString()
Expand Down
Loading

0 comments on commit 1978cde

Please sign in to comment.