Skip to content

Commit

Permalink
kvserver: add leader-side TODOs for admitted vectors
Browse files Browse the repository at this point in the history
Epic: none
Release note: none
  • Loading branch information
pav-kv committed Sep 4, 2024
1 parent 379a101 commit 60ecf7b
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 10 deletions.
20 changes: 10 additions & 10 deletions pkg/kv/kvserver/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,16 +453,6 @@ func (t *RaftTransport) handleRaftRequest(
log.Infof(ctx, "informed of below-raft %s", admittedEntries)
}
}
if len(req.AdmittedResponse) > 0 {
// NB: we do this via this special path instead of using the
// incomingMessageHandler since we don't have a full-fledged
// RaftMessageRequest for each range (each of these responses could be for
// a different range), and because what we need to do wrt queueing is much
// simpler (we don't need to worry about queue size since we only keep the
// latest message from each replica).
t.kvflowcontrol2.piggybackedResponseScheduler.ScheduleAdmittedResponseForRangeRACv2(
ctx, req.AdmittedResponse)
}
if req.ToReplica.StoreID == roachpb.StoreID(0) && len(req.AdmittedRaftLogEntries) > 0 {
// The fallback token dispatch mechanism does not specify a destination
// replica, and as such, there's no handler for it. We don't want to
Expand Down Expand Up @@ -543,6 +533,16 @@ func (t *RaftTransport) RaftMessageBatch(stream MultiRaft_RaftMessageBatchServer
t.kvflowControl.mu.Lock()
t.kvflowControl.mu.connectionTracker.markStoresConnected(storeIDs)
t.kvflowControl.mu.Unlock()
if len(batch.AdmittedStates) != 0 {
// TODO(pav-kv): dispatch admitted vectors to RACv2.
// NB: we do this via this special path instead of using the
// handleRaftRequest path since we don't have a full-fledged
// RaftMessageRequest for each range (each of these responses could
// be for a different range), and because what we need to do w.r.t.
// queueing is much simpler (we don't need to worry about queue size
// since we only keep the latest message from each replica).
_ = t.kvflowcontrol2.piggybackedResponseScheduler.ScheduleAdmittedResponseForRangeRACv2
}
if len(batch.Requests) == 0 {
continue
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,11 @@ func (r *Replica) stepRaftGroupRaftMuLocked(req *kvserverpb.RaftMessageRequest)
LowPriOverride: req.LowPriorityOverride,
}
}
case raftpb.MsgAppResp:
if req.AdmittedState.Term != 0 {
// TODO(pav-kv): dispatch admitted vector to RACv2 if one is attached.
_ = 0
}
}
err := raftGroup.Step(req.Message)
if errors.Is(err, raft.ErrProposalDropped) {
Expand Down

0 comments on commit 60ecf7b

Please sign in to comment.