diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go index c47ac48fd146..a83b731c4c0a 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go @@ -95,7 +95,8 @@ type RaftInterface interface { // // When a follower transitions from {StateProbe,StateSnapshot} => // StateReplicate, we start trying to send MsgApps. We should - // notice such transitions both in HandleRaftEvent and SetReplicasLocked. + // notice such transitions both in HandleRaftEvent and + // SetReplicasRaftMuLocked. // // Requires Replica.raftMu to be held, Replica.mu is not held. FollowerStateRaftMuLocked(replicaID roachpb.ReplicaID) FollowerStateInfo @@ -455,6 +456,13 @@ func (rc *rangeController) CloseRaftMuLocked(ctx context.Context) { rc.mu.voterSets = nil close(rc.mu.waiterSetRefreshCh) rc.mu.waiterSetRefreshCh = nil + // Return any tracked token deductions, as we don't expect to receive more + // AdmittedVector updates. + for _, rs := range rc.replicaMap { + if rs.sendStream != nil { + rs.closeSendStream(ctx) + } + } } // InspectRaftMuLocked returns a handle containing the state of the range @@ -497,6 +505,11 @@ func (rc *rangeController) updateReplicaSet(ctx context.Context, newSet ReplicaS for r := range prevSet { desc, ok := newSet[r] if !ok { + if rs := rc.replicaMap[r]; rs.sendStream != nil { + // The replica is no longer part of the range, so we don't expect any + // tracked token deductions to be returned. Return them now. + rs.closeSendStream(ctx) + } delete(rc.replicaMap, r) } else { rs := rc.replicaMap[r] @@ -596,11 +609,9 @@ func NewReplicaState( sendTokenCounter: parent.opts.SSTokenCounter.Send(stream), desc: desc, } - state := parent.opts.RaftInterface.FollowerStateRaftMuLocked(desc.ReplicaID) - if state.State == tracker.StateReplicate { - rs.createReplicaSendStream() - } - + // Don't bother creating the replicaSendStream here. We will do this in + // the next Ready which will be called immediately after. This centralizes + // the logic of replicaSendStream creation. return rs } @@ -611,7 +622,7 @@ type replicaSendStream struct { syncutil.Mutex // connectedStateStart is the time when the connectedState was last // transitioned from one state to another e.g., from replicate to - // probeRecentlyReplicate or snapshot to replicate. + // probeRecentlyReplicate or vice versa. connectedState connectedState connectedStateStart time.Time tracker Tracker @@ -642,13 +653,8 @@ func (rs *replicaState) createReplicaSendStream() { } func (rs *replicaState) isStateReplicate() bool { - if rs.sendStream == nil { - return false - } - rs.sendStream.mu.Lock() - defer rs.sendStream.mu.Unlock() - - return rs.sendStream.mu.connectedState.shouldWaitForElasticEvalTokens() + // probeRecentlyReplicate is also included in this state. + return rs.sendStream != nil } type entryFCState struct { @@ -727,20 +733,7 @@ func (rs *replicaState) handleReadyState( // stream. should = true } else if state != probeRecentlyReplicate { - // This is the first time we've seen the replica change to StateProbe, - // update the connected state and start time. If the state doesn't - // change within probeRecentlyReplicateDuration, we will close the - // stream. Also schedule an event, so that even if there are no - // entries, we will still reliably close the stream if still in - // StateProbe. - // - // TODO(sumeer): think through whether we should actually be returning - // tokens immediately here. 
Currently we are not. e.g., - // probeRecentlyReplicate only affects whether to wait on this replica - // for eval, and otherwise it behaves like a closed replicaSendStream. - rs.sendStream.changeConnectedStateLocked(probeRecentlyReplicate, now) - rs.parent.opts.CloseTimerScheduler.ScheduleSendStreamCloseRaftMuLocked( - ctx, rs.parent.opts.RangeID, probeRecentlyReplicateDuration()) + rs.sendStream.changeToProbeLocked(ctx, now) } return should }(); shouldClose { @@ -758,41 +751,46 @@ func (rs *replicaState) handleReadyState( case tracker.StateSnapshot: if rs.sendStream != nil { - switch func() connectedState { - rs.sendStream.mu.Lock() - defer rs.sendStream.mu.Unlock() - return rs.sendStream.mu.connectedState - }() { - case replicate: - rs.sendStream.changeToStateSnapshot(ctx) - shouldWaitChange = true - case probeRecentlyReplicate: - rs.closeSendStream(ctx) - shouldWaitChange = true - case snapshot: - } + rs.closeSendStream(ctx) + shouldWaitChange = true } } return shouldWaitChange } +func (rss *replicaState) closeSendStream(ctx context.Context) { + rss.sendStream.mu.Lock() + defer rss.sendStream.mu.Unlock() + + rss.sendStream.closeLocked(ctx) + rss.sendStream = nil +} + func (rs *replicaState) admit(ctx context.Context, av AdmittedVector) { if rss := rs.sendStream; rss != nil { rss.admit(ctx, av) } } -func (rss *replicaState) closeSendStream(ctx context.Context) { - rss.sendStream.mu.Lock() - defer rss.sendStream.mu.Unlock() - - if rss.sendStream.mu.connectedState != snapshot { - // changeToStateSnapshot returns all tokens, as we have no liveness - // guarantee of their return with the send stream now closed. - rss.sendStream.changeToStateSnapshotLocked(ctx) - } - rss.sendStream.mu.closed = true - rss.sendStream = nil +func (rss *replicaSendStream) closeLocked(ctx context.Context) { + // Return all tokens. + rss.returnTokens(ctx, rss.mu.tracker.UntrackAll()) + rss.mu.closed = true +} + +func (rss *replicaSendStream) changeToProbeLocked(ctx context.Context, now time.Time) { + // This is the first time we've seen the replica change to StateProbe, + // update the connected state and start time. If the state doesn't + // change within probeRecentlyReplicateDuration, we will close the + // stream. Also schedule an event, so that even if there are no + // entries, we will still reliably close the stream if still in + // StateProbe. + rss.changeConnectedStateLocked(probeRecentlyReplicate, now) + rss.parent.parent.opts.CloseTimerScheduler.ScheduleSendStreamCloseRaftMuLocked( + ctx, rss.parent.parent.opts.RangeID, probeRecentlyReplicateDuration()) + // Return all tokens since other ranges may need them, and it may be some + // time before this replica transitions back to StateReplicate. + rss.returnTokens(ctx, rss.mu.tracker.UntrackAll()) } func (rss *replicaSendStream) makeConsistentInStateReplicate( @@ -816,35 +814,10 @@ func (rss *replicaSendStream) makeConsistentInStateReplicate( // mutex, but we expect transitions to replicate to be rarer than replicas // remaining in replicate. rss.changeConnectedStateLocked(replicate, rss.parent.parent.opts.Clock.PhysicalTime()) - case snapshot: - rss.changeConnectedStateLocked(replicate, rss.parent.parent.opts.Clock.PhysicalTime()) - shouldWaitChange = true } return shouldWaitChange } -// changeToStateSnapshot changes the connected state to snapshot and returns -// all tracked entries' tokens. 
-func (rss *replicaSendStream) changeToStateSnapshot(ctx context.Context) { - rss.mu.Lock() - defer rss.mu.Unlock() - - rss.changeToStateSnapshotLocked(ctx) -} - -// changeToStateSnapshot changes the connected state to snapshot and returns -// all tracked entries' tokens. -// -// Requires rs.mu to be held. -func (rss *replicaSendStream) changeToStateSnapshotLocked(ctx context.Context) { - rss.changeConnectedStateLocked(snapshot, rss.parent.parent.opts.Clock.PhysicalTime()) - // Since the replica is now in StateSnapshot, there is no need for Raft to - // send MsgApp pings to discover what has been missed. So there is no - // liveness guarantee on when these tokens will be returned, and therefore we - // return all tokens in the tracker. - rss.returnTokens(ctx, rss.mu.tracker.UntrackAll()) -} - // returnTokens takes the tokens untracked by the tracker and returns them to // the eval and send token counters. func (rss *replicaSendStream) returnTokens( @@ -897,21 +870,12 @@ type connectedState uint32 // latter. // // Initial states: replicate -// State transitions: -// -// replicate <=> {probeRecentlyReplicate, snapshot} -// snapshot => replicaSendStream closed (when observe StateProbe) -// probeRecentlyReplicate => replicaSendStream closed (after short delay) +// State transitions: replicate <=> probeRecentlyReplicate const ( replicate connectedState = iota probeRecentlyReplicate - snapshot ) -func (cs connectedState) shouldWaitForElasticEvalTokens() bool { - return cs == replicate || cs == probeRecentlyReplicate -} - func (cs connectedState) String() string { return redact.StringWithoutMarkers(cs) } @@ -923,8 +887,6 @@ func (cs connectedState) SafeFormat(w redact.SafePrinter, _ rune) { w.SafeString("replicate") case probeRecentlyReplicate: w.SafeString("probeRecentlyReplicate") - case snapshot: - w.SafeString("snapshot") default: panic(fmt.Sprintf("unknown connectedState %v", cs)) } diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go index 79d87d9a9ef1..94cf89307051 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go @@ -226,7 +226,7 @@ func (s *testingRCState) maybeSetInitialTokens(r testingRange) { } } -func (s *testingRCState) getOrInitRange(r testingRange) *testingRCRange { +func (s *testingRCState) getOrInitRange(t *testing.T, r testingRange) *testingRCRange { testRC, ok := s.ranges[r.rangeID] if !ok { testRC = &testingRCRange{} @@ -251,6 +251,9 @@ func (s *testingRCState) getOrInitRange(r testingRange) *testingRCRange { s.ranges[r.rangeID] = testRC } s.maybeSetInitialTokens(r) + // Send through an empty raft event to trigger creating necessary replica + // send streams for the range. 
+ require.NoError(t, testRC.rc.HandleRaftEventRaftMuLocked(s.testCtx, RaftEvent{})) return testRC } @@ -658,7 +661,7 @@ func TestRangeController(t *testing.T) { } for _, r := range scanRanges(t, d.Input) { - state.getOrInitRange(r) + state.getOrInitRange(t, r) } return state.rangeStateString() + state.tokenCountsString() @@ -731,7 +734,7 @@ func TestRangeController(t *testing.T) { case "set_replicas": for _, r := range scanRanges(t, d.Input) { - testRC := state.getOrInitRange(r) + testRC := state.getOrInitRange(t, r) func() { testRC.mu.Lock() defer testRC.mu.Unlock() diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/close b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/close new file mode 100644 index 000000000000..d60d98d723c9 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/close @@ -0,0 +1,112 @@ +# This test asserts that closing a range controller, or switching a replica +# state to StateSnapshot causes all tracked token deductions to be returned to +# the underlying stream token counts. +init +range_id=1 tenant_id=1 local_replica_id=1 + store_id=1 replica_id=1 type=VOTER_FULL state=StateReplicate + store_id=2 replica_id=2 type=VOTER_FULL state=StateReplicate + store_id=3 replica_id=3 type=VOTER_FULL state=StateReplicate +---- +r1: [(n1,s1):1*,(n2,s2):2,(n3,s3):3] +t1/s1: reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB +t1/s2: reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB +t1/s3: reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB + +# There should be no tracked entries for the range. +stream_state range_id=1 +---- +(n1,s1):1: state=replicate closed=false +++++ +(n2,s2):2: state=replicate closed=false +++++ +(n3,s3):3: state=replicate closed=false +++++ + + +# Submit a few entries to the range. We will partially admit these on different +# replica stores before closing streams, and ultimately the range controller. +raft_event +range_id=1 + term=1 index=1 pri=NormalPri size=1MiB + term=1 index=2 pri=NormalPri size=1MiB + term=1 index=3 pri=NormalPri size=1MiB +---- +t1/s1: reg=+13 MiB/+16 MiB ela=+5.0 MiB/+8.0 MiB +t1/s2: reg=+13 MiB/+16 MiB ela=+5.0 MiB/+8.0 MiB +t1/s3: reg=+13 MiB/+16 MiB ela=+5.0 MiB/+8.0 MiB + +stream_state range_id=1 +---- +(n1,s1):1: state=replicate closed=false +NormalPri: + term=1 index=1 tokens=1048576 + term=1 index=2 tokens=1048576 + term=1 index=3 tokens=1048576 +++++ +(n2,s2):2: state=replicate closed=false +NormalPri: + term=1 index=1 tokens=1048576 + term=1 index=2 tokens=1048576 + term=1 index=3 tokens=1048576 +++++ +(n3,s3):3: state=replicate closed=false +NormalPri: + term=1 index=1 tokens=1048576 + term=1 index=2 tokens=1048576 + term=1 index=3 tokens=1048576 +++++ + +admit +range_id=1 + store_id=1 term=1 to_index=2 pri=NormalPri + store_id=2 term=1 to_index=1 pri=NormalPri +---- +t1/s1: reg=+15 MiB/+16 MiB ela=+7.0 MiB/+8.0 MiB +t1/s2: reg=+14 MiB/+16 MiB ela=+6.0 MiB/+8.0 MiB +t1/s3: reg=+13 MiB/+16 MiB ela=+5.0 MiB/+8.0 MiB + +# Change s3 to StateSnapshot. This should close the send stream and return the +# tracked deductions for entries in [1,3]. 
+set_replicas +range_id=1 tenant_id=1 local_replica_id=1 + store_id=1 replica_id=1 type=VOTER_FULL state=StateReplicate + store_id=2 replica_id=2 type=VOTER_FULL state=StateReplicate + store_id=3 replica_id=3 type=VOTER_FULL state=StateSnapshot +---- +r1: [(n1,s1):1*,(n2,s2):2,(n3,s3):3] + +stream_state range_id=1 +---- +(n1,s1):1: state=replicate closed=false +NormalPri: + term=1 index=3 tokens=1048576 +++++ +(n2,s2):2: state=replicate closed=false +NormalPri: + term=1 index=2 tokens=1048576 + term=1 index=3 tokens=1048576 +++++ +(n3,s3):3: closed + +# The tokens should be returned as well, s3 should have no outstanding tokens. +adjust_tokens + store_id=1 pri=HighPri tokens=0 +---- +t1/s1: reg=+15 MiB/+16 MiB ela=+7.0 MiB/+8.0 MiB +t1/s2: reg=+14 MiB/+16 MiB ela=+6.0 MiB/+8.0 MiB +t1/s3: reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB + +# Close the range controller. This should return the tracked deductions for +# entry 3 on s1 and [2,3] on s2. +close_rcs +---- +range_id=1 tenant_id={1} local_replica_id=1 + +# Print out the available token state. There should be no outstanding tokens. +# Send a noop token adjustment to get the state. +adjust_tokens + store_id=1 pri=HighPri tokens=0 +---- +t1/s1: reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB +t1/s2: reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB +t1/s3: reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/handle_raft_event b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/handle_raft_event index 2bf6aefdb8bf..074d8375c76b 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/handle_raft_event +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/handle_raft_event @@ -95,7 +95,8 @@ range_id=1 tenant_id=1 local_replica_id=1 r1: [(n1,s1):1*,(n2,s2):2,(n3,s3):3] # Tick the clock by less than the probe to close delay, the stream should still -# be open in state probeRecentlyReplicate. +# be open in state probeRecentlyReplicate but all deducted tokens should be +# returned. tick duration=500ms ---- now=500ms @@ -107,10 +108,6 @@ stream_state range_id=1 (n2,s2):2: state=replicate closed=false ++++ (n3,s3):3: state=probeRecentlyReplicate closed=false -NormalPri: - term=1 index=1 tokens=1048576 - term=1 index=2 tokens=1048576 - term=1 index=3 tokens=1048576 ++++ # Tick the clock by the remaining probe to close delay, the stream should now @@ -176,7 +173,7 @@ range_id=1 term=1 index=7 pri=NormalPri size=1MiB ---- t1/s1: reg=+13 MiB/+16 MiB ela=+5.0 MiB/+8.0 MiB -t1/s2: reg=+13 MiB/+16 MiB ela=+5.0 MiB/+8.0 MiB +t1/s2: reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB t1/s3: reg=+13 MiB/+16 MiB ela=+5.0 MiB/+8.0 MiB # The operation should now be done and have waited for s1 and s3. 
diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/inspect b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/inspect index 854868c97efc..20283d496268 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/inspect +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/inspect @@ -311,20 +311,6 @@ inspect range_id=1 } } ] - }, - { - "stream": { - "tenant_id": { - "id": "1" - }, - "store_id": 3, - "available_eval_regular_tokens": "16777216", - "available_eval_elastic_tokens": "8388608", - "available_send_regular_tokens": "16777216", - "available_send_elastic_tokens": "8388608" - }, - "tracked_deductions": [ - ] } ] } @@ -396,20 +382,6 @@ inspect range_id=1 } } ] - }, - { - "stream": { - "tenant_id": { - "id": "1" - }, - "store_id": 3, - "available_eval_regular_tokens": "16777216", - "available_eval_elastic_tokens": "8388608", - "available_send_regular_tokens": "16777216", - "available_send_elastic_tokens": "8388608" - }, - "tracked_deductions": [ - ] } ] }
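
The diff above collapses the connectedState machine to two states (replicate <=> probeRecentlyReplicate) and returns all tracked token deductions whenever a send stream moves to probe or is closed (StateSnapshot, replica removal, or range controller close). Below is a minimal standalone Go sketch of that behavior; the types and fields here (sendStream, trackedTokens) are illustrative stand-ins, not the actual replicaSendStream/Tracker from range_controller.go.

```go
// Minimal sketch (not the rac2 package) of the two-state connectedState
// machine after this change, and of returning all tracked tokens on both
// probe and close. All names here are hypothetical stand-ins.
package main

import (
	"fmt"
	"time"
)

type connectedState uint32

const (
	replicate connectedState = iota
	probeRecentlyReplicate
)

func (cs connectedState) String() string {
	switch cs {
	case replicate:
		return "replicate"
	case probeRecentlyReplicate:
		return "probeRecentlyReplicate"
	default:
		panic("unknown connectedState")
	}
}

// sendStream is a toy stand-in for replicaSendStream: it records deducted
// tokens and returns them whenever the stream probes or closes.
type sendStream struct {
	state         connectedState
	stateStart    time.Time
	trackedTokens int64
	closed        bool
}

// changeToProbe mirrors changeToProbeLocked: transition to
// probeRecentlyReplicate and immediately return all tracked tokens, since
// other ranges may need them and the replica may stay in StateProbe a while.
func (s *sendStream) changeToProbe(now time.Time) int64 {
	s.state = probeRecentlyReplicate
	s.stateStart = now
	returned := s.trackedTokens
	s.trackedTokens = 0
	return returned
}

// close mirrors closeLocked: return everything still tracked and mark the
// stream closed. This path is used when the replica enters StateSnapshot,
// when it leaves the range, and when the range controller is closed.
func (s *sendStream) close() int64 {
	returned := s.trackedTokens
	s.trackedTokens = 0
	s.closed = true
	return returned
}

func main() {
	now := time.Now()
	s := &sendStream{state: replicate, stateStart: now, trackedTokens: 3 << 20}

	fmt.Printf("state=%s tracked=%d\n", s.state, s.trackedTokens)
	fmt.Printf("probe: returned=%d state=%s\n", s.changeToProbe(now), s.state)
	fmt.Printf("close: returned=%d closed=%t\n", s.close(), s.closed)
}
```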