Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvsever: hacky raft proposal tracer #131863

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1515,6 +1515,7 @@ GO_TARGETS = [
"//pkg/kv/kvserver/raftentry:raftentry_test",
"//pkg/kv/kvserver/raftlog:raftlog",
"//pkg/kv/kvserver/raftlog:raftlog_test",
"//pkg/kv/kvserver/rafttrace:rafttrace",
"//pkg/kv/kvserver/raftutil:raftutil",
"//pkg/kv/kvserver/raftutil:raftutil_test",
"//pkg/kv/kvserver/rangefeed:rangefeed",
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ go_library(
"//pkg/kv/kvserver/multiqueue",
"//pkg/kv/kvserver/raftentry",
"//pkg/kv/kvserver/raftlog",
"//pkg/kv/kvserver/rafttrace",
"//pkg/kv/kvserver/rangefeed",
"//pkg/kv/kvserver/rditer",
"//pkg/kv/kvserver/readsummary",
Expand Down
36 changes: 36 additions & 0 deletions pkg/kv/kvserver/client_raft_log_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,42 @@ func TestRaftLogQueue(t *testing.T) {
}
}

func TestRaftTracerDemo(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
RaftConfig: base.RaftConfig{
RangeLeaseDuration: 24 * time.Hour, // disable lease moves
RaftElectionTimeoutTicks: 1 << 30, // disable elections
},
},
})
ctx := context.Background()
defer tc.Stopper().Stop(ctx)
store := tc.GetFirstStoreFromServer(t, 0)

// Write a single value to ensure we have a leader on n1.
key := tc.ScratchRange(t)
_, pErr := kv.SendWrapped(ctx, store.TestSender(), putArgs(key, []byte("value")))
require.NoError(t, pErr.GoError())
require.NoError(t, tc.WaitForSplitAndInitialization(key))
// Set to have 3 voters.
tc.AddVotersOrFatal(t, key, tc.Targets(1, 2)...)
tc.WaitForVotersOrFatal(t, key, tc.Targets(1, 2)...)

for i := 0; i < 100; i++ {
value := fmt.Sprintf("value-%d", i)
_, pErr := kv.SendWrapped(ctx, store.TestSender(), putArgs(key, []byte(value)))
require.NoError(t, pErr.GoError())
}

time.Sleep(3 * time.Second)
t.FailNow()
}

// TestCrashWhileTruncatingSideloadedEntries emulates a process crash in the
// middle of applying a raft log truncation command that removes some entries
// from the sideloaded storage. The test expects that storage remains in a
Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/kvserverpb/raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,13 @@ message RaftMessageRequest {
// TODO(pav-kv): remove.
repeated kv.kvserver.kvflowcontrol.kvflowcontrolpb.AdmittedResponseForRange admitted_response = 15 [(gogoproto.nullable) = false];
reserved 10;

LogMark trace_mark = 16;
}

message LogMark {
uint64 term = 1;
uint64 index = 2;
}

message RaftMessageRequestBatch {
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ type raftLogger struct {
ctx context.Context
}

func (r *raftLogger) Tracef(format string, v ...any) {
log.Infof(r.ctx, format, v...)
}

func (r *raftLogger) Debug(v ...interface{}) {
if log.V(3) {
log.InfofDepth(r.ctx, 1, "", v...)
Expand Down
13 changes: 13 additions & 0 deletions pkg/kv/kvserver/rafttrace/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "rafttrace",
srcs = ["rafttrace.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace",
visibility = ["//visibility:public"],
deps = [
"//pkg/raft",
"//pkg/raft/raftpb",
"//pkg/util/syncutil",
],
)
131 changes: 131 additions & 0 deletions pkg/kv/kvserver/rafttrace/rafttrace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package rafttrace

import (
"github.com/cockroachdb/cockroach/pkg/raft"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

type Logger interface {
Tracef(format string, v ...any)
}

type RaftTracer struct {
mu syncutil.RWMutex
mark raft.LogMark
log Logger
}

func New(logger Logger) *RaftTracer {
return &RaftTracer{log: logger}
}

func (r *RaftTracer) MaybeRegisterProposal(mark raft.LogMark, id string) {
r.mu.Lock()
defer r.mu.Unlock()
if r.mark.Term == 0 {
r.mark = mark
r.log.Tracef("[raft] registered proposal %x at log mark (t%d,i%d) for tracing",
id, mark.Term, mark.Index)
}
}

func (r *RaftTracer) MaybeRegister(mark raft.LogMark) {
r.mu.Lock()
defer r.mu.Unlock()
if r.mark.Term == 0 {
r.mark = mark
r.log.Tracef("[raft] registered log mark (t%d,i%d) for tracing", mark.Term, mark.Index)
}
}

func (r *RaftTracer) Unregister() {
r.mu.Lock()
defer r.mu.Unlock()
r.log.Tracef("[raft] unregistered log mark (t%d,i%d) from tracing", r.mark.Term, r.mark.Index)
r.mark = raft.LogMark{}
}

func (r *RaftTracer) MaybeTrace(m raftpb.Message) (raft.LogMark, bool) {
mark := r.logMark()
if mark.Term == 0 {
return raft.LogMark{}, false
}
switch m.Type {
case raftpb.MsgProp, raftpb.MsgApp, raftpb.MsgStorageAppend, raftpb.MsgStorageApply:
return mark, r.traceIfCovered(mark, m)
case raftpb.MsgAppResp, raftpb.MsgStorageAppendResp, raftpb.MsgStorageApplyResp:
if !r.traceIfPast(mark, m) {
return raft.LogMark{}, false
}
if m.Type == raftpb.MsgStorageApplyResp {
r.Unregister()
return raft.LogMark{}, false
}
return mark, true
}
return raft.LogMark{}, false
}

func (r *RaftTracer) traceIfCovered(mark raft.LogMark, m raftpb.Message) bool {
covered := false
switch m.Type {
case raftpb.MsgProp:
covered = inRange(mark.Index, m.Entries)
case raftpb.MsgApp:
covered = coveredBy(mark, m.Term, m.Entries)
case raftpb.MsgStorageAppend:
covered = coveredBy(mark, m.LogTerm, m.Entries)
case raftpb.MsgStorageApply:
covered = inRange(mark.Index, m.Entries)
}
if !covered {
return false
}
r.log.Tracef("[raft] message covering mark (t%d,i%d): %s",
mark.Term, mark.Index,
raft.DescribeMessage(m, func(bytes []byte) string { return "" }))
return true
}

func (r *RaftTracer) traceIfPast(mark raft.LogMark, m raftpb.Message) bool {
if m.Reject {
return false
}
passed := false
switch m.Type {
case raftpb.MsgAppResp:
passed = !mark.After(raft.LogMark{Term: m.Term, Index: m.Index})
case raftpb.MsgStorageAppendResp:
passed = !mark.After(raft.LogMark{Term: m.LogTerm, Index: m.Index})
case raftpb.MsgStorageApplyResp:
passed = len(m.Entries) != 0 && m.Entries[len(m.Entries)-1].Index >= mark.Index
}
if !passed {
return false
}
r.log.Tracef("[raft] message past mark (t%d,i%d): %s",
mark.Term, mark.Index,
raft.DescribeMessage(m, func(bytes []byte) string { return "" }))
return true
}

func coveredBy(mark raft.LogMark, term uint64, entries []raftpb.Entry) bool {
return term == mark.Term && inRange(mark.Index, entries)
}

func inRange(index uint64, entries []raftpb.Entry) bool {
return len(entries) != 0 &&
entries[0].Index <= index && index <= entries[len(entries)-1].Index
}

func (r *RaftTracer) logMark() raft.LogMark {
r.mu.RLock()
defer r.mu.RUnlock()
return r.mark
}
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/load"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
Expand Down Expand Up @@ -757,6 +758,7 @@ type Replica struct {
//
// TODO(erikgrinaker): make this never be nil.
internalRaftGroup *raft.RawNode
raftTracer *rafttrace.RaftTracer

// The ID of the leader replica within the Raft group. NB: this is updated
// in a separate critical section from the Raft group, and can therefore
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_destroy.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,5 @@ func (r *Replica) disconnectReplicationRaftMuLocked(ctx context.Context) {
log.Fatalf(ctx, "removing raft group before destroying replica %s", r)
}
r.mu.internalRaftGroup = nil
r.mu.raftTracer = nil
}
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/load"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/raft"
Expand Down Expand Up @@ -308,19 +309,21 @@ func (r *Replica) initRaftMuLockedReplicaMuLocked(s kvstorage.LoadedReplicaState
// replica, replacing the existing Raft group if any.
func (r *Replica) initRaftGroupRaftMuLockedReplicaMuLocked() error {
ctx := r.AnnotateCtx(context.Background())
logger := &raftLogger{ctx: ctx}
rg, err := raft.NewRawNode(newRaftConfig(
ctx,
(*replicaRaftStorage)(r),
raftpb.PeerID(r.replicaID),
r.shMu.state.RaftAppliedIndex,
r.store.cfg,
&raftLogger{ctx: ctx},
logger,
(*replicaRLockedStoreLiveness)(r),
))
if err != nil {
return err
}
r.mu.internalRaftGroup = rg
r.mu.raftTracer = rafttrace.New(logger)
r.flowControlV2.InitRaftLocked(ctx, replica_rac2.NewRaftNode(rg, (*replicaForRACv2)(r)))
return nil
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/replica_proposal_buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/leases"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace"
"github.com/cockroachdb/cockroach/pkg/raft"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -126,6 +127,7 @@ type singleBatchProposer interface {
getReplicaID() roachpb.ReplicaID
flowControlHandle(ctx context.Context) kvflowcontrol.Handle
onErrProposalDropped([]raftpb.Entry, []*ProposalData, raft.StateType)
getRaftTracer() *rafttrace.RaftTracer
}

// A proposer is an object that uses a propBuf to coordinate Raft proposals.
Expand Down Expand Up @@ -164,6 +166,7 @@ type proposer interface {
// proposerRaft abstracts the propBuf's dependency on *raft.RawNode, to help
// testing.
type proposerRaft interface {
Term() uint64
Step(raftpb.Message) error
Status() raft.Status
BasicStatus() raft.BasicStatus
Expand Down Expand Up @@ -887,6 +890,15 @@ func proposeBatch(
// API where it populates the index and term for the passed in
// slice of entries. See etcd-io/raft#57.
maybeDeductFlowTokens(ctx, p.flowControlHandle(ctx), handles, ents)

const index = uint64(50)
if ents[0].Index <= index && ents[len(ents)-1].Index >= index {
offset := index - ents[0].Index
p.getRaftTracer().MaybeRegisterProposal(raft.LogMark{
Term: raftGroup.Term(),
Index: index,
}, string(props[offset].idKey))
}
}
return err
}
Expand Down Expand Up @@ -1194,6 +1206,10 @@ func (rp *replicaProposer) onErrProposalDropped(
}
}

func (rp *replicaProposer) getRaftTracer() *rafttrace.RaftTracer {
return rp.mu.raftTracer
}

func (rp *replicaProposer) leaseDebugRLocked() string {
return rp.shMu.state.Lease.String()
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/replica_proposal_buf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/leases"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil"
"github.com/cockroachdb/cockroach/pkg/raft"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
Expand Down Expand Up @@ -94,6 +95,10 @@ type testProposerRaft struct {

var _ proposerRaft = &testProposerRaft{}

func (t *testProposerRaft) Term() uint64 {
return 0
}

func (t *testProposerRaft) Step(msg raftpb.Message) error {
if msg.Type != raftpb.MsgProp {
return nil
Expand Down Expand Up @@ -153,6 +158,10 @@ func (t *testProposer) getReplicaID() roachpb.ReplicaID {
return 1
}

func (rp *testProposer) getRaftTracer() *rafttrace.RaftTracer {
return nil
}

func (t *testProposer) getReplicaDesc() roachpb.ReplicaDescriptor {
return roachpb.ReplicaDescriptor{StoreID: t.getStoreID(), ReplicaID: t.getReplicaID()}
}
Expand Down
Loading
Loading