Skip to content

Commit

Permalink
replica_rac2: convert piggybacker to new proto
Browse files Browse the repository at this point in the history
Epic: none
Release note: none
  • Loading branch information
pav-kv committed Sep 4, 2024
1 parent 8fc31ce commit 379a101
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 70 deletions.
7 changes: 5 additions & 2 deletions pkg/kv/kvserver/flow_control_raft_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,8 +681,11 @@ func TestFlowControlRaftTransportV2(t *testing.T) {
toNodeID := parseNodeID(t, d, "node")
toStoreID := parseStoreID(t, d, "store")
rangeID := parseRangeID(t, d, "range")
control.piggybacker.AddMsgAppRespForLeader(
toNodeID, toStoreID, rangeID, raftpb.Message{})
// TODO(pav-kv): test that these messages are actually sent in
// RaftMessageRequestBatch.
control.piggybacker.Add(toNodeID, kvflowcontrolpb.PiggybackedAdmittedState{
RangeID: rangeID, ToStoreID: toStoreID,
})
return ""

case "fallback-piggyback":
Expand Down
2 changes: 0 additions & 2 deletions pkg/kv/kvserver/kvflowcontrol/node_rac2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ go_library(
deps = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb",
"//pkg/kv/kvserver/kvflowcontrol/replica_rac2",
"//pkg/raft/raftpb",
"//pkg/roachpb",
"//pkg/util/syncutil",
],
Expand All @@ -24,7 +23,6 @@ go_test(
embed = [":node_rac2"],
deps = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb",
"//pkg/raft/raftpb",
"//pkg/roachpb",
"//pkg/testutils/datapathutils",
"//pkg/util/leaktest",
Expand Down
36 changes: 17 additions & 19 deletions pkg/kv/kvserver/kvflowcontrol/node_rac2/admitted_piggybacker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/replica_rac2"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)
Expand All @@ -29,7 +28,7 @@ type PiggybackMsgReader interface {
// least one message will be popped.
PopMsgsForNode(
now time.Time, nodeID roachpb.NodeID, maxBytes int64,
) (msgs []kvflowcontrolpb.AdmittedResponseForRange, remainingMsgs int)
) (_ []kvflowcontrolpb.PiggybackedAdmittedState, remainingMsgs int)
// NodesWithMsgs is used to periodically drop msgs from disconnected nodes.
// See RaftTransport.dropFlowTokensForDisconnectedNodes.
NodesWithMsgs(now time.Time) []roachpb.NodeID
Expand All @@ -46,7 +45,7 @@ type AdmittedPiggybacker struct {
}

type rangeMap struct {
rangeMap map[roachpb.RangeID]kvflowcontrolpb.AdmittedResponseForRange
rangeMap map[roachpb.RangeID]kvflowcontrolpb.PiggybackedAdmittedState
transitionToEmptyTime time.Time
}

Expand All @@ -59,32 +58,28 @@ func NewAdmittedPiggybacker() *AdmittedPiggybacker {
var _ PiggybackMsgReader = &AdmittedPiggybacker{}
var _ replica_rac2.AdmittedPiggybacker = &AdmittedPiggybacker{}

// AddMsgAppRespForLeader implements replica_rac2.AdmittedPiggybacker.
func (ap *AdmittedPiggybacker) AddMsgAppRespForLeader(
nodeID roachpb.NodeID, storeID roachpb.StoreID, rangeID roachpb.RangeID, msg raftpb.Message,
// Add implements replica_rac2.AdmittedPiggybacker.
func (ap *AdmittedPiggybacker) Add(
nodeID roachpb.NodeID, msg kvflowcontrolpb.PiggybackedAdmittedState,
) {
ap.mu.Lock()
defer ap.mu.Unlock()
rm, ok := ap.mu.msgsForNode[nodeID]
if !ok {
rm = &rangeMap{rangeMap: map[roachpb.RangeID]kvflowcontrolpb.AdmittedResponseForRange{}}
rm = &rangeMap{rangeMap: map[roachpb.RangeID]kvflowcontrolpb.PiggybackedAdmittedState{}}
ap.mu.msgsForNode[nodeID] = rm
}
rm.rangeMap[rangeID] = kvflowcontrolpb.AdmittedResponseForRange{
LeaderStoreID: storeID,
RangeID: rangeID,
Msg: msg,
}
rm.rangeMap[msg.RangeID] = msg
}

// Made-up number. There are 10+ integers, all varint encoded, many of which
// Made-up number. There are fewer than 10 integers, all varint encoded, many
// of which (like nodeID, storeID, replicaIDs, etc.) will be small.
const admittedForRangeRACv2SizeBytes = 50
const admittedForRangeRACv2SizeBytes = 40

// PopMsgsForNode implements PiggybackMsgReader.
func (ap *AdmittedPiggybacker) PopMsgsForNode(
now time.Time, nodeID roachpb.NodeID, maxBytes int64,
) (msgs []kvflowcontrolpb.AdmittedResponseForRange, remainingMsgs int) {
) (_ []kvflowcontrolpb.PiggybackedAdmittedState, remainingMsgs int) {
if ap == nil {
return nil, 0
}
Expand All @@ -94,13 +89,16 @@ func (ap *AdmittedPiggybacker) PopMsgsForNode(
if !ok || len(rm.rangeMap) == 0 {
return nil, 0
}
maxEntries := maxBytes / admittedForRangeRACv2SizeBytes
// NB: +1 so that at least one entry is popped even when maxBytes is smaller
// than admittedForRangeRACv2SizeBytes.
maxEntries := maxBytes/admittedForRangeRACv2SizeBytes + 1
msgs := make([]kvflowcontrolpb.PiggybackedAdmittedState, 0,
min(int64(len(rm.rangeMap)), maxEntries))
for rangeID, msg := range rm.rangeMap {
msgs = append(msgs, msg)
delete(rm.rangeMap, rangeID)
if int64(len(msgs)) > maxEntries {
if len(msgs) == cap(msgs) {
break
}
msgs = append(msgs, msg)
delete(rm.rangeMap, rangeID)
}
n := len(rm.rangeMap)
if n == 0 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand All @@ -38,14 +37,20 @@ func TestPiggybacker(t *testing.T) {
func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
case "add":
var nodeID, storeID, rangeID, match int
var nodeID, storeID, rangeID, from, to, term int
d.ScanArgs(t, "node-id", &nodeID)
d.ScanArgs(t, "store-id", &storeID)
d.ScanArgs(t, "range-id", &rangeID)
// Match is just a placeholder to differentiate messages in the test.
d.ScanArgs(t, "match", &match)
p.AddMsgAppRespForLeader(roachpb.NodeID(nodeID), roachpb.StoreID(storeID),
roachpb.RangeID(rangeID), raftpb.Message{Match: uint64(match)})
d.ScanArgs(t, "from", &from)
d.ScanArgs(t, "to", &to)
d.ScanArgs(t, "term", &term)
p.Add(roachpb.NodeID(nodeID), kvflowcontrolpb.PiggybackedAdmittedState{
RangeID: roachpb.RangeID(rangeID),
ToStoreID: roachpb.StoreID(storeID),
FromReplicaID: roachpb.ReplicaID(from),
ToReplicaID: roachpb.ReplicaID(to),
Admitted: kvflowcontrolpb.AdmittedState{Term: uint64(term)},
})
return ""

case "nodes-with-msgs":
Expand All @@ -71,13 +76,13 @@ func TestPiggybacker(t *testing.T) {
var nodeID int
d.ScanArgs(t, "node-id", &nodeID)
msgs, remaining := p.PopMsgsForNode(ts, roachpb.NodeID(nodeID), math.MaxInt64)
slices.SortFunc(msgs, func(a, b kvflowcontrolpb.AdmittedResponseForRange) int {
slices.SortFunc(msgs, func(a, b kvflowcontrolpb.PiggybackedAdmittedState) int {
return cmp.Compare(a.RangeID, b.RangeID)
})
var b strings.Builder
fmt.Fprintf(&b, "msgs:\n")
for _, msg := range msgs {
fmt.Fprintf(&b, "s%s, r%s, match=%d\n", msg.LeaderStoreID, msg.RangeID, msg.Msg.Match)
fmt.Fprintf(&b, "%s\n", msg)
}
fmt.Fprintf(&b, "remaining-msgs: %d\n", remaining)
return b.String()
Expand All @@ -100,15 +105,15 @@ func TestPiggybackerMaxBytes(t *testing.T) {
defer log.Scope(t).Close(t)

p := NewAdmittedPiggybacker()
p.AddMsgAppRespForLeader(1, 1, 1, raftpb.Message{})
p.AddMsgAppRespForLeader(1, 1, 2, raftpb.Message{})
p.Add(1, kvflowcontrolpb.PiggybackedAdmittedState{RangeID: 1, ToStoreID: 1})
p.Add(1, kvflowcontrolpb.PiggybackedAdmittedState{RangeID: 2, ToStoreID: 1})
// Both are popped.
msgs, remaining := p.PopMsgsForNode(time.UnixMilli(1), 1, 60)
require.Equal(t, 2, len(msgs))
require.Equal(t, 0, remaining)

p.AddMsgAppRespForLeader(1, 1, 1, raftpb.Message{})
p.AddMsgAppRespForLeader(1, 1, 2, raftpb.Message{})
p.Add(1, kvflowcontrolpb.PiggybackedAdmittedState{RangeID: 1, ToStoreID: 1})
p.Add(1, kvflowcontrolpb.PiggybackedAdmittedState{RangeID: 2, ToStoreID: 1})
// Only one is popped.
msgs, remaining = p.PopMsgsForNode(time.UnixMilli(1), 1, 20)
require.Equal(t, 1, len(msgs))
Expand Down
14 changes: 7 additions & 7 deletions pkg/kv/kvserver/kvflowcontrol/node_rac2/testdata/piggybacker
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ msgs:
remaining-msgs: 0

# Add for node 1.
add node-id=1 store-id=2 range-id=3 match=6
add node-id=1 store-id=2 range-id=3 from=2 to=1 term=6
----

# Add for node 11.
add node-id=11 store-id=12 range-id=13 match=14
add node-id=11 store-id=12 range-id=13 from=3 to=1 term=14
----

nodes-with-msgs time-sec=2
Expand All @@ -23,15 +23,15 @@ n1 n11
map len: 2

# Add another for node 11, for a different range.
add node-id=11 store-id=22 range-id=23 match=24
add node-id=11 store-id=22 range-id=23 from=2 to=1 term=24
----

# Pop both for node 11.
pop node-id=11 time-sec=2
----
msgs:
s12, r13, match=14
s22, r23, match=24
[r13,s12,3->1] admitted=t14/[]
[r23,s22,2->1] admitted=t24/[]
remaining-msgs: 0

# There is still an empty map entry for node 11.
Expand All @@ -47,14 +47,14 @@ n1
map len: 1

# Overwrite the msg for the range at node 1.
add node-id=1 store-id=2 range-id=3 match=7
add node-id=1 store-id=2 range-id=3 from=2 to=1 term=25
----

# Pop for node 1. There was only one msg.
pop node-id=1 time-sec=64
----
msgs:
s2, r3, match=7
[r3,s2,2->1] admitted=t25/[]
remaining-msgs: 0

# The map entry for node 1 is garbage collected.
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvserver/kvflowcontrol",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb",
"//pkg/kv/kvserver/kvflowcontrol/rac2",
"//pkg/kv/kvserver/raftlog",
"//pkg/raft/raftpb",
Expand Down
23 changes: 15 additions & 8 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
Expand Down Expand Up @@ -132,13 +133,13 @@ type RaftNode interface {
StepMsgAppRespForAdmittedLocked(raftpb.Message) error
}

// AdmittedPiggybacker is used to enqueue MsgAppResp messages whose purpose is
// to advance Admitted. For efficiency, these need to be piggybacked on other
// messages being sent to the given leader node. The StoreID and RangeID are
// provided so that the leader node can route the incoming message to the
// relevant range.
// AdmittedPiggybacker is used to enqueue admitted vector messages addressed to
// replicas on a particular node. For efficiency, these messages need to be
// piggybacked on other messages being sent to the given leader node. The
// store, range, and replica IDs are provided so that the leader node can route
// the incoming message to the relevant range.
type AdmittedPiggybacker interface {
AddMsgAppRespForLeader(roachpb.NodeID, roachpb.StoreID, roachpb.RangeID, raftpb.Message)
Add(roachpb.NodeID, kvflowcontrolpb.PiggybackedAdmittedState)
}

// EntryForAdmission is the information provided to the admission control (AC)
Expand Down Expand Up @@ -752,8 +753,14 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked(ctx context.Context, e rac2.
p.opts.Replica.MuUnlock()
if p.mu.leader.rc == nil && p.mu.leaderNodeID != 0 {
// Follower, and know leaderNodeID, leaderStoreID.
p.opts.AdmittedPiggybacker.AddMsgAppRespForLeader(
p.mu.leaderNodeID, p.mu.leaderStoreID, p.opts.RangeID, msgResp)
// TODO(pav-kv): populate the message correctly. For now the Admitted state
// is left empty.
p.opts.AdmittedPiggybacker.Add(p.mu.leaderNodeID, kvflowcontrolpb.PiggybackedAdmittedState{
RangeID: p.opts.RangeID,
ToStoreID: p.mu.leaderStoreID,
FromReplicaID: p.opts.ReplicaID,
ToReplicaID: roachpb.ReplicaID(msgResp.To),
Admitted: kvflowcontrolpb.AdmittedState{},
})
}
// Else if the local replica is the leader, we have already told it
// about the update by calling SetAdmittedLocked. If the leader is not
Expand Down
7 changes: 3 additions & 4 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,10 @@ type testAdmittedPiggybacker struct {
b *strings.Builder
}

func (p *testAdmittedPiggybacker) AddMsgAppRespForLeader(
n roachpb.NodeID, s roachpb.StoreID, r roachpb.RangeID, msg raftpb.Message,
func (p *testAdmittedPiggybacker) Add(
n roachpb.NodeID, m kvflowcontrolpb.PiggybackedAdmittedState,
) {
fmt.Fprintf(p.b, " Piggybacker.AddMsgAppRespForLeader(leader=(n%s,s%s,r%s), msg=%s)\n",
n, s, r, msgString(msg))
fmt.Fprintf(p.b, " Piggybacker.Add(n%s, %s)\n", n, m)
}

type testACWorkQueue struct {
Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ HandleRaftReady:
Replica.MuLock
RaftNode.SetAdmittedLocked([24, 25, 25, 25]) = type: MsgAppResp from: 0 to: 0
Replica.MuUnlock
Piggybacker.AddMsgAppRespForLeader(leader=(n11,s11,r3), msg=type: MsgAppResp from: 0 to: 0)
Piggybacker.Add(n11, [r3,s11,5->0] admitted=t0/[])
.....

# Side channel for entries [26, 26] with no low-pri override.
Expand Down Expand Up @@ -197,7 +197,7 @@ HandleRaftReady:
Replica.MuLock
RaftNode.SetAdmittedLocked([24, 26, 25, 26]) = type: MsgAppResp from: 0 to: 0
Replica.MuUnlock
Piggybacker.AddMsgAppRespForLeader(leader=(n11,s11,r3), msg=type: MsgAppResp from: 0 to: 0)
Piggybacker.Add(n11, [r3,s11,5->0] admitted=t0/[])
.....

# Callback is accurate and index 25 is admitted.
Expand All @@ -220,7 +220,7 @@ HandleRaftReady:
Replica.MuLock
RaftNode.SetAdmittedLocked([26, 26, 25, 26]) = type: MsgAppResp from: 0 to: 0
Replica.MuUnlock
Piggybacker.AddMsgAppRespForLeader(leader=(n11,s11,r3), msg=type: MsgAppResp from: 0 to: 0)
Piggybacker.Add(n11, [r3,s11,5->0] admitted=t0/[])
.....

# Side channel for entries [27,27] indicate a low-pri override.
Expand Down Expand Up @@ -270,7 +270,7 @@ HandleRaftReady:
Replica.MuLock
RaftNode.SetAdmittedLocked([26, 26, 26, 26]) = type: MsgAppResp from: 0 to: 0
Replica.MuUnlock
Piggybacker.AddMsgAppRespForLeader(leader=(n11,s11,r3), msg=type: MsgAppResp from: 0 to: 0)
Piggybacker.Add(n11, [r3,s11,5->0] admitted=t0/[])
.....

# index 27 is still waiting for admission, but we switch to a new leader that
Expand Down Expand Up @@ -340,7 +340,7 @@ HandleRaftReady:
Replica.MuLock
RaftNode.SetAdmittedLocked([27, 27, 27, 27]) = type: MsgAppResp from: 0 to: 0
Replica.MuUnlock
Piggybacker.AddMsgAppRespForLeader(leader=(n11,s11,r3), msg=type: MsgAppResp from: 0 to: 0)
Piggybacker.Add(n11, [r3,s11,5->0] admitted=t0/[])
.....

# Noop, since not the leader.
Expand Down
Loading

0 comments on commit 379a101

Please sign in to comment.