From 50a5ef8a90035fa53e70691c953553cabd6da502 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Fri, 4 Oct 2024 00:11:58 +0100 Subject: [PATCH] kvsever: hacky raft proposal tracer Epic: none Release note: none --- pkg/BUILD.bazel | 1 + pkg/kv/kvserver/BUILD.bazel | 1 + pkg/kv/kvserver/client_raft_log_queue_test.go | 36 +++++ pkg/kv/kvserver/kvserverpb/raft.proto | 7 + pkg/kv/kvserver/raft.go | 4 + pkg/kv/kvserver/rafttrace/BUILD.bazel | 13 ++ pkg/kv/kvserver/rafttrace/rafttrace.go | 131 ++++++++++++++++++ pkg/kv/kvserver/replica.go | 2 + pkg/kv/kvserver/replica_destroy.go | 1 + pkg/kv/kvserver/replica_init.go | 5 +- pkg/kv/kvserver/replica_proposal_buf.go | 16 +++ pkg/kv/kvserver/replica_proposal_buf_test.go | 9 ++ pkg/kv/kvserver/replica_raft.go | 13 ++ .../lint/passes/fmtsafe/functions.go | 1 + 14 files changed, 239 insertions(+), 1 deletion(-) create mode 100644 pkg/kv/kvserver/rafttrace/BUILD.bazel create mode 100644 pkg/kv/kvserver/rafttrace/rafttrace.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 8b311413b705..88ecc112d15d 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -1515,6 +1515,7 @@ GO_TARGETS = [ "//pkg/kv/kvserver/raftentry:raftentry_test", "//pkg/kv/kvserver/raftlog:raftlog", "//pkg/kv/kvserver/raftlog:raftlog_test", + "//pkg/kv/kvserver/rafttrace:rafttrace", "//pkg/kv/kvserver/raftutil:raftutil", "//pkg/kv/kvserver/raftutil:raftutil_test", "//pkg/kv/kvserver/rangefeed:rangefeed", diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index b98055d2a36f..a8a2283a8e3d 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -162,6 +162,7 @@ go_library( "//pkg/kv/kvserver/multiqueue", "//pkg/kv/kvserver/raftentry", "//pkg/kv/kvserver/raftlog", + "//pkg/kv/kvserver/rafttrace", "//pkg/kv/kvserver/rangefeed", "//pkg/kv/kvserver/rditer", "//pkg/kv/kvserver/readsummary", diff --git a/pkg/kv/kvserver/client_raft_log_queue_test.go b/pkg/kv/kvserver/client_raft_log_queue_test.go index 9a466877c684..fb3cd912c5bb 100644 --- a/pkg/kv/kvserver/client_raft_log_queue_test.go +++ b/pkg/kv/kvserver/client_raft_log_queue_test.go @@ -132,6 +132,42 @@ func TestRaftLogQueue(t *testing.T) { } } +func TestRaftTracerDemo(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + RaftConfig: base.RaftConfig{ + RangeLeaseDuration: 24 * time.Hour, // disable lease moves + RaftElectionTimeoutTicks: 1 << 30, // disable elections + }, + }, + }) + ctx := context.Background() + defer tc.Stopper().Stop(ctx) + store := tc.GetFirstStoreFromServer(t, 0) + + // Write a single value to ensure we have a leader on n1. + key := tc.ScratchRange(t) + _, pErr := kv.SendWrapped(ctx, store.TestSender(), putArgs(key, []byte("value"))) + require.NoError(t, pErr.GoError()) + require.NoError(t, tc.WaitForSplitAndInitialization(key)) + // Set to have 3 voters. + tc.AddVotersOrFatal(t, key, tc.Targets(1, 2)...) + tc.WaitForVotersOrFatal(t, key, tc.Targets(1, 2)...) + + for i := 0; i < 100; i++ { + value := fmt.Sprintf("value-%d", i) + _, pErr := kv.SendWrapped(ctx, store.TestSender(), putArgs(key, []byte(value))) + require.NoError(t, pErr.GoError()) + } + + time.Sleep(3 * time.Second) + t.FailNow() +} + // TestCrashWhileTruncatingSideloadedEntries emulates a process crash in the // middle of applying a raft log truncation command that removes some entries // from the sideloaded storage. The test expects that storage remains in a diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto index cd1052db9a32..dd02f6bc8ece 100644 --- a/pkg/kv/kvserver/kvserverpb/raft.proto +++ b/pkg/kv/kvserver/kvserverpb/raft.proto @@ -109,6 +109,13 @@ message RaftMessageRequest { // TODO(pav-kv): remove. repeated kv.kvserver.kvflowcontrol.kvflowcontrolpb.AdmittedResponseForRange admitted_response = 15 [(gogoproto.nullable) = false]; reserved 10; + + LogMark trace_mark = 16; +} + +message LogMark { + uint64 term = 1; + uint64 index = 2; } message RaftMessageRequestBatch { diff --git a/pkg/kv/kvserver/raft.go b/pkg/kv/kvserver/raft.go index 42f2ebbc6d8b..9e92c2ea3f40 100644 --- a/pkg/kv/kvserver/raft.go +++ b/pkg/kv/kvserver/raft.go @@ -53,6 +53,10 @@ type raftLogger struct { ctx context.Context } +func (r *raftLogger) Tracef(format string, v ...any) { + log.Infof(r.ctx, format, v...) +} + func (r *raftLogger) Debug(v ...interface{}) { if log.V(3) { log.InfofDepth(r.ctx, 1, "", v...) diff --git a/pkg/kv/kvserver/rafttrace/BUILD.bazel b/pkg/kv/kvserver/rafttrace/BUILD.bazel new file mode 100644 index 000000000000..5ab09b3c7eee --- /dev/null +++ b/pkg/kv/kvserver/rafttrace/BUILD.bazel @@ -0,0 +1,13 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "rafttrace", + srcs = ["rafttrace.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace", + visibility = ["//visibility:public"], + deps = [ + "//pkg/raft", + "//pkg/raft/raftpb", + "//pkg/util/syncutil", + ], +) diff --git a/pkg/kv/kvserver/rafttrace/rafttrace.go b/pkg/kv/kvserver/rafttrace/rafttrace.go new file mode 100644 index 000000000000..b2132988f5a7 --- /dev/null +++ b/pkg/kv/kvserver/rafttrace/rafttrace.go @@ -0,0 +1,131 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package rafttrace + +import ( + "github.com/cockroachdb/cockroach/pkg/raft" + "github.com/cockroachdb/cockroach/pkg/raft/raftpb" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +type Logger interface { + Tracef(format string, v ...any) +} + +type RaftTracer struct { + mu syncutil.RWMutex + mark raft.LogMark + log Logger +} + +func New(logger Logger) *RaftTracer { + return &RaftTracer{log: logger} +} + +func (r *RaftTracer) MaybeRegisterProposal(mark raft.LogMark, id string) { + r.mu.Lock() + defer r.mu.Unlock() + if r.mark.Term == 0 { + r.mark = mark + r.log.Tracef("[raft] registered proposal %x at log mark (t%d,i%d) for tracing", + id, mark.Term, mark.Index) + } +} + +func (r *RaftTracer) MaybeRegister(mark raft.LogMark) { + r.mu.Lock() + defer r.mu.Unlock() + if r.mark.Term == 0 { + r.mark = mark + r.log.Tracef("[raft] registered log mark (t%d,i%d) for tracing", mark.Term, mark.Index) + } +} + +func (r *RaftTracer) Unregister() { + r.mu.Lock() + defer r.mu.Unlock() + r.log.Tracef("[raft] unregistered log mark (t%d,i%d) from tracing", r.mark.Term, r.mark.Index) + r.mark = raft.LogMark{} +} + +func (r *RaftTracer) MaybeTrace(m raftpb.Message) (raft.LogMark, bool) { + mark := r.logMark() + if mark.Term == 0 { + return raft.LogMark{}, false + } + switch m.Type { + case raftpb.MsgProp, raftpb.MsgApp, raftpb.MsgStorageAppend, raftpb.MsgStorageApply: + return mark, r.traceIfCovered(mark, m) + case raftpb.MsgAppResp, raftpb.MsgStorageAppendResp, raftpb.MsgStorageApplyResp: + if !r.traceIfPast(mark, m) { + return raft.LogMark{}, false + } + if m.Type == raftpb.MsgStorageApplyResp { + r.Unregister() + return raft.LogMark{}, false + } + return mark, true + } + return raft.LogMark{}, false +} + +func (r *RaftTracer) traceIfCovered(mark raft.LogMark, m raftpb.Message) bool { + covered := false + switch m.Type { + case raftpb.MsgProp: + covered = inRange(mark.Index, m.Entries) + case raftpb.MsgApp: + covered = coveredBy(mark, m.Term, m.Entries) + case raftpb.MsgStorageAppend: + covered = coveredBy(mark, m.LogTerm, m.Entries) + case raftpb.MsgStorageApply: + covered = inRange(mark.Index, m.Entries) + } + if !covered { + return false + } + r.log.Tracef("[raft] message covering mark (t%d,i%d): %s", + mark.Term, mark.Index, + raft.DescribeMessage(m, func(bytes []byte) string { return "" })) + return true +} + +func (r *RaftTracer) traceIfPast(mark raft.LogMark, m raftpb.Message) bool { + if m.Reject { + return false + } + passed := false + switch m.Type { + case raftpb.MsgAppResp: + passed = !mark.After(raft.LogMark{Term: m.Term, Index: m.Index}) + case raftpb.MsgStorageAppendResp: + passed = !mark.After(raft.LogMark{Term: m.LogTerm, Index: m.Index}) + case raftpb.MsgStorageApplyResp: + passed = len(m.Entries) != 0 && m.Entries[len(m.Entries)-1].Index >= mark.Index + } + if !passed { + return false + } + r.log.Tracef("[raft] message past mark (t%d,i%d): %s", + mark.Term, mark.Index, + raft.DescribeMessage(m, func(bytes []byte) string { return "" })) + return true +} + +func coveredBy(mark raft.LogMark, term uint64, entries []raftpb.Entry) bool { + return term == mark.Term && inRange(mark.Index, entries) +} + +func inRange(index uint64, entries []raftpb.Entry) bool { + return len(entries) != 0 && + entries[0].Index <= index && index <= entries[len(entries)-1].Index +} + +func (r *RaftTracer) logMark() raft.LogMark { + r.mu.RLock() + defer r.mu.RUnlock() + return r.mark +} diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 1543eca3bffa..6582f46897ba 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" @@ -757,6 +758,7 @@ type Replica struct { // // TODO(erikgrinaker): make this never be nil. internalRaftGroup *raft.RawNode + raftTracer *rafttrace.RaftTracer // The ID of the leader replica within the Raft group. NB: this is updated // in a separate critical section from the Raft group, and can therefore diff --git a/pkg/kv/kvserver/replica_destroy.go b/pkg/kv/kvserver/replica_destroy.go index 553b5e012fd7..b4b99ffb0eda 100644 --- a/pkg/kv/kvserver/replica_destroy.go +++ b/pkg/kv/kvserver/replica_destroy.go @@ -181,4 +181,5 @@ func (r *Replica) disconnectReplicationRaftMuLocked(ctx context.Context) { log.Fatalf(ctx, "removing raft group before destroying replica %s", r) } r.mu.internalRaftGroup = nil + r.mu.raftTracer = nil } diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 199ad8f9d13e..929446062aad 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/raft" @@ -308,19 +309,21 @@ func (r *Replica) initRaftMuLockedReplicaMuLocked(s kvstorage.LoadedReplicaState // replica, replacing the existing Raft group if any. func (r *Replica) initRaftGroupRaftMuLockedReplicaMuLocked() error { ctx := r.AnnotateCtx(context.Background()) + logger := &raftLogger{ctx: ctx} rg, err := raft.NewRawNode(newRaftConfig( ctx, (*replicaRaftStorage)(r), raftpb.PeerID(r.replicaID), r.shMu.state.RaftAppliedIndex, r.store.cfg, - &raftLogger{ctx: ctx}, + logger, (*replicaRLockedStoreLiveness)(r), )) if err != nil { return err } r.mu.internalRaftGroup = rg + r.mu.raftTracer = rafttrace.New(logger) r.flowControlV2.InitRaftLocked(ctx, replica_rac2.NewRaftNode(rg, (*replicaForRACv2)(r))) return nil } diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go index 96358f99bade..df90b0955c74 100644 --- a/pkg/kv/kvserver/replica_proposal_buf.go +++ b/pkg/kv/kvserver/replica_proposal_buf.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/leases" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace" "github.com/cockroachdb/cockroach/pkg/raft" "github.com/cockroachdb/cockroach/pkg/raft/raftpb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -126,6 +127,7 @@ type singleBatchProposer interface { getReplicaID() roachpb.ReplicaID flowControlHandle(ctx context.Context) kvflowcontrol.Handle onErrProposalDropped([]raftpb.Entry, []*ProposalData, raft.StateType) + getRaftTracer() *rafttrace.RaftTracer } // A proposer is an object that uses a propBuf to coordinate Raft proposals. @@ -164,6 +166,7 @@ type proposer interface { // proposerRaft abstracts the propBuf's dependency on *raft.RawNode, to help // testing. type proposerRaft interface { + Term() uint64 Step(raftpb.Message) error Status() raft.Status BasicStatus() raft.BasicStatus @@ -887,6 +890,15 @@ func proposeBatch( // API where it populates the index and term for the passed in // slice of entries. See etcd-io/raft#57. maybeDeductFlowTokens(ctx, p.flowControlHandle(ctx), handles, ents) + + const index = uint64(50) + if ents[0].Index <= index && ents[len(ents)-1].Index >= index { + offset := index - ents[0].Index + p.getRaftTracer().MaybeRegisterProposal(raft.LogMark{ + Term: raftGroup.Term(), + Index: index, + }, string(props[offset].idKey)) + } } return err } @@ -1194,6 +1206,10 @@ func (rp *replicaProposer) onErrProposalDropped( } } +func (rp *replicaProposer) getRaftTracer() *rafttrace.RaftTracer { + return rp.mu.raftTracer +} + func (rp *replicaProposer) leaseDebugRLocked() string { return rp.shMu.state.Lease.String() } diff --git a/pkg/kv/kvserver/replica_proposal_buf_test.go b/pkg/kv/kvserver/replica_proposal_buf_test.go index 7b94a3165b7b..3a6f08be043f 100644 --- a/pkg/kv/kvserver/replica_proposal_buf_test.go +++ b/pkg/kv/kvserver/replica_proposal_buf_test.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/leases" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil" "github.com/cockroachdb/cockroach/pkg/raft" "github.com/cockroachdb/cockroach/pkg/raft/raftpb" @@ -94,6 +95,10 @@ type testProposerRaft struct { var _ proposerRaft = &testProposerRaft{} +func (t *testProposerRaft) Term() uint64 { + return 0 +} + func (t *testProposerRaft) Step(msg raftpb.Message) error { if msg.Type != raftpb.MsgProp { return nil @@ -153,6 +158,10 @@ func (t *testProposer) getReplicaID() roachpb.ReplicaID { return 1 } +func (rp *testProposer) getRaftTracer() *rafttrace.RaftTracer { + return nil +} + func (t *testProposer) getReplicaDesc() roachpb.ReplicaDescriptor { return roachpb.ReplicaDescriptor{StoreID: t.getStoreID(), ReplicaID: t.getReplicaID()} } diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 8e4a387f2b81..860a1f1ef226 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -600,6 +600,11 @@ func (r *Replica) stepRaftGroupRaftMuLocked(req *kvserverpb.RaftMessageRequest) r.raftMu.AssertHeld() var sideChannelInfo replica_rac2.SideChannelInfoUsingRaftMessageRequest err := r.withRaftGroup(func(raftGroup *raft.RawNode) (bool, error) { + if mark := req.TraceMark; mark != nil { + r.mu.raftTracer.MaybeRegister(raft.LogMark{Term: mark.Term, Index: mark.Index}) + } + r.mu.raftTracer.MaybeTrace(req.Message) + // We're processing an incoming raft message (from a batch that may // include MsgVotes), so don't campaign if we wake up our raft // group. @@ -1136,6 +1141,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( } } + r.mu.raftTracer.MaybeTrace(msgStorageAppend) if state, err = s.StoreEntries(ctx, state, app, cb, &stats.append); err != nil { return stats, errors.Wrap(err, "while storing log entries") } @@ -1167,6 +1173,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( stats.tApplicationBegin = timeutil.Now() if hasMsg(msgStorageApply) { + r.mu.raftTracer.MaybeTrace(msgStorageApply) r.traceEntries(msgStorageApply.Entries, "committed, before applying any entries") err := appTask.ApplyCommittedEntries(ctx) @@ -1890,6 +1897,7 @@ func (r *Replica) deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked( } for i, m := range localMsgs { + r.mu.raftTracer.MaybeTrace(m) if err := raftGroup.Step(m); err != nil { log.Fatalf(ctx, "unexpected error stepping local raft message [%s]: %v", raft.DescribeMessage(m, raftEntryFormatter), err) @@ -1913,6 +1921,8 @@ func (r *Replica) sendRaftMessage( lastToReplica, lastFromReplica := r.getLastReplicaDescriptors() r.mu.RLock() + mark, trace := r.mu.raftTracer.MaybeTrace(msg) + fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.From), lastToReplica) toReplica, toErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.To), lastFromReplica) var startKey roachpb.RKey @@ -1966,6 +1976,9 @@ func (r *Replica) sendRaftMessage( UsingRac2Protocol: r.flowControlV2.GetEnabledWhenLeader() >= kvflowcontrol.V2EnabledWhenLeaderV1Encoding, LowPriorityOverride: lowPriorityOverride, } + if trace { + req.TraceMark = &kvserverpb.LogMark{Term: mark.Term, Index: mark.Index} + } // For RACv2, annotate successful MsgAppResp messages with the vector of // admitted log indices, by priority. if msg.Type == raftpb.MsgAppResp && !msg.Reject { diff --git a/pkg/testutils/lint/passes/fmtsafe/functions.go b/pkg/testutils/lint/passes/fmtsafe/functions.go index 3cb14f60029d..ebf22914af81 100644 --- a/pkg/testutils/lint/passes/fmtsafe/functions.go +++ b/pkg/testutils/lint/passes/fmtsafe/functions.go @@ -113,6 +113,7 @@ var requireConstFmt = map[string]bool{ "(*main.operationImpl).Fatalf": true, "(*github.com/cockroachdb/cockroach/pkg/cmd/roachtest.operationImpl).Fatalf": true, + "(*github.com/cockroachdb/cockroach/pkg/kv/kvserver.raftLogger).Tracef": true, "(*github.com/cockroachdb/cockroach/pkg/kv/kvserver.raftLogger).Debugf": true, "(*github.com/cockroachdb/cockroach/pkg/kv/kvserver.raftLogger).Infof": true, "(*github.com/cockroachdb/cockroach/pkg/kv/kvserver.raftLogger).Warningf": true,