rac2: return tracked deductions on close/removal #130684

Merged (1 commit) on Sep 13, 2024
140 changes: 51 additions & 89 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
@@ -95,7 +95,8 @@ type RaftInterface interface {
//
// When a follower transitions from {StateProbe,StateSnapshot} =>
// StateReplicate, we start trying to send MsgApps. We should
// notice such transitions both in HandleRaftEvent and SetReplicasLocked.
// notice such transitions both in HandleRaftEvent and
// SetReplicasRaftMuLocked.
//
// Requires Replica.raftMu to be held, Replica.mu is not held.
FollowerStateRaftMuLocked(replicaID roachpb.ReplicaID) FollowerStateInfo
@@ -455,6 +456,13 @@ func (rc *rangeController) CloseRaftMuLocked(ctx context.Context) {
rc.mu.voterSets = nil
close(rc.mu.waiterSetRefreshCh)
rc.mu.waiterSetRefreshCh = nil
// Return any tracked token deductions, as we don't expect to receive more
// AdmittedVector updates.
for _, rs := range rc.replicaMap {
if rs.sendStream != nil {
rs.closeSendStream(ctx)
}
}
}

// InspectRaftMuLocked returns a handle containing the state of the range
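The close path above is the core of the change: once the controller is closed, no further AdmittedVector updates are expected, so deductions still sitting in a replica's tracker would otherwise be leaked. A minimal, self-contained sketch of the pattern follows; the types and names are illustrative stand-ins, not the real rac2 Tracker or token counters.

```go
package main

import "fmt"

// tracker records per-entry token deductions, keyed by entry index.
// It is a simplified stand-in for the rac2 tracker.
type tracker struct {
	deductions map[uint64]int64
}

// untrackAll removes and returns every tracked deduction.
func (t *tracker) untrackAll() int64 {
	var total int64
	for idx, tokens := range t.deductions {
		total += tokens
		delete(t.deductions, idx)
	}
	return total
}

type sendStream struct {
	tracker tracker
	closed  bool
}

type replicaState struct {
	sendStream *sendStream
	available  *int64 // shared per-store token counter
}

func (rs *replicaState) closeSendStream() {
	// With the stream closed there is no liveness guarantee that admitted
	// vectors will ever return these tokens, so return them eagerly.
	*rs.available += rs.sendStream.tracker.untrackAll()
	rs.sendStream.closed = true
	rs.sendStream = nil
}

// closeAll mirrors the controller-level close: every replica with an open
// send stream returns its tracked deductions.
func closeAll(replicas map[int]*replicaState) {
	for _, rs := range replicas {
		if rs.sendStream != nil {
			rs.closeSendStream()
		}
	}
}

func main() {
	avail := int64(13 << 20) // 13 MiB left after three 1 MiB deductions
	rs := &replicaState{
		sendStream: &sendStream{tracker: tracker{deductions: map[uint64]int64{
			1: 1 << 20, 2: 1 << 20, 3: 1 << 20,
		}}},
		available: &avail,
	}
	closeAll(map[int]*replicaState{1: rs})
	fmt.Printf("available after close: %d MiB\n", avail>>20) // prints 16 MiB
}
```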
@@ -497,6 +505,11 @@ func (rc *rangeController) updateReplicaSet(ctx context.Context, newSet ReplicaS
for r := range prevSet {
desc, ok := newSet[r]
if !ok {
if rs := rc.replicaMap[r]; rs.sendStream != nil {
// The replica is no longer part of the range, so we don't expect its
// tracked token deductions to be returned via admitted vectors. Return
// them now.
rs.closeSendStream(ctx)
}
delete(rc.replicaMap, r)
} else {
rs := rc.replicaMap[r]
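The removal path applies the same rule from the other direction: a replica missing from the new descriptor will never send the admitted vectors that would return its tracked deductions, so they come back as part of dropping it. A rough sketch of the set-difference logic, with hypothetical types rather than the real ReplicaSet/replicaState:

```go
package main

import "fmt"

// Illustrative only: tracked maps a replica ID to the tokens still deducted
// on its behalf. Replicas absent from the new set return their tokens and
// are dropped.
func updateReplicaSet(tracked map[int]int64, available *int64, newSet map[int]bool) {
	for id, tokens := range tracked {
		if !newSet[id] {
			// Replica left the range: no further admitted vectors are
			// expected from it, so return its tracked deductions now.
			*available += tokens
			delete(tracked, id)
		}
	}
}

func main() {
	avail := int64(10 << 20)
	tracked := map[int]int64{1: 1 << 20, 2: 2 << 20, 3: 3 << 20}
	// Replica 3 is removed from the range.
	updateReplicaSet(tracked, &avail, map[int]bool{1: true, 2: true})
	fmt.Printf("available: %d MiB, replicas still tracked: %d\n", avail>>20, len(tracked))
	// available: 13 MiB, replicas still tracked: 2
}
```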
@@ -596,11 +609,9 @@ func NewReplicaState(
sendTokenCounter: parent.opts.SSTokenCounter.Send(stream),
desc: desc,
}
state := parent.opts.RaftInterface.FollowerStateRaftMuLocked(desc.ReplicaID)
if state.State == tracker.StateReplicate {
rs.createReplicaSendStream()
}

// Don't bother creating the replicaSendStream here. We will do this in
// the next Ready which will be called immediately after. This centralizes
// the logic of replicaSendStream creation.
return rs
}
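Deferring creation to the next Ready leaves a single place that constructs a replicaSendStream. A tiny sketch of that shape, under the assumption that the first Ready after construction observes the follower's state; names are illustrative, not the real handleReadyState:

```go
package main

import "fmt"

type followerState int

const (
	stateProbe followerState = iota
	stateReplicate
	stateSnapshot
)

type replica struct {
	stream *struct{} // non-nil once a send stream exists
}

// onReady is the one place a stream is created: a replica observed in
// StateReplicate without a stream gets one lazily here, rather than in the
// constructor.
func (r *replica) onReady(state followerState) {
	if state == stateReplicate && r.stream == nil {
		r.stream = &struct{}{}
		fmt.Println("created send stream on Ready")
	}
}

func main() {
	r := &replica{}          // construction does not create a stream
	r.onReady(stateProbe)     // nothing happens
	r.onReady(stateReplicate) // stream created here
}
```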

@@ -611,7 +622,7 @@ type replicaSendStream struct {
syncutil.Mutex
// connectedStateStart is the time when the connectedState was last
// transitioned from one state to another e.g., from replicate to
// probeRecentlyReplicate or snapshot to replicate.
// probeRecentlyReplicate or vice versa.
connectedState connectedState
connectedStateStart time.Time
tracker Tracker
@@ -642,13 +653,8 @@ func (rs *replicaState) createReplicaSendStream() {
}

func (rs *replicaState) isStateReplicate() bool {
if rs.sendStream == nil {
return false
}
rs.sendStream.mu.Lock()
defer rs.sendStream.mu.Unlock()

return rs.sendStream.mu.connectedState.shouldWaitForElasticEvalTokens()
// probeRecentlyReplicate is also included in this state.
return rs.sendStream != nil
}

type entryFCState struct {
@@ -727,20 +733,7 @@ func (rs *replicaState) handleReadyState(
// stream.
should = true
} else if state != probeRecentlyReplicate {
// This is the first time we've seen the replica change to StateProbe,
// update the connected state and start time. If the state doesn't
// change within probeRecentlyReplicateDuration, we will close the
// stream. Also schedule an event, so that even if there are no
// entries, we will still reliably close the stream if still in
// StateProbe.
//
// TODO(sumeer): think through whether we should actually be returning
// tokens immediately here. Currently we are not. e.g.,
// probeRecentlyReplicate only affects whether to wait on this replica
// for eval, and otherwise it behaves like a closed replicaSendStream.
rs.sendStream.changeConnectedStateLocked(probeRecentlyReplicate, now)
rs.parent.opts.CloseTimerScheduler.ScheduleSendStreamCloseRaftMuLocked(
ctx, rs.parent.opts.RangeID, probeRecentlyReplicateDuration())
rs.sendStream.changeToProbeLocked(ctx, now)
}
return should
}(); shouldClose {
@@ -758,41 +751,46 @@

case tracker.StateSnapshot:
if rs.sendStream != nil {
switch func() connectedState {
rs.sendStream.mu.Lock()
defer rs.sendStream.mu.Unlock()
return rs.sendStream.mu.connectedState
}() {
case replicate:
rs.sendStream.changeToStateSnapshot(ctx)
shouldWaitChange = true
case probeRecentlyReplicate:
rs.closeSendStream(ctx)
shouldWaitChange = true
case snapshot:
}
rs.closeSendStream(ctx)
shouldWaitChange = true
}
}
return shouldWaitChange
}

func (rss *replicaState) closeSendStream(ctx context.Context) {
rss.sendStream.mu.Lock()
defer rss.sendStream.mu.Unlock()

rss.sendStream.closeLocked(ctx)
rss.sendStream = nil
}

func (rs *replicaState) admit(ctx context.Context, av AdmittedVector) {
if rss := rs.sendStream; rss != nil {
rss.admit(ctx, av)
}
}

func (rss *replicaState) closeSendStream(ctx context.Context) {
rss.sendStream.mu.Lock()
defer rss.sendStream.mu.Unlock()

if rss.sendStream.mu.connectedState != snapshot {
// changeToStateSnapshot returns all tokens, as we have no liveness
// guarantee of their return with the send stream now closed.
rss.sendStream.changeToStateSnapshotLocked(ctx)
}
rss.sendStream.mu.closed = true
rss.sendStream = nil
func (rss *replicaSendStream) closeLocked(ctx context.Context) {
// Return all tokens.
rss.returnTokens(ctx, rss.mu.tracker.UntrackAll())
rss.mu.closed = true
}

func (rss *replicaSendStream) changeToProbeLocked(ctx context.Context, now time.Time) {
// This is the first time we've seen the replica change to StateProbe,
// update the connected state and start time. If the state doesn't
// change within probeRecentlyReplicateDuration, we will close the
// stream. Also schedule an event, so that even if there are no
// entries, we will still reliably close the stream if still in
// StateProbe.
rss.changeConnectedStateLocked(probeRecentlyReplicate, now)
rss.parent.parent.opts.CloseTimerScheduler.ScheduleSendStreamCloseRaftMuLocked(
ctx, rss.parent.parent.opts.RangeID, probeRecentlyReplicateDuration())
// Return all tokens since other ranges may need them, and it may be some
// time before this replica transitions back to StateReplicate.
rss.returnTokens(ctx, rss.mu.tracker.UntrackAll())
}
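changeToProbeLocked bundles three steps: flip to probeRecentlyReplicate, schedule a close in case the probe state persists, and return the tracked tokens right away. A compressed, illustrative sketch of that behavior; the deadline-and-tick structure stands in for the real CloseTimerScheduler, and the duration is made up:

```go
package main

import (
	"fmt"
	"time"
)

const probeRecentlyReplicateDuration = 1 * time.Second // illustrative value

type stream struct {
	state    string
	tracked  int64
	avail    *int64
	deadline time.Time
	closed   bool
}

// changeToProbe flips the state, arms a close deadline, and returns all
// tracked tokens immediately: other ranges may need them, and the replica
// may stay unreachable for a while.
func (s *stream) changeToProbe(now time.Time) {
	s.state = "probeRecentlyReplicate"
	s.deadline = now.Add(probeRecentlyReplicateDuration)
	*s.avail += s.tracked
	s.tracked = 0
}

// tick plays the role of the scheduled close event: if the stream is still
// probing past the deadline, it closes.
func (s *stream) tick(now time.Time) {
	if s.state == "probeRecentlyReplicate" && now.After(s.deadline) {
		s.closed = true
	}
}

func main() {
	avail := int64(5 << 20)
	now := time.Now()
	s := &stream{state: "replicate", tracked: 3 << 20, avail: &avail}
	s.changeToProbe(now)
	s.tick(now.Add(2 * probeRecentlyReplicateDuration))
	fmt.Printf("state=%s closed=%t avail=%d MiB\n", s.state, s.closed, avail>>20)
	// state=probeRecentlyReplicate closed=true avail=8 MiB
}
```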

func (rss *replicaSendStream) makeConsistentInStateReplicate(
@@ -816,35 +814,10 @@ func (rss *replicaSendStream) makeConsistentInStateReplicate(
// mutex, but we expect transitions to replicate to be rarer than replicas
// remaining in replicate.
rss.changeConnectedStateLocked(replicate, rss.parent.parent.opts.Clock.PhysicalTime())
case snapshot:
rss.changeConnectedStateLocked(replicate, rss.parent.parent.opts.Clock.PhysicalTime())
shouldWaitChange = true
}
return shouldWaitChange
}

// changeToStateSnapshot changes the connected state to snapshot and returns
// all tracked entries' tokens.
func (rss *replicaSendStream) changeToStateSnapshot(ctx context.Context) {
rss.mu.Lock()
defer rss.mu.Unlock()

rss.changeToStateSnapshotLocked(ctx)
}

// changeToStateSnapshot changes the connected state to snapshot and returns
// all tracked entries' tokens.
//
// Requires rs.mu to be held.
func (rss *replicaSendStream) changeToStateSnapshotLocked(ctx context.Context) {
rss.changeConnectedStateLocked(snapshot, rss.parent.parent.opts.Clock.PhysicalTime())
// Since the replica is now in StateSnapshot, there is no need for Raft to
// send MsgApp pings to discover what has been missed. So there is no
// liveness guarantee on when these tokens will be returned, and therefore we
// return all tokens in the tracker.
rss.returnTokens(ctx, rss.mu.tracker.UntrackAll())
}

// returnTokens takes the tokens untracked by the tracker and returns them to
// the eval and send token counters.
func (rss *replicaSendStream) returnTokens(
@@ -897,21 +870,12 @@ type connectedState uint32
// latter.
//
// Initial states: replicate
// State transitions:
//
// replicate <=> {probeRecentlyReplicate, snapshot}
// snapshot => replicaSendStream closed (when observe StateProbe)
// probeRecentlyReplicate => replicaSendStream closed (after short delay)
// State transitions: replicate <=> probeRecentlyReplicate
const (
replicate connectedState = iota
probeRecentlyReplicate
snapshot
)

func (cs connectedState) shouldWaitForElasticEvalTokens() bool {
return cs == replicate || cs == probeRecentlyReplicate
}

func (cs connectedState) String() string {
return redact.StringWithoutMarkers(cs)
}
@@ -923,8 +887,6 @@ func (cs connectedState) SafeFormat(w redact.SafePrinter, _ rune) {
w.SafeString("replicate")
case probeRecentlyReplicate:
w.SafeString("probeRecentlyReplicate")
case snapshot:
w.SafeString("snapshot")
default:
panic(fmt.Sprintf("unknown connectedState %v", cs))
}
9 changes: 6 additions & 3 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
@@ -226,7 +226,7 @@ func (s *testingRCState) maybeSetInitialTokens(r testingRange) {
}
}

func (s *testingRCState) getOrInitRange(r testingRange) *testingRCRange {
func (s *testingRCState) getOrInitRange(t *testing.T, r testingRange) *testingRCRange {
testRC, ok := s.ranges[r.rangeID]
if !ok {
testRC = &testingRCRange{}
@@ -251,6 +251,9 @@ func (s *testingRCState) getOrInitRange(r testingRange) *testingRCRange {
s.ranges[r.rangeID] = testRC
}
s.maybeSetInitialTokens(r)
// Send through an empty raft event to trigger creating necessary replica
// send streams for the range.
require.NoError(t, testRC.rc.HandleRaftEventRaftMuLocked(s.testCtx, RaftEvent{}))
return testRC
}

Expand Down Expand Up @@ -658,7 +661,7 @@ func TestRangeController(t *testing.T) {
}

for _, r := range scanRanges(t, d.Input) {
state.getOrInitRange(r)
state.getOrInitRange(t, r)
}
return state.rangeStateString() + state.tokenCountsString()

Expand Down Expand Up @@ -731,7 +734,7 @@ func TestRangeController(t *testing.T) {

case "set_replicas":
for _, r := range scanRanges(t, d.Input) {
testRC := state.getOrInitRange(r)
testRC := state.getOrInitRange(t, r)
func() {
testRC.mu.Lock()
defer testRC.mu.Unlock()
112 changes: 112 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/close
@@ -0,0 +1,112 @@
# This test asserts that closing a range controller, or switching a replica
# state to StateSnapshot causes all tracked token deductions to be returned to
# the underlying stream token counts.
init
range_id=1 tenant_id=1 local_replica_id=1
store_id=1 replica_id=1 type=VOTER_FULL state=StateReplicate
store_id=2 replica_id=2 type=VOTER_FULL state=StateReplicate
store_id=3 replica_id=3 type=VOTER_FULL state=StateReplicate
----
r1: [(n1,s1):1*,(n2,s2):2,(n3,s3):3]
t1/s1: reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB
t1/s2: reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB
t1/s3: reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB

# There should be no tracked entries for the range.
stream_state range_id=1
----
(n1,s1):1: state=replicate closed=false
++++
(n2,s2):2: state=replicate closed=false
++++
(n3,s3):3: state=replicate closed=false
++++


# Submit a few entries to the range. We will partially admit these on different
# replica stores before closing streams, and ultimately the range controller.
raft_event
range_id=1
term=1 index=1 pri=NormalPri size=1MiB
term=1 index=2 pri=NormalPri size=1MiB
term=1 index=3 pri=NormalPri size=1MiB
----
t1/s1: reg=+13 MiB/+16 MiB ela=+5.0 MiB/+8.0 MiB
t1/s2: reg=+13 MiB/+16 MiB ela=+5.0 MiB/+8.0 MiB
t1/s3: reg=+13 MiB/+16 MiB ela=+5.0 MiB/+8.0 MiB

stream_state range_id=1
----
(n1,s1):1: state=replicate closed=false
NormalPri:
term=1 index=1 tokens=1048576
term=1 index=2 tokens=1048576
term=1 index=3 tokens=1048576
++++
(n2,s2):2: state=replicate closed=false
NormalPri:
term=1 index=1 tokens=1048576
term=1 index=2 tokens=1048576
term=1 index=3 tokens=1048576
++++
(n3,s3):3: state=replicate closed=false
NormalPri:
term=1 index=1 tokens=1048576
term=1 index=2 tokens=1048576
term=1 index=3 tokens=1048576
++++

admit
range_id=1
store_id=1 term=1 to_index=2 pri=NormalPri
store_id=2 term=1 to_index=1 pri=NormalPri
----
t1/s1: reg=+15 MiB/+16 MiB ela=+7.0 MiB/+8.0 MiB
t1/s2: reg=+14 MiB/+16 MiB ela=+6.0 MiB/+8.0 MiB
t1/s3: reg=+13 MiB/+16 MiB ela=+5.0 MiB/+8.0 MiB

# Change s3 to StateSnapshot. This should close the send stream and return the
# tracked deductions for entries in [1,3].
set_replicas
range_id=1 tenant_id=1 local_replica_id=1
store_id=1 replica_id=1 type=VOTER_FULL state=StateReplicate
store_id=2 replica_id=2 type=VOTER_FULL state=StateReplicate
store_id=3 replica_id=3 type=VOTER_FULL state=StateSnapshot
----
r1: [(n1,s1):1*,(n2,s2):2,(n3,s3):3]

stream_state range_id=1
----
(n1,s1):1: state=replicate closed=false
NormalPri:
term=1 index=3 tokens=1048576
++++
(n2,s2):2: state=replicate closed=false
NormalPri:
term=1 index=2 tokens=1048576
term=1 index=3 tokens=1048576
++++
(n3,s3):3: closed

# The tokens should be returned as well, s3 should have no outstanding tokens.
adjust_tokens
store_id=1 pri=HighPri tokens=0
----
t1/s1: reg=+15 MiB/+16 MiB ela=+7.0 MiB/+8.0 MiB
t1/s2: reg=+14 MiB/+16 MiB ela=+6.0 MiB/+8.0 MiB
t1/s3: reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB

# Close the range controller. This should return the tracked deductions for
# entry 3 on s1 and [2,3] on s2.
close_rcs
----
range_id=1 tenant_id={1} local_replica_id=1

# Print out the available token state. There should be no outstanding tokens.
# Send a noop token adjustment to get the state.
adjust_tokens
store_id=1 pri=HighPri tokens=0
----
t1/s1: reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB
t1/s2: reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB
t1/s3: reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB
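The token counts in this file follow mechanically from the deductions: each 1 MiB NormalPri entry moves both the reg and ela counters down by 1 MiB on every store, admitting up to an index returns the tokens for entries at or below it, and closing a stream (or the whole controller) returns whatever remains. A small sanity check of the regular-token arithmetic, using plain integers rather than the real token counter types:

```go
package main

import "fmt"

const mib = 1 << 20

func main() {
	// Per-store regular tokens, starting at 16 MiB; the elastic counters
	// follow the same deltas from an 8 MiB baseline.
	reg := map[string]int64{"s1": 16 * mib, "s2": 16 * mib, "s3": 16 * mib}

	// Three 1 MiB NormalPri entries are tracked on every store.
	for s := range reg {
		reg[s] -= 3 * mib
	}
	// s1 admits up to index 2 (2 MiB back), s2 up to index 1 (1 MiB back).
	reg["s1"] += 2 * mib
	reg["s2"] += 1 * mib
	fmt.Println(reg["s1"]/mib, reg["s2"]/mib, reg["s3"]/mib) // 15 14 13

	// s3 moves to StateSnapshot: its stream closes and returns all 3 MiB.
	reg["s3"] += 3 * mib
	// Closing the range controller returns the remainder on s1 and s2.
	reg["s1"] += 1 * mib // entry 3
	reg["s2"] += 2 * mib // entries 2 and 3
	fmt.Println(reg["s1"]/mib, reg["s2"]/mib, reg["s3"]/mib) // 16 16 16
}
```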