diff --git a/pkg/kv/kvserver/flow_control_raft_transport_test.go b/pkg/kv/kvserver/flow_control_raft_transport_test.go index e03bf17e3ed0..b41113a7ee7c 100644 --- a/pkg/kv/kvserver/flow_control_raft_transport_test.go +++ b/pkg/kv/kvserver/flow_control_raft_transport_test.go @@ -681,8 +681,11 @@ func TestFlowControlRaftTransportV2(t *testing.T) { toNodeID := parseNodeID(t, d, "node") toStoreID := parseStoreID(t, d, "store") rangeID := parseRangeID(t, d, "range") - control.piggybacker.AddMsgAppRespForLeader( - toNodeID, toStoreID, rangeID, raftpb.Message{}) + // TODO(pav-kv): test that these messages are actually sent in + // RaftMessageRequestBatch. + control.piggybacker.Add(toNodeID, kvflowcontrolpb.PiggybackedAdmittedState{ + RangeID: rangeID, ToStoreID: toStoreID, + }) return "" case "fallback-piggyback": diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.go index af6323704bbb..77a185202dd2 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.go @@ -61,3 +61,20 @@ func (a AdmittedResponseForRange) String() string { func (a AdmittedResponseForRange) SafeFormat(w redact.SafePrinter, _ rune) { w.Printf("admitted-response (s%s r%s %s)", a.LeaderStoreID, a.RangeID, a.Msg.String()) } + +func (a AdmittedState) String() string { + return redact.StringWithoutMarkers(a) +} + +func (a AdmittedState) SafeFormat(w redact.SafePrinter, _ rune) { + w.Printf("admitted=t%d/%s", a.Term, a.Admitted) +} + +func (a PiggybackedAdmittedState) String() string { + return redact.StringWithoutMarkers(a) +} + +func (a PiggybackedAdmittedState) SafeFormat(w redact.SafePrinter, _ rune) { + w.Printf("[r%s,s%s,%d->%d] %s", + a.RangeID, a.ToStoreID, a.FromReplicaID, a.ToReplicaID, a.Admitted.String()) +} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto index 3291e73584f8..e6f41eb239cc 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto @@ -133,6 +133,8 @@ message RaftLogPosition { // AdmittedResponseForRange is only used in RACv2. It contains a MsgAppResp // from a follower to a leader, that was generated to advance the admitted // vector for that follower, maintained by the leader. +// +// TODO(pav-kv): remove this type and use PiggybackedAdmittedState. message AdmittedResponseForRange { option (gogoproto.goproto_stringer) = false; @@ -149,3 +151,44 @@ message AdmittedResponseForRange { // Msg is the MsgAppResp containing the admitted vector. raftpb.Message msg = 3 [(gogoproto.nullable) = false]; } + +// AdmittedState communicates a replica's vector of admitted log indices at +// different priorities to the leader of a range. +// +// Used only in RACv2. +message AdmittedState { + option (gogoproto.goproto_stringer) = false; + // Term is the leader term of the log for which the Admitted indices were + // computed. The indices are consistent with this leader's log. + uint64 term = 1; + // Admitted contains admitted log indices for each priority < NumPriorities. + repeated uint64 admitted = 2; +} + +// PiggybackedAdmittedState wraps the AdmittedState with the routing information +// needed to deliver the admitted vector to a particular leader replica, and for +// it to know who sent it. +// +// Used only in RACv2. +message PiggybackedAdmittedState { + option (gogoproto.goproto_stringer) = false; + + // RangeID is the ID of the range to which this message is related. Used for + // routing at the leader node. + uint64 range_id = 1 [(gogoproto.customname) = "RangeID", + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.RangeID"]; + // ToStoreID is the store at the leader containing the leader replica. Used + // for routing at the leader node. + uint64 to_store_id = 2 [(gogoproto.customname) = "ToStoreID", + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.StoreID"]; + + // FromReplicaID is the replica sending this message. + uint64 from_replica_id = 3 [(gogoproto.customname) = "FromReplicaID", + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.ReplicaID"]; + // ToReplicaID is the leader replica receiving this message. + uint64 to_replica_id = 4 [(gogoproto.customname) = "ToReplicaID", + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.ReplicaID"]; + + // Admitted is the admitted vector at the sending replica. + AdmittedState admitted = 5 [(gogoproto.nullable) = false]; +} diff --git a/pkg/kv/kvserver/kvflowcontrol/node_rac2/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/node_rac2/BUILD.bazel index dd51ab7702d8..2700c4070a7a 100644 --- a/pkg/kv/kvserver/kvflowcontrol/node_rac2/BUILD.bazel +++ b/pkg/kv/kvserver/kvflowcontrol/node_rac2/BUILD.bazel @@ -11,7 +11,6 @@ go_library( deps = [ "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", "//pkg/kv/kvserver/kvflowcontrol/replica_rac2", - "//pkg/raft/raftpb", "//pkg/roachpb", "//pkg/util/syncutil", ], @@ -24,7 +23,6 @@ go_test( embed = [":node_rac2"], deps = [ "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", - "//pkg/raft/raftpb", "//pkg/roachpb", "//pkg/testutils/datapathutils", "//pkg/util/leaktest", diff --git a/pkg/kv/kvserver/kvflowcontrol/node_rac2/admitted_piggybacker.go b/pkg/kv/kvserver/kvflowcontrol/node_rac2/admitted_piggybacker.go index 4c443bab6c4d..580b6dd8b43a 100644 --- a/pkg/kv/kvserver/kvflowcontrol/node_rac2/admitted_piggybacker.go +++ b/pkg/kv/kvserver/kvflowcontrol/node_rac2/admitted_piggybacker.go @@ -16,7 +16,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/replica_rac2" - "github.com/cockroachdb/cockroach/pkg/raft/raftpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/syncutil" ) @@ -29,7 +28,7 @@ type PiggybackMsgReader interface { // least one message will be popped. PopMsgsForNode( now time.Time, nodeID roachpb.NodeID, maxBytes int64, - ) (msgs []kvflowcontrolpb.AdmittedResponseForRange, remainingMsgs int) + ) (_ []kvflowcontrolpb.PiggybackedAdmittedState, remainingMsgs int) // NodesWithMsgs is used to periodically drop msgs from disconnected nodes. // See RaftTransport.dropFlowTokensForDisconnectedNodes. NodesWithMsgs(now time.Time) []roachpb.NodeID @@ -46,7 +45,7 @@ type AdmittedPiggybacker struct { } type rangeMap struct { - rangeMap map[roachpb.RangeID]kvflowcontrolpb.AdmittedResponseForRange + rangeMap map[roachpb.RangeID]kvflowcontrolpb.PiggybackedAdmittedState transitionToEmptyTime time.Time } @@ -59,32 +58,28 @@ func NewAdmittedPiggybacker() *AdmittedPiggybacker { var _ PiggybackMsgReader = &AdmittedPiggybacker{} var _ replica_rac2.AdmittedPiggybacker = &AdmittedPiggybacker{} -// AddMsgAppRespForLeader implements replica_rac2.AdmittedPiggybacker. -func (ap *AdmittedPiggybacker) AddMsgAppRespForLeader( - nodeID roachpb.NodeID, storeID roachpb.StoreID, rangeID roachpb.RangeID, msg raftpb.Message, +// Add implements replica_rac2.AdmittedPiggybacker. +func (ap *AdmittedPiggybacker) Add( + nodeID roachpb.NodeID, msg kvflowcontrolpb.PiggybackedAdmittedState, ) { ap.mu.Lock() defer ap.mu.Unlock() rm, ok := ap.mu.msgsForNode[nodeID] if !ok { - rm = &rangeMap{rangeMap: map[roachpb.RangeID]kvflowcontrolpb.AdmittedResponseForRange{}} + rm = &rangeMap{rangeMap: map[roachpb.RangeID]kvflowcontrolpb.PiggybackedAdmittedState{}} ap.mu.msgsForNode[nodeID] = rm } - rm.rangeMap[rangeID] = kvflowcontrolpb.AdmittedResponseForRange{ - LeaderStoreID: storeID, - RangeID: rangeID, - Msg: msg, - } + rm.rangeMap[msg.RangeID] = msg } -// Made-up number. There are 10+ integers, all varint encoded, many of which +// Made-up number. There are < 10 integers, all varint encoded, many of which // like nodeID, storeID, replicaIDs etc. will be small. -const admittedForRangeRACv2SizeBytes = 50 +const admittedForRangeRACv2SizeBytes = 40 // PopMsgsForNode implements PiggybackMsgReader. func (ap *AdmittedPiggybacker) PopMsgsForNode( now time.Time, nodeID roachpb.NodeID, maxBytes int64, -) (msgs []kvflowcontrolpb.AdmittedResponseForRange, remainingMsgs int) { +) (_ []kvflowcontrolpb.PiggybackedAdmittedState, remainingMsgs int) { if ap == nil { return nil, 0 } @@ -94,13 +89,16 @@ func (ap *AdmittedPiggybacker) PopMsgsForNode( if !ok || len(rm.rangeMap) == 0 { return nil, 0 } - maxEntries := maxBytes / admittedForRangeRACv2SizeBytes + // NB: +1 to include at least one entry. + maxEntries := maxBytes/admittedForRangeRACv2SizeBytes + 1 + msgs := make([]kvflowcontrolpb.PiggybackedAdmittedState, 0, + min(int64(len(rm.rangeMap)), maxEntries)) for rangeID, msg := range rm.rangeMap { - msgs = append(msgs, msg) - delete(rm.rangeMap, rangeID) - if int64(len(msgs)) > maxEntries { + if len(msgs) == cap(msgs) { break } + msgs = append(msgs, msg) + delete(rm.rangeMap, rangeID) } n := len(rm.rangeMap) if n == 0 { diff --git a/pkg/kv/kvserver/kvflowcontrol/node_rac2/admitted_piggybacker_test.go b/pkg/kv/kvserver/kvflowcontrol/node_rac2/admitted_piggybacker_test.go index 19beb0d9b48f..7aa9ada90306 100644 --- a/pkg/kv/kvserver/kvflowcontrol/node_rac2/admitted_piggybacker_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/node_rac2/admitted_piggybacker_test.go @@ -20,7 +20,6 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" - "github.com/cockroachdb/cockroach/pkg/raft/raftpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -38,14 +37,20 @@ func TestPiggybacker(t *testing.T) { func(t *testing.T, d *datadriven.TestData) string { switch d.Cmd { case "add": - var nodeID, storeID, rangeID, match int + var nodeID, storeID, rangeID, from, to, term int d.ScanArgs(t, "node-id", &nodeID) d.ScanArgs(t, "store-id", &storeID) d.ScanArgs(t, "range-id", &rangeID) - // Match is just a placeholder to differentiate messages in the test. - d.ScanArgs(t, "match", &match) - p.AddMsgAppRespForLeader(roachpb.NodeID(nodeID), roachpb.StoreID(storeID), - roachpb.RangeID(rangeID), raftpb.Message{Match: uint64(match)}) + d.ScanArgs(t, "from", &from) + d.ScanArgs(t, "to", &to) + d.ScanArgs(t, "term", &term) + p.Add(roachpb.NodeID(nodeID), kvflowcontrolpb.PiggybackedAdmittedState{ + RangeID: roachpb.RangeID(rangeID), + ToStoreID: roachpb.StoreID(storeID), + FromReplicaID: roachpb.ReplicaID(from), + ToReplicaID: roachpb.ReplicaID(to), + Admitted: kvflowcontrolpb.AdmittedState{Term: uint64(term)}, + }) return "" case "nodes-with-msgs": @@ -71,13 +76,13 @@ func TestPiggybacker(t *testing.T) { var nodeID int d.ScanArgs(t, "node-id", &nodeID) msgs, remaining := p.PopMsgsForNode(ts, roachpb.NodeID(nodeID), math.MaxInt64) - slices.SortFunc(msgs, func(a, b kvflowcontrolpb.AdmittedResponseForRange) int { + slices.SortFunc(msgs, func(a, b kvflowcontrolpb.PiggybackedAdmittedState) int { return cmp.Compare(a.RangeID, b.RangeID) }) var b strings.Builder fmt.Fprintf(&b, "msgs:\n") for _, msg := range msgs { - fmt.Fprintf(&b, "s%s, r%s, match=%d\n", msg.LeaderStoreID, msg.RangeID, msg.Msg.Match) + fmt.Fprintf(&b, "%s\n", msg) } fmt.Fprintf(&b, "remaining-msgs: %d\n", remaining) return b.String() @@ -100,15 +105,15 @@ func TestPiggybackerMaxBytes(t *testing.T) { defer log.Scope(t).Close(t) p := NewAdmittedPiggybacker() - p.AddMsgAppRespForLeader(1, 1, 1, raftpb.Message{}) - p.AddMsgAppRespForLeader(1, 1, 2, raftpb.Message{}) + p.Add(1, kvflowcontrolpb.PiggybackedAdmittedState{RangeID: 1, ToStoreID: 1}) + p.Add(1, kvflowcontrolpb.PiggybackedAdmittedState{RangeID: 2, ToStoreID: 1}) // Both are popped. msgs, remaining := p.PopMsgsForNode(time.UnixMilli(1), 1, 60) require.Equal(t, 2, len(msgs)) require.Equal(t, 0, remaining) - p.AddMsgAppRespForLeader(1, 1, 1, raftpb.Message{}) - p.AddMsgAppRespForLeader(1, 1, 2, raftpb.Message{}) + p.Add(1, kvflowcontrolpb.PiggybackedAdmittedState{RangeID: 1, ToStoreID: 1}) + p.Add(1, kvflowcontrolpb.PiggybackedAdmittedState{RangeID: 2, ToStoreID: 1}) // Only one is popped. msgs, remaining = p.PopMsgsForNode(time.UnixMilli(1), 1, 20) require.Equal(t, 1, len(msgs)) diff --git a/pkg/kv/kvserver/kvflowcontrol/node_rac2/testdata/piggybacker b/pkg/kv/kvserver/kvflowcontrol/node_rac2/testdata/piggybacker index cf7004e8e4cf..f3bc91adf162 100644 --- a/pkg/kv/kvserver/kvflowcontrol/node_rac2/testdata/piggybacker +++ b/pkg/kv/kvserver/kvflowcontrol/node_rac2/testdata/piggybacker @@ -10,11 +10,11 @@ msgs: remaining-msgs: 0 # Add for node 1. -add node-id=1 store-id=2 range-id=3 match=6 +add node-id=1 store-id=2 range-id=3 from=2 to=1 term=6 ---- # Add for node 11. -add node-id=11 store-id=12 range-id=13 match=14 +add node-id=11 store-id=12 range-id=13 from=3 to=1 term=14 ---- nodes-with-msgs time-sec=2 @@ -23,15 +23,15 @@ n1 n11 map len: 2 # Add another for node 11, for a different range. -add node-id=11 store-id=22 range-id=23 match=24 +add node-id=11 store-id=22 range-id=23 from=2 to=1 term=24 ---- # Pop both for node 11. pop node-id=11 time-sec=2 ---- msgs: -s12, r13, match=14 -s22, r23, match=24 +[r13,s12,3->1] admitted=t14/[] +[r23,s22,2->1] admitted=t24/[] remaining-msgs: 0 # There is still an empty map entry for node 11. @@ -47,14 +47,14 @@ n1 map len: 1 # Overwrite the msg for the range at node 1. -add node-id=1 store-id=2 range-id=3 match=7 +add node-id=1 store-id=2 range-id=3 from=2 to=1 term=25 ---- # Pop for node 1. There was only one msg. pop node-id=1 time-sec=64 ---- msgs: -s2, r3, match=7 +[r3,s2,2->1] admitted=t25/[] remaining-msgs: 0 # The map entry for node 1 is garbage collected. diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel index 045c0cc95bf8..2187b2a9c063 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel @@ -12,6 +12,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/kv/kvserver/kvflowcontrol", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", "//pkg/kv/kvserver/kvflowcontrol/rac2", "//pkg/kv/kvserver/raftlog", "//pkg/raft/raftpb", diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go index b1ed8db41735..97de3ebeb341 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go @@ -16,6 +16,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" "github.com/cockroachdb/cockroach/pkg/raft/raftpb" @@ -132,13 +133,13 @@ type RaftNode interface { StepMsgAppRespForAdmittedLocked(raftpb.Message) error } -// AdmittedPiggybacker is used to enqueue MsgAppResp messages whose purpose is -// to advance Admitted. For efficiency, these need to be piggybacked on other -// messages being sent to the given leader node. The StoreID and RangeID are -// provided so that the leader node can route the incoming message to the -// relevant range. +// AdmittedPiggybacker is used to enqueue admitted vector messages addressed to +// replicas on a particular node. For efficiency, these need to be piggybacked +// on other messages being sent to the given leader node. The store / range / +// replica IDs are provided so that the leader node can route the incoming +// message to the relevant range. type AdmittedPiggybacker interface { - AddMsgAppRespForLeader(roachpb.NodeID, roachpb.StoreID, roachpb.RangeID, raftpb.Message) + Add(roachpb.NodeID, kvflowcontrolpb.PiggybackedAdmittedState) } // EntryForAdmission is the information provided to the admission control (AC) @@ -750,8 +751,14 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked(ctx context.Context, e rac2. p.opts.Replica.MuUnlock() if p.mu.leader.rc == nil && p.mu.leaderNodeID != 0 { // Follower, and know leaderNodeID, leaderStoreID. - p.opts.AdmittedPiggybacker.AddMsgAppRespForLeader( - p.mu.leaderNodeID, p.mu.leaderStoreID, p.opts.RangeID, msgResp) + // TODO(pav-kv): populate the message correctly. + p.opts.AdmittedPiggybacker.Add(p.mu.leaderNodeID, kvflowcontrolpb.PiggybackedAdmittedState{ + RangeID: p.opts.RangeID, + ToStoreID: p.mu.leaderStoreID, + FromReplicaID: p.opts.ReplicaID, + ToReplicaID: roachpb.ReplicaID(msgResp.To), + Admitted: kvflowcontrolpb.AdmittedState{}, + }) } // Else if the local replica is the leader, we have already told it // about the update by calling SetAdmittedLocked. If the leader is not diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go index 62386ec3fe52..0a26461de784 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go @@ -172,11 +172,10 @@ type testAdmittedPiggybacker struct { b *strings.Builder } -func (p *testAdmittedPiggybacker) AddMsgAppRespForLeader( - n roachpb.NodeID, s roachpb.StoreID, r roachpb.RangeID, msg raftpb.Message, +func (p *testAdmittedPiggybacker) Add( + n roachpb.NodeID, m kvflowcontrolpb.PiggybackedAdmittedState, ) { - fmt.Fprintf(p.b, " Piggybacker.AddMsgAppRespForLeader(leader=(n%s,s%s,r%s), msg=%s)\n", - n, s, r, msgString(msg)) + fmt.Fprintf(p.b, " Piggybacker.Add(n%s, %s)\n", n, m) } type testACWorkQueue struct { diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor index 2932ac013f64..3622bc89aa1b 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor @@ -131,7 +131,7 @@ HandleRaftReady: Replica.MuLock RaftNode.SetAdmittedLocked([24, 25, 25, 25]) = type: MsgAppResp from: 0 to: 0 Replica.MuUnlock - Piggybacker.AddMsgAppRespForLeader(leader=(n11,s11,r3), msg=type: MsgAppResp from: 0 to: 0) + Piggybacker.Add(n11, [r3,s11,5->0] admitted=t0/[]) ..... # Side channel for entries [26, 26] with no low-pri override. @@ -197,7 +197,7 @@ HandleRaftReady: Replica.MuLock RaftNode.SetAdmittedLocked([24, 26, 25, 26]) = type: MsgAppResp from: 0 to: 0 Replica.MuUnlock - Piggybacker.AddMsgAppRespForLeader(leader=(n11,s11,r3), msg=type: MsgAppResp from: 0 to: 0) + Piggybacker.Add(n11, [r3,s11,5->0] admitted=t0/[]) ..... # Callback is accurate and index 25 is admitted. @@ -220,7 +220,7 @@ HandleRaftReady: Replica.MuLock RaftNode.SetAdmittedLocked([26, 26, 25, 26]) = type: MsgAppResp from: 0 to: 0 Replica.MuUnlock - Piggybacker.AddMsgAppRespForLeader(leader=(n11,s11,r3), msg=type: MsgAppResp from: 0 to: 0) + Piggybacker.Add(n11, [r3,s11,5->0] admitted=t0/[]) ..... # Side channel for entries [27,27] indicate a low-pri override. @@ -270,7 +270,7 @@ HandleRaftReady: Replica.MuLock RaftNode.SetAdmittedLocked([26, 26, 26, 26]) = type: MsgAppResp from: 0 to: 0 Replica.MuUnlock - Piggybacker.AddMsgAppRespForLeader(leader=(n11,s11,r3), msg=type: MsgAppResp from: 0 to: 0) + Piggybacker.Add(n11, [r3,s11,5->0] admitted=t0/[]) ..... # index 27 is still waiting for admission, but we switch to a new leader that @@ -340,7 +340,7 @@ HandleRaftReady: Replica.MuLock RaftNode.SetAdmittedLocked([27, 27, 27, 27]) = type: MsgAppResp from: 0 to: 0 Replica.MuUnlock - Piggybacker.AddMsgAppRespForLeader(leader=(n11,s11,r3), msg=type: MsgAppResp from: 0 to: 0) + Piggybacker.Add(n11, [r3,s11,5->0] admitted=t0/[]) ..... # Noop, since not the leader. diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto index 761821b6bc38..23e9d262c1b0 100644 --- a/pkg/kv/kvserver/kvserverpb/raft.proto +++ b/pkg/kv/kvserver/kvserverpb/raft.proto @@ -104,10 +104,15 @@ message RaftMessageRequest { // priority of the Entries in the Message are overridden to be // raftpb.LowPri. bool low_priority_override = 13; + // AdmittedState annotates a MsgAppResp message with a vector of admitted log + // indices. Used only with RACv2. + kv.kvserver.kvflowcontrol.kvflowcontrolpb.AdmittedState admitted_state = 14 [(gogoproto.nullable) = false]; // AdmittedResponse is used in RACv2, for piggybacking MsgAppResp messages // from a follower to a leader, that advance admitted for a follower. - repeated kv.kvserver.kvflowcontrol.kvflowcontrolpb.AdmittedResponseForRange admitted_response = 14 [(gogoproto.nullable) = false]; + // + // TODO(pav-kv): remove. + repeated kv.kvserver.kvflowcontrol.kvflowcontrolpb.AdmittedResponseForRange admitted_response = 15 [(gogoproto.nullable) = false]; reserved 10; } @@ -153,6 +158,10 @@ message RaftMessageRequestBatch { // provide a disjointness guarantee to leader leases. util.hlc.Timestamp now = 3 [(gogoproto.nullable) = false, (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/util/hlc.ClockTimestamp"]; + + // AdmittedStates contains admitted vector messages addressed to replicas + // located on the receiver node of this batch. + repeated kv.kvserver.kvflowcontrol.kvflowcontrolpb.PiggybackedAdmittedState admitted_states = 4 [(gogoproto.nullable) = false]; } message RaftMessageResponseUnion { diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 9c1b478f8228..5155aa8f8152 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -453,16 +453,6 @@ func (t *RaftTransport) handleRaftRequest( log.Infof(ctx, "informed of below-raft %s", admittedEntries) } } - if len(req.AdmittedResponse) > 0 { - // NB: we do this via this special path instead of using the - // incomingMessageHandler since we don't have a full-fledged - // RaftMessageRequest for each range (each of these responses could be for - // a different range), and because what we need to do wrt queueing is much - // simpler (we don't need to worry about queue size since we only keep the - // latest message from each replica). - t.kvflowcontrol2.piggybackedResponseScheduler.ScheduleAdmittedResponseForRangeRACv2( - ctx, req.AdmittedResponse) - } if req.ToReplica.StoreID == roachpb.StoreID(0) && len(req.AdmittedRaftLogEntries) > 0 { // The fallback token dispatch mechanism does not specify a destination // replica, and as such, there's no handler for it. We don't want to @@ -543,6 +533,16 @@ func (t *RaftTransport) RaftMessageBatch(stream MultiRaft_RaftMessageBatchServer t.kvflowControl.mu.Lock() t.kvflowControl.mu.connectionTracker.markStoresConnected(storeIDs) t.kvflowControl.mu.Unlock() + if len(batch.AdmittedStates) != 0 { + // TODO(pav-kv): dispatch admitted vectors to RACv2. + // NB: we do this via this special path instead of using the + // handleRaftRequest path since we don't have a full-fledged + // RaftMessageRequest for each range (each of these responses could + // be for a different range), and because what we need to do w.r.t. + // queueing is much simpler (we don't need to worry about queue size + // since we only keep the latest message from each replica). + _ = t.kvflowcontrol2.piggybackedResponseScheduler.ScheduleAdmittedResponseForRangeRACv2 + } if len(batch.Requests) == 0 { continue } @@ -745,9 +745,14 @@ func (t *RaftTransport) processQueue( } // For replication admission control v2. - maybeAnnotateWithAdmittedResponses := func( - req *kvserverpb.RaftMessageRequest, admitted []kvflowcontrolpb.AdmittedResponseForRange) { - req.AdmittedResponse = append(req.AdmittedResponse, admitted...) + maybeAnnotateWithAdmittedStates := func( + batch *kvserverpb.RaftMessageRequestBatch, admitted []kvflowcontrolpb.PiggybackedAdmittedState, + ) { + // TODO(pav-kv): send these protos once they are populated correctly. + if true { + return + } + batch.AdmittedStates = append(batch.AdmittedStates, admitted...) } annotateWithClockTimestamp := func(batch *kvserverpb.RaftMessageRequestBatch) { @@ -763,6 +768,10 @@ func (t *RaftTransport) processQueue( batch.Requests = batch.Requests[:0] batch.StoreIDs = nil batch.Now = hlc.ClockTimestamp{} + for i := range batch.AdmittedStates { + batch.AdmittedStates[i] = kvflowcontrolpb.PiggybackedAdmittedState{} + } + batch.AdmittedStates = batch.AdmittedStates[:0] } var raftIdleTimer timeutil.Timer @@ -802,7 +811,7 @@ func (t *RaftTransport) processQueue( budget := targetRaftOutgoingBatchSize.Get(&t.st.SV) - size var pendingDispatches []kvflowcontrolpb.AdmittedRaftLogEntries - var admittedResponses []kvflowcontrolpb.AdmittedResponseForRange + var admittedStates []kvflowcontrolpb.PiggybackedAdmittedState if disableFn := t.knobs.DisablePiggyBackedFlowTokenDispatch; disableFn == nil || !disableFn() { // RACv1. // @@ -826,9 +835,9 @@ func (t *RaftTransport) processQueue( maybeAnnotateWithAdmittedRaftLogEntries(req, pendingDispatches) // RACv2. - admittedResponses, _ = t.kvflowcontrol2.piggybackReader.PopMsgsForNode( + admittedStates, _ = t.kvflowcontrol2.piggybackReader.PopMsgsForNode( timeutil.Now(), q.nodeID, kvadmission.FlowTokenDispatchMaxBytes.Get(&t.st.SV)) - maybeAnnotateWithAdmittedResponses(req, admittedResponses) + maybeAnnotateWithAdmittedStates(batch, admittedStates) } batch.Requests = append(batch.Requests, *req) @@ -850,9 +859,10 @@ func (t *RaftTransport) processQueue( maybeAnnotateWithStoreIDs(batch) annotateWithClockTimestamp(batch) + if err := stream.Send(batch); err != nil { t.metrics.FlowTokenDispatchesDropped.Inc(int64( - len(pendingDispatches) + len(admittedResponses))) + len(pendingDispatches) + len(admittedStates))) return err } t.metrics.MessagesSent.Inc(int64(len(batch.Requests))) @@ -872,9 +882,9 @@ func (t *RaftTransport) processQueue( kvadmission.FlowTokenDispatchMaxBytes.Get(&t.st.SV), ) // RACv2. - admittedResponses, remainingAdmittedResponses := t.kvflowcontrol2.piggybackReader.PopMsgsForNode( + admittedStates, remainingAdmittedResponses := t.kvflowcontrol2.piggybackReader.PopMsgsForNode( timeutil.Now(), q.nodeID, kvadmission.FlowTokenDispatchMaxBytes.Get(&t.st.SV)) - if len(pendingDispatches) == 0 && len(admittedResponses) == 0 { + if len(pendingDispatches) == 0 && len(admittedStates) == 0 { continue // nothing to do } // If there are remaining dispatches/responses, schedule them @@ -885,15 +895,16 @@ func (t *RaftTransport) processQueue( req := newRaftMessageRequest() maybeAnnotateWithAdmittedRaftLogEntries(req, pendingDispatches) - maybeAnnotateWithAdmittedResponses(req, admittedResponses) batch.Requests = append(batch.Requests, *req) releaseRaftMessageRequest(req) maybeAnnotateWithStoreIDs(batch) annotateWithClockTimestamp(batch) + maybeAnnotateWithAdmittedStates(batch, admittedStates) + if err := stream.Send(batch); err != nil { t.metrics.FlowTokenDispatchesDropped.Inc(int64( - len(pendingDispatches) + len(admittedResponses))) + len(pendingDispatches) + len(admittedStates))) return err } t.metrics.MessagesSent.Inc(int64(len(batch.Requests))) diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index c0551cb5d0cf..2e71729f6ff4 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -651,6 +651,11 @@ func (r *Replica) stepRaftGroupRaftMuLocked(req *kvserverpb.RaftMessageRequest) LowPriOverride: req.LowPriorityOverride, } } + case raftpb.MsgAppResp: + if req.AdmittedState.Term != 0 { + // TODO(pav-kv): dispatch admitted vector to RACv2 if one is attached. + _ = 0 + } } err := raftGroup.Step(req.Message) if errors.Is(err, raft.ErrProposalDropped) {