Skip to content

Commit

Permalink
Merge #130074
Browse files Browse the repository at this point in the history
130074: replica_rac2: add admitted state protocol r=sumeerbhola a=pav-kv

This PR introduces `AdmittedState` and `PiggybackedAdmittedState` protos, and converts replica- and leader-side code (such as `AdmittedPiggybacker` and raft transport) to use them.

The protos are not populated at the moment, and the leader-side code is a bunch of TODOs. More conversion code will follow, to replace the `AdmittedResponseForRange` flows.

Part of #129508

Co-authored-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
craig[bot] and pav-kv committed Sep 4, 2024
2 parents 4e28f28 + 60ecf7b commit 6c51566
Show file tree
Hide file tree
Showing 14 changed files with 177 additions and 81 deletions.
7 changes: 5 additions & 2 deletions pkg/kv/kvserver/flow_control_raft_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,8 +681,11 @@ func TestFlowControlRaftTransportV2(t *testing.T) {
toNodeID := parseNodeID(t, d, "node")
toStoreID := parseStoreID(t, d, "store")
rangeID := parseRangeID(t, d, "range")
control.piggybacker.AddMsgAppRespForLeader(
toNodeID, toStoreID, rangeID, raftpb.Message{})
// TODO(pav-kv): test that these messages are actually sent in
// RaftMessageRequestBatch.
control.piggybacker.Add(toNodeID, kvflowcontrolpb.PiggybackedAdmittedState{
RangeID: rangeID, ToStoreID: toStoreID,
})
return ""

case "fallback-piggyback":
Expand Down
17 changes: 17 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,20 @@ func (a AdmittedResponseForRange) String() string {
func (a AdmittedResponseForRange) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("admitted-response (s%s r%s %s)", a.LeaderStoreID, a.RangeID, a.Msg.String())
}

func (a AdmittedState) String() string {
return redact.StringWithoutMarkers(a)
}

func (a AdmittedState) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("admitted=t%d/%s", a.Term, a.Admitted)
}

func (a PiggybackedAdmittedState) String() string {
return redact.StringWithoutMarkers(a)
}

func (a PiggybackedAdmittedState) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("[r%s,s%s,%d->%d] %s",
a.RangeID, a.ToStoreID, a.FromReplicaID, a.ToReplicaID, a.Admitted.String())
}
43 changes: 43 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ message RaftLogPosition {
// AdmittedResponseForRange is only used in RACv2. It contains a MsgAppResp
// from a follower to a leader, that was generated to advance the admitted
// vector for that follower, maintained by the leader.
//
// TODO(pav-kv): remove this type and use PiggybackedAdmittedState.
message AdmittedResponseForRange {
option (gogoproto.goproto_stringer) = false;

Expand All @@ -149,3 +151,44 @@ message AdmittedResponseForRange {
// Msg is the MsgAppResp containing the admitted vector.
raftpb.Message msg = 3 [(gogoproto.nullable) = false];
}

// AdmittedState communicates a replica's vector of admitted log indices at
// different priorities to the leader of a range.
//
// Used only in RACv2.
message AdmittedState {
option (gogoproto.goproto_stringer) = false;
// Term is the leader term of the log for which the Admitted indices were
// computed. The indices are consistent with this leader's log.
uint64 term = 1;
// Admitted contains admitted log indices for each priority < NumPriorities.
repeated uint64 admitted = 2;
}

// PiggybackedAdmittedState wraps the AdmittedState with the routing information
// needed to deliver the admitted vector to a particular leader replica, and for
// it to know who sent it.
//
// Used only in RACv2.
message PiggybackedAdmittedState {
option (gogoproto.goproto_stringer) = false;

// RangeID is the ID of the range to which this message is related. Used for
// routing at the leader node.
uint64 range_id = 1 [(gogoproto.customname) = "RangeID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.RangeID"];
// ToStoreID is the store at the leader containing the leader replica. Used
// for routing at the leader node.
uint64 to_store_id = 2 [(gogoproto.customname) = "ToStoreID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.StoreID"];

// FromReplicaID is the replica sending this message.
uint64 from_replica_id = 3 [(gogoproto.customname) = "FromReplicaID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.ReplicaID"];
// ToReplicaID is the leader replica receiving this message.
uint64 to_replica_id = 4 [(gogoproto.customname) = "ToReplicaID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.ReplicaID"];

// Admitted is the admitted vector at the sending replica.
AdmittedState admitted = 5 [(gogoproto.nullable) = false];
}
2 changes: 0 additions & 2 deletions pkg/kv/kvserver/kvflowcontrol/node_rac2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ go_library(
deps = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb",
"//pkg/kv/kvserver/kvflowcontrol/replica_rac2",
"//pkg/raft/raftpb",
"//pkg/roachpb",
"//pkg/util/syncutil",
],
Expand All @@ -24,7 +23,6 @@ go_test(
embed = [":node_rac2"],
deps = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb",
"//pkg/raft/raftpb",
"//pkg/roachpb",
"//pkg/testutils/datapathutils",
"//pkg/util/leaktest",
Expand Down
36 changes: 17 additions & 19 deletions pkg/kv/kvserver/kvflowcontrol/node_rac2/admitted_piggybacker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/replica_rac2"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)
Expand All @@ -29,7 +28,7 @@ type PiggybackMsgReader interface {
// least one message will be popped.
PopMsgsForNode(
now time.Time, nodeID roachpb.NodeID, maxBytes int64,
) (msgs []kvflowcontrolpb.AdmittedResponseForRange, remainingMsgs int)
) (_ []kvflowcontrolpb.PiggybackedAdmittedState, remainingMsgs int)
// NodesWithMsgs is used to periodically drop msgs from disconnected nodes.
// See RaftTransport.dropFlowTokensForDisconnectedNodes.
NodesWithMsgs(now time.Time) []roachpb.NodeID
Expand All @@ -46,7 +45,7 @@ type AdmittedPiggybacker struct {
}

type rangeMap struct {
rangeMap map[roachpb.RangeID]kvflowcontrolpb.AdmittedResponseForRange
rangeMap map[roachpb.RangeID]kvflowcontrolpb.PiggybackedAdmittedState
transitionToEmptyTime time.Time
}

Expand All @@ -59,32 +58,28 @@ func NewAdmittedPiggybacker() *AdmittedPiggybacker {
var _ PiggybackMsgReader = &AdmittedPiggybacker{}
var _ replica_rac2.AdmittedPiggybacker = &AdmittedPiggybacker{}

// AddMsgAppRespForLeader implements replica_rac2.AdmittedPiggybacker.
func (ap *AdmittedPiggybacker) AddMsgAppRespForLeader(
nodeID roachpb.NodeID, storeID roachpb.StoreID, rangeID roachpb.RangeID, msg raftpb.Message,
// Add implements replica_rac2.AdmittedPiggybacker.
func (ap *AdmittedPiggybacker) Add(
nodeID roachpb.NodeID, msg kvflowcontrolpb.PiggybackedAdmittedState,
) {
ap.mu.Lock()
defer ap.mu.Unlock()
rm, ok := ap.mu.msgsForNode[nodeID]
if !ok {
rm = &rangeMap{rangeMap: map[roachpb.RangeID]kvflowcontrolpb.AdmittedResponseForRange{}}
rm = &rangeMap{rangeMap: map[roachpb.RangeID]kvflowcontrolpb.PiggybackedAdmittedState{}}
ap.mu.msgsForNode[nodeID] = rm
}
rm.rangeMap[rangeID] = kvflowcontrolpb.AdmittedResponseForRange{
LeaderStoreID: storeID,
RangeID: rangeID,
Msg: msg,
}
rm.rangeMap[msg.RangeID] = msg
}

// Made-up number. There are 10+ integers, all varint encoded, many of which
// Made-up number. There are < 10 integers, all varint encoded, many of which
// like nodeID, storeID, replicaIDs etc. will be small.
const admittedForRangeRACv2SizeBytes = 50
const admittedForRangeRACv2SizeBytes = 40

// PopMsgsForNode implements PiggybackMsgReader.
func (ap *AdmittedPiggybacker) PopMsgsForNode(
now time.Time, nodeID roachpb.NodeID, maxBytes int64,
) (msgs []kvflowcontrolpb.AdmittedResponseForRange, remainingMsgs int) {
) (_ []kvflowcontrolpb.PiggybackedAdmittedState, remainingMsgs int) {
if ap == nil {
return nil, 0
}
Expand All @@ -94,13 +89,16 @@ func (ap *AdmittedPiggybacker) PopMsgsForNode(
if !ok || len(rm.rangeMap) == 0 {
return nil, 0
}
maxEntries := maxBytes / admittedForRangeRACv2SizeBytes
// NB: +1 to include at least one entry.
maxEntries := maxBytes/admittedForRangeRACv2SizeBytes + 1
msgs := make([]kvflowcontrolpb.PiggybackedAdmittedState, 0,
min(int64(len(rm.rangeMap)), maxEntries))
for rangeID, msg := range rm.rangeMap {
msgs = append(msgs, msg)
delete(rm.rangeMap, rangeID)
if int64(len(msgs)) > maxEntries {
if len(msgs) == cap(msgs) {
break
}
msgs = append(msgs, msg)
delete(rm.rangeMap, rangeID)
}
n := len(rm.rangeMap)
if n == 0 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand All @@ -38,14 +37,20 @@ func TestPiggybacker(t *testing.T) {
func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
case "add":
var nodeID, storeID, rangeID, match int
var nodeID, storeID, rangeID, from, to, term int
d.ScanArgs(t, "node-id", &nodeID)
d.ScanArgs(t, "store-id", &storeID)
d.ScanArgs(t, "range-id", &rangeID)
// Match is just a placeholder to differentiate messages in the test.
d.ScanArgs(t, "match", &match)
p.AddMsgAppRespForLeader(roachpb.NodeID(nodeID), roachpb.StoreID(storeID),
roachpb.RangeID(rangeID), raftpb.Message{Match: uint64(match)})
d.ScanArgs(t, "from", &from)
d.ScanArgs(t, "to", &to)
d.ScanArgs(t, "term", &term)
p.Add(roachpb.NodeID(nodeID), kvflowcontrolpb.PiggybackedAdmittedState{
RangeID: roachpb.RangeID(rangeID),
ToStoreID: roachpb.StoreID(storeID),
FromReplicaID: roachpb.ReplicaID(from),
ToReplicaID: roachpb.ReplicaID(to),
Admitted: kvflowcontrolpb.AdmittedState{Term: uint64(term)},
})
return ""

case "nodes-with-msgs":
Expand All @@ -71,13 +76,13 @@ func TestPiggybacker(t *testing.T) {
var nodeID int
d.ScanArgs(t, "node-id", &nodeID)
msgs, remaining := p.PopMsgsForNode(ts, roachpb.NodeID(nodeID), math.MaxInt64)
slices.SortFunc(msgs, func(a, b kvflowcontrolpb.AdmittedResponseForRange) int {
slices.SortFunc(msgs, func(a, b kvflowcontrolpb.PiggybackedAdmittedState) int {
return cmp.Compare(a.RangeID, b.RangeID)
})
var b strings.Builder
fmt.Fprintf(&b, "msgs:\n")
for _, msg := range msgs {
fmt.Fprintf(&b, "s%s, r%s, match=%d\n", msg.LeaderStoreID, msg.RangeID, msg.Msg.Match)
fmt.Fprintf(&b, "%s\n", msg)
}
fmt.Fprintf(&b, "remaining-msgs: %d\n", remaining)
return b.String()
Expand All @@ -100,15 +105,15 @@ func TestPiggybackerMaxBytes(t *testing.T) {
defer log.Scope(t).Close(t)

p := NewAdmittedPiggybacker()
p.AddMsgAppRespForLeader(1, 1, 1, raftpb.Message{})
p.AddMsgAppRespForLeader(1, 1, 2, raftpb.Message{})
p.Add(1, kvflowcontrolpb.PiggybackedAdmittedState{RangeID: 1, ToStoreID: 1})
p.Add(1, kvflowcontrolpb.PiggybackedAdmittedState{RangeID: 2, ToStoreID: 1})
// Both are popped.
msgs, remaining := p.PopMsgsForNode(time.UnixMilli(1), 1, 60)
require.Equal(t, 2, len(msgs))
require.Equal(t, 0, remaining)

p.AddMsgAppRespForLeader(1, 1, 1, raftpb.Message{})
p.AddMsgAppRespForLeader(1, 1, 2, raftpb.Message{})
p.Add(1, kvflowcontrolpb.PiggybackedAdmittedState{RangeID: 1, ToStoreID: 1})
p.Add(1, kvflowcontrolpb.PiggybackedAdmittedState{RangeID: 2, ToStoreID: 1})
// Only one is popped.
msgs, remaining = p.PopMsgsForNode(time.UnixMilli(1), 1, 20)
require.Equal(t, 1, len(msgs))
Expand Down
14 changes: 7 additions & 7 deletions pkg/kv/kvserver/kvflowcontrol/node_rac2/testdata/piggybacker
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ msgs:
remaining-msgs: 0

# Add for node 1.
add node-id=1 store-id=2 range-id=3 match=6
add node-id=1 store-id=2 range-id=3 from=2 to=1 term=6
----

# Add for node 11.
add node-id=11 store-id=12 range-id=13 match=14
add node-id=11 store-id=12 range-id=13 from=3 to=1 term=14
----

nodes-with-msgs time-sec=2
Expand All @@ -23,15 +23,15 @@ n1 n11
map len: 2

# Add another for node 11, for a different range.
add node-id=11 store-id=22 range-id=23 match=24
add node-id=11 store-id=22 range-id=23 from=2 to=1 term=24
----

# Pop both for node 11.
pop node-id=11 time-sec=2
----
msgs:
s12, r13, match=14
s22, r23, match=24
[r13,s12,3->1] admitted=t14/[]
[r23,s22,2->1] admitted=t24/[]
remaining-msgs: 0

# There is still an empty map entry for node 11.
Expand All @@ -47,14 +47,14 @@ n1
map len: 1

# Overwrite the msg for the range at node 1.
add node-id=1 store-id=2 range-id=3 match=7
add node-id=1 store-id=2 range-id=3 from=2 to=1 term=25
----

# Pop for node 1. There was only one msg.
pop node-id=1 time-sec=64
----
msgs:
s2, r3, match=7
[r3,s2,2->1] admitted=t25/[]
remaining-msgs: 0

# The map entry for node 1 is garbage collected.
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvserver/kvflowcontrol",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb",
"//pkg/kv/kvserver/kvflowcontrol/rac2",
"//pkg/kv/kvserver/raftlog",
"//pkg/raft/raftpb",
Expand Down
23 changes: 15 additions & 8 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
Expand Down Expand Up @@ -132,13 +133,13 @@ type RaftNode interface {
StepMsgAppRespForAdmittedLocked(raftpb.Message) error
}

// AdmittedPiggybacker is used to enqueue MsgAppResp messages whose purpose is
// to advance Admitted. For efficiency, these need to be piggybacked on other
// messages being sent to the given leader node. The StoreID and RangeID are
// provided so that the leader node can route the incoming message to the
// relevant range.
// AdmittedPiggybacker is used to enqueue admitted vector messages addressed to
// replicas on a particular node. For efficiency, these need to be piggybacked
// on other messages being sent to the given leader node. The store / range /
// replica IDs are provided so that the leader node can route the incoming
// message to the relevant range.
type AdmittedPiggybacker interface {
AddMsgAppRespForLeader(roachpb.NodeID, roachpb.StoreID, roachpb.RangeID, raftpb.Message)
Add(roachpb.NodeID, kvflowcontrolpb.PiggybackedAdmittedState)
}

// EntryForAdmission is the information provided to the admission control (AC)
Expand Down Expand Up @@ -750,8 +751,14 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked(ctx context.Context, e rac2.
p.opts.Replica.MuUnlock()
if p.mu.leader.rc == nil && p.mu.leaderNodeID != 0 {
// Follower, and know leaderNodeID, leaderStoreID.
p.opts.AdmittedPiggybacker.AddMsgAppRespForLeader(
p.mu.leaderNodeID, p.mu.leaderStoreID, p.opts.RangeID, msgResp)
// TODO(pav-kv): populate the message correctly.
p.opts.AdmittedPiggybacker.Add(p.mu.leaderNodeID, kvflowcontrolpb.PiggybackedAdmittedState{
RangeID: p.opts.RangeID,
ToStoreID: p.mu.leaderStoreID,
FromReplicaID: p.opts.ReplicaID,
ToReplicaID: roachpb.ReplicaID(msgResp.To),
Admitted: kvflowcontrolpb.AdmittedState{},
})
}
// Else if the local replica is the leader, we have already told it
// about the update by calling SetAdmittedLocked. If the leader is not
Expand Down
7 changes: 3 additions & 4 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,10 @@ type testAdmittedPiggybacker struct {
b *strings.Builder
}

func (p *testAdmittedPiggybacker) AddMsgAppRespForLeader(
n roachpb.NodeID, s roachpb.StoreID, r roachpb.RangeID, msg raftpb.Message,
func (p *testAdmittedPiggybacker) Add(
n roachpb.NodeID, m kvflowcontrolpb.PiggybackedAdmittedState,
) {
fmt.Fprintf(p.b, " Piggybacker.AddMsgAppRespForLeader(leader=(n%s,s%s,r%s), msg=%s)\n",
n, s, r, msgString(msg))
fmt.Fprintf(p.b, " Piggybacker.Add(n%s, %s)\n", n, m)
}

type testACWorkQueue struct {
Expand Down
Loading

0 comments on commit 6c51566

Please sign in to comment.