diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index b5a3ebb04c14..d359c5f55b7b 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -1510,6 +1510,7 @@ GO_TARGETS = [ "//pkg/kv/kvserver/raftentry:raftentry_test", "//pkg/kv/kvserver/raftlog:raftlog", "//pkg/kv/kvserver/raftlog:raftlog_test", + "//pkg/kv/kvserver/rafttrace:rafttrace", "//pkg/kv/kvserver/raftutil:raftutil", "//pkg/kv/kvserver/raftutil:raftutil_test", "//pkg/kv/kvserver/rangefeed:rangefeed", diff --git a/pkg/cmd/roachtest/tests/admission_control_latency.go b/pkg/cmd/roachtest/tests/admission_control_latency.go index 7793bc158aab..036ce4dcf7e5 100644 --- a/pkg/cmd/roachtest/tests/admission_control_latency.go +++ b/pkg/cmd/roachtest/tests/admission_control_latency.go @@ -750,6 +750,11 @@ func (v variations) runTest(ctx context.Context, t test.Test, c cluster.Cluster) `SET CLUSTER SETTING kv.lease.reject_on_leader_unknown.enabled = true`); err != nil { t.Fatal(err) } + // Enable raft tracing. Remove this once raft tracing is the default. + if _, err := db.ExecContext(ctx, + `SET CLUSTER SETTING kv.raft.max_concurrent_traces = '10'`); err != nil { + t.Fatal(err) + } // This isn't strictly necessary, but it would be nice if this test passed at 10s (or lower). if _, err := db.ExecContext(ctx, `SET CLUSTER SETTING server.time_after_store_suspect = '10s'`); err != nil { diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 7b0aefce0c94..1941fdae1db3 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -162,6 +162,7 @@ go_library( "//pkg/kv/kvserver/multiqueue", "//pkg/kv/kvserver/raftentry", "//pkg/kv/kvserver/raftlog", + "//pkg/kv/kvserver/rafttrace", "//pkg/kv/kvserver/rangefeed", "//pkg/kv/kvserver/rditer", "//pkg/kv/kvserver/readsummary", @@ -437,6 +438,7 @@ go_test( "//pkg/kv/kvserver/protectedts/ptutil", "//pkg/kv/kvserver/raftentry", "//pkg/kv/kvserver/raftlog", + "//pkg/kv/kvserver/rafttrace", "//pkg/kv/kvserver/raftutil", "//pkg/kv/kvserver/rangefeed", "//pkg/kv/kvserver/rditer", diff --git a/pkg/kv/kvserver/client_raft_log_queue_test.go b/pkg/kv/kvserver/client_raft_log_queue_test.go index 9a466877c684..2c189e3cb3d8 100644 --- a/pkg/kv/kvserver/client_raft_log_queue_test.go +++ b/pkg/kv/kvserver/client_raft_log_queue_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" @@ -33,6 +34,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/vfs" "github.com/gogo/protobuf/proto" @@ -132,6 +135,64 @@ func TestRaftLogQueue(t *testing.T) { } } +func TestRaftTracing(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // TODO(baptist): Remove this once we change the default to be enabled. 
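+	// Raft tracing is disabled by default (kv.raft.max_concurrent_traces is 0),
+	// so the test opts in by overriding the setting on the test cluster.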
+	st := cluster.MakeTestingClusterSettings()
+	rafttrace.MaxConcurrentRaftTraces.Override(context.Background(), &st.SV, 10)
+
+	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
+		ReplicationMode: base.ReplicationManual,
+		ServerArgs: base.TestServerArgs{
+			Settings: st,
+			RaftConfig: base.RaftConfig{
+				RangeLeaseDuration:       24 * time.Hour, // disable lease moves
+				RaftElectionTimeoutTicks: 1 << 30,        // disable elections
+			},
+		},
+	})
+	defer tc.Stopper().Stop(context.Background())
+	store := tc.GetFirstStoreFromServer(t, 0)
+
+	// Write a single value to ensure we have a leader on n1.
+	key := tc.ScratchRange(t)
+	_, pErr := kv.SendWrapped(context.Background(), store.TestSender(), putArgs(key, []byte("value")))
+	require.NoError(t, pErr.GoError())
+	require.NoError(t, tc.WaitForSplitAndInitialization(key))
+	// Set to have 3 voters.
+	tc.AddVotersOrFatal(t, key, tc.Targets(1, 2)...)
+	tc.WaitForVotersOrFatal(t, key, tc.Targets(1, 2)...)
+
+	for i := 0; i < 100; i++ {
+		var finish func() tracingpb.Recording
+		ctx := context.Background()
+		if i == 50 {
+			// Trace a random request on a "client" tracer.
+			ctx, finish = tracing.ContextWithRecordingSpan(ctx, tracing.NewTracer(), "test")
+		}
+		_, pErr := kv.SendWrapped(ctx, store.TestSender(), putArgs(key, []byte(fmt.Sprintf("value-%d", i))))
+		require.NoError(t, pErr.GoError())
+		// Note that this is the client's span; there may be additional logs created after the span is returned.
+		if finish != nil {
+			// NB: It is hard to get all the messages in an expected order. We
+			// simply ensure some of the key messages are returned. Also note
+			// that we want to make sure that the logs are not reported against
+			// the tracing library, but the line that called into it.
+			expectedMessages := []string{
+				`replica_proposal_buf.* flushing proposal to Raft`,
+				`replica_proposal_buf.* registering local trace`,
+				`replica_raft.* 1->2 MsgApp`,
+				`replica_raft.* 1->3 MsgApp`,
+				`replica_raft.* AppendThread->1 MsgStorageAppendResp`,
+				`ack-ing replication success to the client`,
+			}
+			require.NoError(t, testutils.MatchInOrder(finish().String(), expectedMessages...))
+		}
+	}
+}
+
 // TestCrashWhileTruncatingSideloadedEntries emulates a process crash in the
 // middle of applying a raft log truncation command that removes some entries
 // from the sideloaded storage. The test expects that storage remains in a
diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto
index a096ac9bb917..916c57a43bcc 100644
--- a/pkg/kv/kvserver/kvserverpb/raft.proto
+++ b/pkg/kv/kvserver/kvserverpb/raft.proto
@@ -48,6 +48,16 @@ message RaftHeartbeat {
   bool lagging_followers_on_quiesce_accurate = 10;
 }
 
+// The traced entry from the leader along with the trace and span ID.
+message TracedEntry {
+  uint64 index = 1 [(gogoproto.nullable) = false,
+    (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/kv/kvpb.RaftIndex"];
+  uint64 trace_id = 2 [(gogoproto.nullable) = false, (gogoproto.customname) = "TraceID",
+    (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb.TraceID"];
+  uint64 span_id = 3 [(gogoproto.nullable) = false, (gogoproto.customname) = "SpanID",
+    (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb.SpanID"];
+}
+
 // RaftMessageRequest is the request used to send raft messages using our
 // protobuf-based RPC codec.
If a RaftMessageRequest has a non-empty number of // heartbeats or heartbeat_resps, the contents of the message field is treated @@ -103,6 +113,12 @@ message RaftMessageRequest { // indices. Used only with RACv2. kv.kvserver.kvflowcontrol.kvflowcontrolpb.AdmittedState admitted_state = 14 [(gogoproto.nullable) = false]; + // TracedEntry is a mapping from Raft index to trace and span ids for this + // request. They are set by the leader and will begin tracing on the + // follower. Note that traces are not currently sent back to the leader, but + // instead logged locally. + repeated TracedEntry traced_entries = 15 [(gogoproto.nullable) = false]; + reserved 10; } diff --git a/pkg/kv/kvserver/raft.go b/pkg/kv/kvserver/raft.go index 553cf26e4098..4f1e311004ca 100644 --- a/pkg/kv/kvserver/raft.go +++ b/pkg/kv/kvserver/raft.go @@ -267,7 +267,7 @@ func traceProposals(r *Replica, ids []kvserverbase.CmdIDKey, event string) { r.mu.RLock() for _, id := range ids { if prop, ok := r.mu.proposals[id]; ok { - ctxs = append(ctxs, prop.ctx) + ctxs = append(ctxs, prop.Context()) } } r.mu.RUnlock() diff --git a/pkg/kv/kvserver/rafttrace/BUILD.bazel b/pkg/kv/kvserver/rafttrace/BUILD.bazel new file mode 100644 index 000000000000..4def820184a1 --- /dev/null +++ b/pkg/kv/kvserver/rafttrace/BUILD.bazel @@ -0,0 +1,22 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "rafttrace", + srcs = ["rafttrace.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace", + visibility = ["//visibility:public"], + deps = [ + "//pkg/kv/kvpb", + "//pkg/kv/kvserver/kvserverpb", + "//pkg/raft", + "//pkg/raft/raftpb", + "//pkg/settings", + "//pkg/settings/cluster", + "//pkg/util/log", + "//pkg/util/syncutil", + "//pkg/util/tracing", + "//pkg/util/tracing/tracingpb", + "@com_github_cockroachdb_logtags//:logtags", + "@com_github_cockroachdb_redact//:redact", + ], +) diff --git a/pkg/kv/kvserver/rafttrace/rafttrace.go b/pkg/kv/kvserver/rafttrace/rafttrace.go new file mode 100644 index 000000000000..e280ca4e7366 --- /dev/null +++ b/pkg/kv/kvserver/rafttrace/rafttrace.go @@ -0,0 +1,348 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package rafttrace + +import ( + "context" + "sync/atomic" + + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/raft" + "github.com/cockroachdb/cockroach/pkg/raft/raftpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" + "github.com/cockroachdb/logtags" + "github.com/cockroachdb/redact" +) + +// traceValue represents the trace information for a single registration. +type traceValue struct { + traced kvserverpb.TracedEntry + // ctx is a trace specific context used to log events on this trace. + ctx context.Context + // baseCtx is the underlying proposal buffer which we additionally log to. + // We can't store the underlying context directly because it can be modified + // by the proposal buffer once it is finished from the user's perspective. 
+	baseCtx withContext
+
+	mu struct {
+		syncutil.Mutex
+		// appRespSent and storageAppendRespSent track whether the response
+		// messages from each peer have already been logged, so each is logged
+		// at most once. This keeps the log from growing too large, at a small
+		// risk of missing some messages in the case of dropped messages or
+		// reproposals.
+		appRespSent, storageAppendRespSent map[raftpb.PeerID]bool
+	}
+}
+
+// logf logs the message to the trace context and the base context. The base
+// context is populated on the leaseholder and is attached to the SQL trace.
+func (tv *traceValue) logf(depth int, format string, args ...interface{}) {
+	log.InfofDepth(tv.ctx, depth+1, format, args...)
+	if tv.baseCtx != nil {
+		if ctx := tv.baseCtx.Context(); ctx != nil {
+			log.VEventfDepth(ctx, depth+1, 3, format, args...)
+		}
+	}
+}
+
+// String attempts to balance uniqueness with readability by only keeping the
+// lower 16 bits of the trace and span.
+func (tv *traceValue) String() string {
+	return redact.StringWithoutMarkers(tv)
+}
+
+func (tv *traceValue) SafeFormat(w redact.SafePrinter, _ rune) {
+	w.Printf("i%d/%x.%x", tv.traced.Index, uint16(tv.traced.TraceID), uint16(tv.traced.SpanID))
+}
+
+// RaftTracer is a utility to trace the lifetime of raft log entries. It may
+// include some unrelated entries, since it does not consider the term. It
+// traces at most one MsgAppResp and MsgStorageAppendResp per index, namely the
+// first one that is at or past the traced entry's index. This limitation means
+// it may not capture all the relevant messages, particularly if the term
+// changes.
+//
+// The library logs in two different ways: once to the standard cockroach log
+// and once to the SQL trace on the leader.
+// TODO(baptist): Look at logging traces on followers as well and sending back
+// to the leader in the MsgAppResp. It would need to be best effort, but might
+// still be useful.
+type RaftTracer struct {
+	// m is a map of all the currently traced entries for this replica. The
+	// aggregate size of the map is equal to or less than numRegistered. We add
+	// to numRegistered before we update m, and delete from m before we remove
+	// from numRegistered to keep this invariant.
+	m syncutil.Map[kvpb.RaftIndex, traceValue]
+	// numRegistered is the number of currently registered traces for this
+	// store, not this replica. The number of registered traces will never
+	// exceed the MaxConcurrentRaftTraces setting. If the setting is lowered to
+	// zero, we flush all traces on all replicas.
+	numRegistered *atomic.Int64
+	// This is the ambient context for the replica and is used for remote
+	// traces. It contains the replica/range information. On each trace we
+	// additionally append the unique trace/span IDs.
+	ctx context.Context
+	st  *cluster.Settings
+}
+
+// NewRaftTracer creates a new RaftTracer with the given ambient context for
+// the replica.
+func NewRaftTracer(
+	ctx context.Context, st *cluster.Settings, numRegistered *atomic.Int64,
+) *RaftTracer {
+	return &RaftTracer{ctx: ctx, st: st, numRegistered: numRegistered}
+}
+
+// MaxConcurrentRaftTraces is the maximum number of entries that can be traced
+// at any time on this store. Additional traces will be ignored until the
+// number of traces drops below the limit. Having too many active traces can
+// negatively impact performance as we iterate over all of them for some
+// messages. A limit of 10 is a reasonable choice that balances usefulness with
+// performance impact; it isn't expected that this limit will normally be hit.
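+// A value of 0 (the default) disables raft tracing; lowering the setting to 0
+// also flushes any traces that are currently registered.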
+var MaxConcurrentRaftTraces = settings.RegisterIntSetting(
+	settings.SystemVisible,
+	"kv.raft.max_concurrent_traces",
+	"the maximum number of tracked raft traces",
+	0,
+	settings.NonNegativeInt,
+)
+
+// maybeRegister checks if we should register a new trace. If there are too
+// many registered traces, it does not register and returns false. The
+// invariant is that numRegistered <= numAllowed. This method returns true if
+// we can keep the invariant and have added one to the number registered;
+// otherwise it returns false.
+func (r *RaftTracer) maybeRegister() bool {
+	numAllowed := MaxConcurrentRaftTraces.Get(&r.st.SV)
+	numRegistered := r.numRegistered.Load()
+
+	// The maximum number of traces has been reached. We don't register this
+	// trace and return false.
+	if numRegistered == numAllowed {
+		return false
+	}
+
+	// This can only happen if numAllowed has changed. If this happens flush all
+	// our current traces and don't register this request.
+	if numAllowed == 0 {
+		r.FlushAll()
+		return false
+	}
+
+	// Only increment the number of registered traces if the numRegistered
+	// hasn't changed. In the case of an ABA update, it does not break the
+	// invariant since some other trace was registered and deregistered, but
+	// there is still a slot available.
+	return r.numRegistered.CompareAndSwap(numRegistered, numRegistered+1)
+}
+
+func (r *RaftTracer) storeEntry(te kvserverpb.TracedEntry, baseCtx withContext) *traceValue {
+	tv := traceValue{
+		traced:  te,
+		baseCtx: baseCtx,
+	}
+	tv.ctx = logtags.AddTag(r.ctx, "id", redact.Safe(tv.String()))
+	tv.mu.appRespSent = make(map[raftpb.PeerID]bool)
+	tv.mu.storageAppendRespSent = make(map[raftpb.PeerID]bool)
+	r.m.Store(te.Index, &tv)
+	return &tv
+}
+
+// RegisterRemote is used to register a remote trace. This is used when we
+// receive a raft message over the wire with a request to continue tracing it.
+func (r *RaftTracer) RegisterRemote(te kvserverpb.TracedEntry) {
+	if !r.maybeRegister() {
+		return
+	}
+	// NB: We don't currently return remote traces; if we did, we would pass the
+	// remote ctx here and trace it. The problem is knowing when to send it
+	// back to the remote node.
+	tv := r.storeEntry(te, nil)
+	tv.logf(1, "registering remote trace %s", tv)
+}
+
+// withContext allows us to get the context from the object.
+type withContext interface {
+	Context() context.Context
+}
+
+// Register is called on an entry that is about to be proposed. This will begin
+// logging all subsequent updates to this entry.
+func (r *RaftTracer) Register(baseCtx withContext, ent raftpb.Entry) {
+	// Only register if there is a trace in the context and it is set to verbose
+	// logging.
+	span := tracing.SpanFromContext(baseCtx.Context())
+	if span == nil || span.RecordingType() != tracingpb.RecordingVerbose {
+		return
+	}
+
+	// If the index is zero, then we can't trace this entry. This can happen if
+	// there is a leader/leaseholder split. We don't have an easy way to handle
+	// this today, so don't attempt to trace it.
+	if ent.Index == 0 {
+		log.VEventf(baseCtx.Context(), 2, "skip registering raft proposal without index: %v", ent)
+		return
+	}
+
+	// This must be the last conditional. If this returns true we must call
+	// storeEntry to not leak a registered permit.
+	if !r.maybeRegister() {
+		log.VEvent(baseCtx.Context(), 2, "too many active raft traces, skipping")
+		return
+	}
+
+	// Grab the trace and span id now because the baseCtx.Context may be
+	// nil'ed later.
+	tv := r.storeEntry(
+		kvserverpb.TracedEntry{
+			Index:   kvpb.RaftIndex(ent.Index),
+			TraceID: span.TraceID(),
+			SpanID:  span.SpanID(),
+		},
+		baseCtx,
+	)
+	tv.logf(1, "registering local trace %s", tv)
+}
+
+// MaybeTrace will log the message if it is covered by a trace.
+func (r *RaftTracer) MaybeTrace(m raftpb.Message) []kvserverpb.TracedEntry {
+	// NB: This check is an optimization to handle the common case where there
+	// are no registered traces on the store.
+	if r.numRegistered.Load() == 0 {
+		return nil
+	}
+
+	switch m.Type {
+	case raftpb.MsgProp, raftpb.MsgApp, raftpb.MsgStorageAppend, raftpb.MsgStorageApply:
+		return r.traceIfCovered(m)
+	case raftpb.MsgAppResp, raftpb.MsgStorageAppendResp, raftpb.MsgStorageApplyResp:
+		r.traceIfPast(m)
+		return nil
+	}
+	return nil
+}
+
+// FlushAll will unregister all the currently active traces. It is safe to call
+// multiple times, but should always be called when the replica is destroyed.
+func (r *RaftTracer) FlushAll() {
+	r.m.Range(func(index kvpb.RaftIndex, t *traceValue) bool {
+		r.m.Delete(index)
+		r.numRegistered.Add(-1)
+		t.logf(2, "cleanup log index %d during close", index)
+		return true
+	})
+}
+
+func peer(p raftpb.PeerID) redact.SafeString {
+	return redact.SafeString(raft.DescribeTarget(p))
+}
+
+// traceIfCovered will log the message if it touches any of the registered
+// trace points. Additionally, it returns the registered traced entries covered
+// by the message so they can be sent to remote nodes. This typically applies
+// to messages that the leader sends to the followers.
+func (r *RaftTracer) traceIfCovered(m raftpb.Message) []kvserverpb.TracedEntry {
+	if len(m.Entries) == 0 {
+		return nil
+	}
+	minEntryIndex := kvpb.RaftIndex(m.Entries[0].Index)
+	maxEntryIndex := kvpb.RaftIndex(m.Entries[len(m.Entries)-1].Index)
+	var tracedEntries []kvserverpb.TracedEntry
+	r.m.Range(func(index kvpb.RaftIndex, t *traceValue) bool {
+		// If the traced index is not in the range of the entries, we can skip
+		// it. We don't need to check each individual entry since they are
+		// contiguous.
+		if t.traced.Index < minEntryIndex || t.traced.Index > maxEntryIndex {
+			return true
+		}
+		tracedEntries = append(tracedEntries, t.traced)
+		// TODO(baptist): Not all the fields are relevant to log for all
+		// message types. Consider cleaning up what is logged.
+		t.logf(4,
+			"%s->%s %v Term:%d Log:%d/%d Range:%d-%d",
+			peer(m.From),
+			peer(m.To),
+			m.Type,
+			m.Term,
+			m.LogTerm,
+			m.Index,
+			minEntryIndex,
+			maxEntryIndex,
+		)
+		return true
+	})
+	return tracedEntries
+}
+
+// traceIfPast will log the message if it is past any registered tracing
+// points. It will additionally unregister traces that are no longer useful.
+// This call is for events that move the needle/watermark forward (e.g. the log
+// storage syncs), but don't have an exact range of entries affected. Since we
+// can't match these events to entries exactly, we instead check that the
+// watermark has passed the entry. To protect against overly verbose logging,
+// we only allow MsgAppResp and MsgStorageAppendResp to be logged once per
+// trace.
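+// For example, if index 100 is registered, the first MsgAppResp from each
+// follower with Index >= 100 is logged, and later MsgAppResps from that
+// follower are suppressed.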
+func (r *RaftTracer) traceIfPast(m raftpb.Message) { + if m.Reject { + return + } + r.m.Range(func(index kvpb.RaftIndex, t *traceValue) bool { + t.mu.Lock() + defer t.mu.Unlock() + switch m.Type { + case raftpb.MsgAppResp: + if kvpb.RaftIndex(m.Index) >= index && !t.mu.appRespSent[m.From] { + t.mu.appRespSent[m.From] = true + t.logf(4, + "%s->%s %v Term:%d Index:%d", + peer(m.From), + peer(m.To), + m.Type, + m.Term, + m.Index, + ) + } + case raftpb.MsgStorageAppendResp: + if kvpb.RaftIndex(m.Index) >= index && !t.mu.storageAppendRespSent[m.From] { + t.mu.storageAppendRespSent[m.From] = true + t.logf(4, + "%s->%s %v Log:%d/%d", + peer(m.From), + peer(m.To), + m.Type, + m.LogTerm, + m.Index, + ) + } + case raftpb.MsgStorageApplyResp: + if len(m.Entries) == 0 { + return true + } + // Use the last entry to determine if we should log this message. + msgIndex := m.Entries[len(m.Entries)-1].Index + if kvpb.RaftIndex(msgIndex) >= index { + t.logf(4, + "%s->%s %v Term:%d Index:%d", + peer(m.From), + peer(m.To), + m.Type, + m.Entries[len(m.Entries)-1].Term, + msgIndex, + ) + // We unregister the index here because we are now "done" with + // this entry and don't expect more useful events. + r.m.Delete(index) + r.numRegistered.Add(-1) + t.logf(4, "unregistered log index %d from tracing", index) + } + } + return true + }) +} diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 1eb4a3369dfc..0b2fbc22408d 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" @@ -891,6 +892,10 @@ type Replica struct { // MsgAppPull <=> LazyReplication. // Updated with both raftMu and mu held. currentRACv2Mode rac2.RaftMsgAppMode + + // raftTracer is used to trace raft messages that are sent with a + // tracing context. + raftTracer rafttrace.RaftTracer } // The raft log truncations that are pending. Access is protected by its own diff --git a/pkg/kv/kvserver/replica_application_decoder.go b/pkg/kv/kvserver/replica_application_decoder.go index 3586137f7c7b..5b9cdf49cb5b 100644 --- a/pkg/kv/kvserver/replica_application_decoder.go +++ b/pkg/kv/kvserver/replica_application_decoder.go @@ -145,7 +145,7 @@ func (d *replicaDecoder) createTracingSpans(ctx context.Context) { propCtx := ctx // raft scheduler's ctx var propSp *tracing.Span // If the client has a trace, put a child into propCtx. 
- if sp := tracing.SpanFromContext(cmd.proposal.ctx); sp != nil { + if sp := tracing.SpanFromContext(cmd.proposal.Context()); sp != nil { propCtx, propSp = sp.Tracer().StartSpanCtx( propCtx, "local proposal", tracing.WithParent(sp), ) diff --git a/pkg/kv/kvserver/replica_application_result.go b/pkg/kv/kvserver/replica_application_result.go index 4f85db924d3c..998f7258d3a5 100644 --- a/pkg/kv/kvserver/replica_application_result.go +++ b/pkg/kv/kvserver/replica_application_result.go @@ -328,7 +328,6 @@ func (r *Replica) makeReproposal(origP *ProposalData) (reproposal *ProposalData, // span "follows from" the proposal's span, if the proposal sticks around // for (some reincarnation of) the command to eventually apply, its trace // will reflect the reproposal as well. - ctx: origP.ctx, idKey: raftlog.MakeCmdIDKey(), proposedAtTicks: 0, // set in registerProposalLocked createdAtTicks: 0, // set in registerProposalLocked @@ -364,6 +363,8 @@ func (r *Replica) makeReproposal(origP *ProposalData) (reproposal *ProposalData, seedProposal: seedP, } + origCtx := origP.Context() + newProposal.ctx.Store(&origCtx) return newProposal, func() { // If the original proposal had an explicit span, it's an async consensus @@ -394,7 +395,8 @@ func (r *Replica) makeReproposal(origP *ProposalData) (reproposal *ProposalData, // // TODO(radu): Should this context be created via tracer.ForkSpan? // We'd need to make sure the span is finished eventually. - origP.ctx = r.AnnotateCtx(context.TODO()) + ctx := r.AnnotateCtx(context.TODO()) + origP.ctx.Store(&ctx) seedP.lastReproposal = newProposal } } diff --git a/pkg/kv/kvserver/replica_application_result_test.go b/pkg/kv/kvserver/replica_application_result_test.go index c5f2dfc996b4..51a83b9a50bd 100644 --- a/pkg/kv/kvserver/replica_application_result_test.go +++ b/pkg/kv/kvserver/replica_application_result_test.go @@ -37,8 +37,7 @@ func makeProposalData() *ProposalData { AdmissionOriginNode: 1, } - return &ProposalData{ - ctx: context.WithValue(context.Background(), struct{}{}, "nonempty-ctx"), + prop := ProposalData{ sp: &tracing.Span{}, idKey: "deadbeef", proposedAtTicks: 1, @@ -58,6 +57,9 @@ func makeProposalData() *ProposalData { seedProposal: nil, lastReproposal: nil, } + ctx := context.WithValue(context.Background(), struct{}{}, "nonempty-ctx") + prop.ctx.Store(&ctx) + return &prop } func TestProposalDataAndRaftCommandAreConsideredWhenAddingFields(t *testing.T) { @@ -73,8 +75,8 @@ func TestProposalDataAndRaftCommandAreConsideredWhenAddingFields(t *testing.T) { // NB: we can't use zerofields for two reasons: First, we have unexported fields // here, and second, we don't want to check for recursively populated structs (but // only for the top level fields). - require.Equal(t, 10, reflect.TypeOf(*prop.command).NumField()) - require.Equal(t, 19, reflect.TypeOf(*prop).NumField()) + require.Equal(t, 10, reflect.Indirect(reflect.ValueOf(prop.command)).NumField()) + require.Equal(t, 19, reflect.Indirect(reflect.ValueOf(prop)).NumField()) } func TestReplicaMakeReproposalChaininig(t *testing.T) { @@ -84,7 +86,7 @@ func TestReplicaMakeReproposalChaininig(t *testing.T) { var r Replica proposals := make([]*ProposalData, 1, 4) proposals[0] = makeProposalData() - sharedCtx := proposals[0].ctx + sharedCtx := proposals[0].Context() verify := func() { seed := proposals[0] @@ -102,9 +104,9 @@ func TestReplicaMakeReproposalChaininig(t *testing.T) { } // Only the latest reproposal must use the seed context. 
 		for _, prop := range proposals[:len(proposals)-1] {
-			require.NotEqual(t, sharedCtx, prop.ctx)
+			require.NotEqual(t, sharedCtx, prop.Context())
 		}
-		require.Equal(t, sharedCtx, proposals[len(proposals)-1].ctx)
+		require.Equal(t, sharedCtx, proposals[len(proposals)-1].Context())
 	}
 	verify()
diff --git a/pkg/kv/kvserver/replica_destroy.go b/pkg/kv/kvserver/replica_destroy.go
index 553b5e012fd7..e5ae538b049b 100644
--- a/pkg/kv/kvserver/replica_destroy.go
+++ b/pkg/kv/kvserver/replica_destroy.go
@@ -181,4 +181,5 @@ func (r *Replica) disconnectReplicationRaftMuLocked(ctx context.Context) {
 		log.Fatalf(ctx, "removing raft group before destroying replica %s", r)
 	}
 	r.mu.internalRaftGroup = nil
+	r.mu.raftTracer.FlushAll()
 }
diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go
index 1dafec262910..2cd49459cb49 100644
--- a/pkg/kv/kvserver/replica_init.go
+++ b/pkg/kv/kvserver/replica_init.go
@@ -21,6 +21,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/load"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
 	"github.com/cockroachdb/cockroach/pkg/raft"
@@ -154,7 +155,7 @@ func newUninitializedReplicaWithoutRaftGroup(
 		}
 		// Expose proposal data for external test packages.
 		return store.cfg.TestingKnobs.TestingProposalSubmitFilter(kvserverbase.ProposalFilterArgs{
-			Ctx:        p.ctx,
+			Ctx:        p.Context(),
 			RangeID:    rangeID,
 			StoreID:    store.StoreID(),
 			ReplicaID:  replicaID,
@@ -329,6 +330,7 @@ func (r *Replica) initRaftGroupRaftMuLockedReplicaMuLocked() error {
 		return err
 	}
 	r.mu.internalRaftGroup = rg
+	r.mu.raftTracer = *rafttrace.NewRaftTracer(ctx, r.ClusterSettings(), &r.store.concurrentRaftTraces)
 	r.flowControlV2.InitRaftLocked(
 		ctx, replica_rac2.NewRaftNode(rg, (*replicaForRACv2)(r)), rg.LogMark())
 	return nil
diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go
index 20881c02b945..5b5b8bc1eb0b 100644
--- a/pkg/kv/kvserver/replica_proposal.go
+++ b/pkg/kv/kvserver/replica_proposal.go
@@ -9,6 +9,7 @@ import (
 	"context"
 	"os"
 	"path/filepath"
+	"sync/atomic"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/keys"
@@ -116,7 +117,15 @@ type ProposalData struct {
 	// that during command application one should always use `replicatedCmd.ctx`
 	// for best coverage. `p.ctx` should be used when a `replicatedCmd` is not in
 	// scope, i.e. outside of raft command application.
-	ctx context.Context
+	//
+	// The context may be updated during the proposal lifecycle but will never
+	// be nil. To clear out the context, set it to context.Background(). It is
+	// protected by an atomic pointer because it can be read without holding
+	// raftMu; use ProposalData.Context() to read it.
+	//
+	// TODO(baptist): Track down all the places where we read and write ctx and
+	// determine whether we can convert this back to a non-atomic field.
+	ctx atomic.Pointer[context.Context]
 
 	// An optional tracing span bound to the proposal in the case of async
 	// consensus (it will be referenced by p.ctx). We need to finish this span
@@ -216,6 +225,12 @@ type ProposalData struct {
 	lastReproposal *ProposalData
 }
 
+// Context returns the context associated with the proposal. The context may
+// change during the lifetime of the proposal.
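+// It never returns nil; reading it does not require holding raftMu because the
+// pointer is stored atomically.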
+func (proposal *ProposalData) Context() context.Context { + return *proposal.ctx.Load() +} + // useReplicationAdmissionControl indicates whether this raft command should // be subject to replication admission control. func (proposal *ProposalData) useReplicationAdmissionControl() bool { @@ -270,7 +285,8 @@ func (proposal *ProposalData) signalProposalResult(pr proposalResult) { // // NB: `proposal.ec.repl` might already have been cleared if we arrive here // through finishApplication. - proposal.ctx = context.Background() + ctx := context.Background() + proposal.ctx.Store(&ctx) } } @@ -1050,13 +1066,13 @@ func (r *Replica) requestToProposal( // Fill out the results even if pErr != nil; we'll return the error below. proposal := &ProposalData{ - ctx: ctx, idKey: idKey, doneCh: make(chan proposalResult, 1), Local: &res.Local, Request: ba, leaseStatus: *st, } + proposal.ctx.Store(&ctx) if needConsensus { proposal.command = &kvserverpb.RaftCommand{ diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go index 633568fa7b5e..16f29aa46b64 100644 --- a/pkg/kv/kvserver/replica_proposal_buf.go +++ b/pkg/kv/kvserver/replica_proposal_buf.go @@ -126,6 +126,7 @@ type singleBatchProposer interface { getReplicaID() roachpb.ReplicaID flowControlHandle(ctx context.Context) kvflowcontrol.Handle onErrProposalDropped([]raftpb.Entry, []*ProposalData, raftpb.StateType) + registerForTracing(*ProposalData, raftpb.Entry) } // A proposer is an object that uses a propBuf to coordinate Raft proposals. @@ -255,7 +256,7 @@ func (b *propBuf) Insert(ctx context.Context, p *ProposalData, tok TrackedReques } if log.V(4) { - log.Infof(p.ctx, "submitting proposal %x", p.idKey) + log.Infof(p.Context(), "submitting proposal %x", p.idKey) } // Insert the proposal into the buffer's array. The buffer now takes ownership @@ -571,7 +572,7 @@ func (b *propBuf) FlushLockedWithRaftGroup( Data: p.encodedCommand, }) nextProp++ - log.VEvent(p.ctx, 2, "flushing proposal to Raft") + log.VEvent(p.Context(), 2, "flushing proposal to Raft") // We don't want deduct flow tokens for reproposed commands, and of // course for proposals that didn't integrate with kvflowcontrol. @@ -581,7 +582,7 @@ func (b *propBuf) FlushLockedWithRaftGroup( } else { admitHandles = append(admitHandles, admitEntHandle{ handle: p.raftAdmissionMeta, - pCtx: p.ctx, + pCtx: p.Context(), }) } } @@ -869,26 +870,31 @@ func proposeBatch( // TODO(bdarnell): Handle ErrProposalDropped better. // https://github.com/cockroachdb/cockroach/issues/21849 for _, p := range props { - if p.ctx != nil { - log.Event(p.ctx, "entry dropped") - } + log.Event(p.Context(), "entry dropped") } p.onErrProposalDropped(ents, props, raftGroup.BasicStatus().RaftState) return nil //nolint:returnerrcheck } - if err == nil { - // Now that we know what raft log position[1] this proposal is to end up - // in, deduct flow tokens for it. This is done without blocking (we've - // already waited for available flow tokens pre-evaluation). The tokens - // will later be returned once we're informed of the entry being - // admitted below raft. - // - // [1]: We're relying on an undocumented side effect of upstream raft - // API where it populates the index and term for the passed in - // slice of entries. See etcd-io/raft#57. - maybeDeductFlowTokens(ctx, p.flowControlHandle(ctx), handles, ents) + if err != nil { + return err } - return err + // Now that we know what raft log position[1] this proposal is to end up + // in, deduct flow tokens for it. 
This is done without blocking (we've
+	// already waited for available flow tokens pre-evaluation). The tokens
+	// will later be returned once we're informed of the entry being
+	// admitted below raft.
+	//
+	// [1]: We're relying on an undocumented side effect of upstream raft
+	// API where it populates the index and term for the passed in
+	// slice of entries. See etcd-io/raft#57.
+	maybeDeductFlowTokens(ctx, p.flowControlHandle(ctx), handles, ents)
+
+	// Register the proposal with rafttrace. This will add the trace to the raft
+	// lifecycle. We trace at most one entry per batch, so pick the last one, as
+	// it is the last one to be completed.
+	lastIndex := len(ents) - 1
+	p.registerForTracing(props[lastIndex], ents[lastIndex])
+	return nil
 }
 
 func maybeDeductFlowTokens(
@@ -1175,6 +1181,10 @@ func (rp *replicaProposer) closedTimestampTarget() hlc.Timestamp {
 	return (*Replica)(rp).closedTimestampTargetRLocked()
 }
 
+func (rp *replicaProposer) registerForTracing(p *ProposalData, e raftpb.Entry) {
+	(*Replica)(rp).mu.raftTracer.Register(p, e)
+}
+
 func (rp *replicaProposer) withGroupLocked(fn func(raftGroup proposerRaft) error) error {
 	return (*Replica)(rp).withRaftGroupLocked(func(raftGroup *raft.RawNode) (bool, error) {
 		// We're proposing a command here so there is no need to wake the leader
diff --git a/pkg/kv/kvserver/replica_proposal_buf_test.go b/pkg/kv/kvserver/replica_proposal_buf_test.go
index bdb47cb3a7eb..93fcc2f9f9f7 100644
--- a/pkg/kv/kvserver/replica_proposal_buf_test.go
+++ b/pkg/kv/kvserver/replica_proposal_buf_test.go
@@ -217,6 +217,8 @@ func (t *testProposer) campaignLocked(ctx context.Context) {
 	}
 }
 
+func (t *testProposer) registerForTracing(*ProposalData, raftpb.Entry) {}
+
 func (t *testProposer) rejectProposalWithErrLocked(_ context.Context, _ *ProposalData, err error) {
 	if t.onRejectProposalWithErrLocked == nil {
 		panic(fmt.Sprintf("unexpected rejectProposalWithErrLocked call: err=%v", err))
@@ -301,7 +303,6 @@ func (pc proposalCreator) newProposal(ba *kvpb.BatchRequest) *ProposalData {
 		}
 	}
 	p := &ProposalData{
-		ctx:         context.Background(),
 		idKey:       kvserverbase.CmdIDKey("test-cmd"),
 		command: &kvserverpb.RaftCommand{
 			ReplicatedEvalResult: kvserverpb.ReplicatedEvalResult{
@@ -313,6 +314,8 @@ func (pc proposalCreator) newProposal(ba *kvpb.BatchRequest) *ProposalData {
 		Request:     ba,
 		leaseStatus: pc.lease,
 	}
+	ctx := context.Background()
+	p.ctx.Store(&ctx)
 	p.encodedCommand = pc.encodeProposal(p)
 	return p
 }
diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index 7664c454797a..165d2072cdaa 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -123,7 +123,7 @@ func (r *Replica) evalAndPropose(
 	idKey := raftlog.MakeCmdIDKey()
 	proposal, pErr := r.requestToProposal(ctx, idKey, ba, g, st, ui)
 	ba = proposal.Request // may have been updated
-	log.Event(proposal.ctx, "evaluated request")
+	log.Event(proposal.Context(), "evaluated request")
 
 	// If the request hit a server-side concurrency retry error, immediately
 	// propagate the error. Don't assume ownership of the concurrency guard.
@@ -168,7 +168,7 @@ func (r *Replica) evalAndPropose(
 	// from this point on.
proposal.ec = makeReplicatedEndCmds(r, g, *st, timeutil.Now()) - log.VEventf(proposal.ctx, 2, + log.VEventf(proposal.Context(), 2, "proposing command to write %d new keys, %d new values, %d new intents, "+ "write batch size=%d bytes", proposal.command.ReplicatedEvalResult.Delta.KeyCount, @@ -204,7 +204,9 @@ func (r *Replica) evalAndPropose( // Fork the proposal's context span so that the proposal's context // can outlive the original proposer's context. - proposal.ctx, proposal.sp = tracing.ForkSpan(ctx, "async consensus") + ctx, sp := tracing.ForkSpan(ctx, "async consensus") + proposal.ctx.Store(&ctx) + proposal.sp = sp if proposal.sp != nil { // We can't leak this span if we fail to hand the proposal to the // replication layer, so finish it later in this method if we are to @@ -279,7 +281,7 @@ func (r *Replica) evalAndPropose( "command is too large: %d bytes (max: %d)", quotaSize, maxSize, )) } - log.VEventf(proposal.ctx, 2, "acquiring proposal quota (%d bytes)", quotaSize) + log.VEventf(proposal.Context(), 2, "acquiring proposal quota (%d bytes)", quotaSize) var err error proposal.quotaAlloc, err = r.maybeAcquireProposalQuota(ctx, ba, quotaSize) if err != nil { @@ -349,7 +351,8 @@ func (r *Replica) evalAndPropose( } // TODO(radu): Should this context be created via tracer.ForkSpan? // We'd need to make sure the span is finished eventually. - last.ctx = r.AnnotateCtx(context.TODO()) + ctx := r.AnnotateCtx(context.TODO()) + last.ctx.Store(&ctx) } return proposalCh, abandon, idKey, writeBytes, nil } @@ -396,12 +399,12 @@ func (r *Replica) propose( log.Errorf(ctx, "%v", err) return kvpb.NewError(err) } - log.KvDistribution.Infof(p.ctx, "proposing %s", crt) + log.KvDistribution.Infof(p.Context(), "proposing %s", crt) } else if p.command.ReplicatedEvalResult.AddSSTable != nil { - log.VEvent(p.ctx, 4, "sideloadable proposal detected") + log.VEvent(p.Context(), 4, "sideloadable proposal detected") r.store.metrics.AddSSTableProposals.Inc(1) } else if log.V(4) { - log.Infof(p.ctx, "proposing command %x: %s", p.idKey, p.Request.Summary()) + log.Infof(p.Context(), "proposing command %x: %s", p.idKey, p.Request.Summary()) } raftAdmissionMeta := p.raftAdmissionMeta @@ -430,7 +433,7 @@ func (r *Replica) propose( // Too verbose even for verbose logging, so manually enable if you want to // debug proposal sizes. if false { - log.Infof(p.ctx, `%s: proposal: %d + log.Infof(p.Context(), `%s: proposal: %d RaftCommand.ReplicatedEvalResult: %d RaftCommand.ReplicatedEvalResult.Delta: %d RaftCommand.WriteBatch: %d @@ -447,7 +450,7 @@ func (r *Replica) propose( // TODO(tschottdorf): can we mark them so lightstep can group them? const largeProposalEventThresholdBytes = 2 << 19 // 512kb if ln := len(p.encodedCommand); ln > largeProposalEventThresholdBytes { - log.Eventf(p.ctx, "proposal is large: %s", humanizeutil.IBytes(int64(ln))) + log.Eventf(p.Context(), "proposal is large: %s", humanizeutil.IBytes(int64(ln))) } // Insert into the proposal buffer, which passes the command to Raft to be @@ -456,7 +459,7 @@ func (r *Replica) propose( // // NB: we must not hold r.mu while using the proposal buffer, see comment // on the field. 
- log.VEvent(p.ctx, 2, "submitting proposal to proposal buffer") + log.VEvent(p.Context(), 2, "submitting proposal to proposal buffer") if err := r.mu.proposalBuf.Insert(ctx, p, tok.Move(ctx)); err != nil { return kvpb.NewError(err) } @@ -635,6 +638,11 @@ func (r *Replica) stepRaftGroupRaftMuLocked(req *kvserverpb.RaftMessageRequest) var sideChannelInfo replica_rac2.SideChannelInfoUsingRaftMessageRequest var admittedVector rac2.AdmittedVector err := r.withRaftGroup(func(raftGroup *raft.RawNode) (bool, error) { + // If this message requested tracing, begin tracing it. + for _, e := range req.TracedEntries { + r.mu.raftTracer.RegisterRemote(e) + } + r.mu.raftTracer.MaybeTrace(req.Message) // We're processing an incoming raft message (from a batch that may // include MsgVotes), so don't campaign if we wake up our raft // group. @@ -1209,6 +1217,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( } } + r.mu.raftTracer.MaybeTrace(msgStorageAppend) if state, err = s.StoreEntries(ctx, state, app, cb, &stats.append); err != nil { return stats, errors.Wrap(err, "while storing log entries") } @@ -1240,6 +1249,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( stats.tApplicationBegin = timeutil.Now() if hasMsg(msgStorageApply) { + r.mu.raftTracer.MaybeTrace(msgStorageApply) r.traceEntries(msgStorageApply.Entries, "committed, before applying any entries") err := appTask.ApplyCommittedEntries(ctx) @@ -1658,7 +1668,7 @@ func (r *Replica) refreshProposalsLocked( // up here too. if p.command.MaxLeaseIndex <= r.shMu.state.LeaseAppliedIndex { r.cleanupFailedProposalLocked(p) - log.Eventf(p.ctx, "retry proposal %x: %s", p.idKey, reason) + log.Eventf(p.Context(), "retry proposal %x: %s", p.idKey, reason) p.finishApplication(ctx, makeProposalResultErr( kvpb.NewAmbiguousResultErrorf( "unable to determine whether command was applied via snapshot", @@ -1726,7 +1736,7 @@ func (r *Replica) refreshProposalsLocked( // definitely required, however. sort.Sort(reproposals) for _, p := range reproposals { - log.Eventf(p.ctx, "re-submitting command %x (MLI %d, CT %s): %s", + log.Eventf(p.Context(), "re-submitting command %x (MLI %d, CT %s): %s", p.idKey, p.command.MaxLeaseIndex, p.command.ClosedTimestamp, reason) if err := r.mu.proposalBuf.ReinsertLocked(ctx, p); err != nil { r.cleanupFailedProposalLocked(p) @@ -1989,6 +1999,7 @@ func (r *Replica) deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked( } for i, m := range localMsgs { + r.mu.raftTracer.MaybeTrace(m) if err := raftGroup.Step(m); err != nil { log.Fatalf(ctx, "unexpected error stepping local raft message [%s]: %v", raft.DescribeMessage(m, raftEntryFormatter), err) @@ -2012,6 +2023,7 @@ func (r *Replica) sendRaftMessage( lastToReplica, lastFromReplica := r.getLastReplicaDescriptors() r.mu.RLock() + traced := r.mu.raftTracer.MaybeTrace(msg) fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.From), lastToReplica) toReplica, toErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.To), lastFromReplica) var startKey roachpb.RKey @@ -2064,6 +2076,7 @@ func (r *Replica) sendRaftMessage( RangeStartKey: startKey, // usually nil UsingRac2Protocol: r.flowControlV2.GetEnabledWhenLeader() >= kvflowcontrol.V2EnabledWhenLeaderV1Encoding, LowPriorityOverride: lowPriorityOverride, + TracedEntries: traced, } // For RACv2, annotate successful MsgAppResp messages with the vector of // admitted log indices, by priority. 
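The replica wiring above is the follower-side half of the feature: TracedEntries arrive on the RaftMessageRequest, are registered with the store-limited tracer, and every subsequent raft message is run through MaybeTrace. A minimal sketch of that flow using only the APIs added in this patch; the handleIncoming wrapper and the bare RawNode are illustrative and not part of the change:

```go
package example

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace"
	"github.com/cockroachdb/cockroach/pkg/raft"
)

// handleIncoming sketches how a follower picks up a trace that the leader
// started, mirroring stepRaftGroupRaftMuLocked above.
func handleIncoming(
	tracer *rafttrace.RaftTracer, req *kvserverpb.RaftMessageRequest, rn *raft.RawNode,
) error {
	// The leader attached trace/span IDs for any entries in this request that
	// it is tracing; register them so local processing is logged as well.
	for _, e := range req.TracedEntries {
		tracer.RegisterRemote(e)
	}
	// Log the message itself if it covers (or is past) a registered index.
	tracer.MaybeTrace(req.Message)
	return rn.Step(req.Message)
}
```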
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index b233127bedd1..1b307a9cab69 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -7772,7 +7772,7 @@ func TestReplicaAbandonProposal(t *testing.T) { dropProp := int32(1) tc.repl.mu.Lock() tc.repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool, _ error) { - if v := p.ctx.Value(magicKey{}); v != nil { + if v := p.Context().Value(magicKey{}); v != nil { cancel() return atomic.LoadInt32(&dropProp) == 1, nil } @@ -7890,7 +7890,7 @@ func TestReplicaRetryRaftProposal(t *testing.T) { tc.repl.mu.Lock() tc.repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride kvpb.LeaseAppliedIndex) { - if v := p.ctx.Value(magicKey{}); v != nil { + if v := p.Context().Value(magicKey{}); v != nil { if curAttempt := atomic.AddInt32(&c, 1); curAttempt == 1 { return wrongLeaseIndex } @@ -7994,7 +7994,7 @@ func TestReplicaCancelRaftCommandProgress(t *testing.T) { abandoned := make(map[kvserverbase.CmdIDKey]struct{}) // protected by repl.mu tc.repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool, _ error) { if _, ok := abandoned[p.idKey]; ok { - log.Infof(p.ctx, "abandoning command") + log.Infof(p.Context(), "abandoning command") return true, nil } return false, nil @@ -8066,7 +8066,7 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) { if atomic.LoadInt32(&dropAll) == 1 { return true, nil } - if v := p.ctx.Value(magicKey{}); v != nil { + if v := p.Context().Value(magicKey{}); v != nil { seenCmds = append(seenCmds, int(p.command.MaxLeaseIndex)) } return false, nil @@ -8098,7 +8098,7 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) { } origIndexes := make([]int, 0, num) for _, p := range tc.repl.mu.proposals { - if v := p.ctx.Value(magicKey{}); v != nil { + if v := p.Context().Value(magicKey{}); v != nil { origIndexes = append(origIndexes, int(p.command.MaxLeaseIndex)) } } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 803bfd138d21..d0d1137d5404 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -943,6 +943,11 @@ type Store struct { // has likely improved). draining atomic.Bool + // concurrentRaftTraces is the number of concurrent raft trace requests that + // are currently registered. This limit is used to prevent extensive raft + // tracing from inadvertently impacting performance. + concurrentRaftTraces atomic.Int64 + // Locking notes: To avoid deadlocks, the following lock order must be // obeyed: baseQueue.mu < Replica.raftMu < Replica.readOnlyCmdMu < Store.mu // < Replica.mu < Replica.unreachablesMu < Store.coalescedMu < Store.scheduler.mu. diff --git a/pkg/raft/raftpb/raft.go b/pkg/raft/raftpb/raft.go index b2073df26232..2169809f9339 100644 --- a/pkg/raft/raftpb/raft.go +++ b/pkg/raft/raftpb/raft.go @@ -24,6 +24,12 @@ type Epoch int64 // SafeValue implements the redact.SafeValue interface. func (e Epoch) SafeValue() {} +// The enums in raft are all safe for redaction. 
+func (MessageType) SafeValue() {} +func (EntryType) SafeValue() {} +func (ConfChangeType) SafeValue() {} +func (ConfChangeTransition) SafeValue() {} + // Priority specifies per-entry priorities, that are local to the interaction // between a leader-replica pair, i.e., they are not an invariant of a // particular entry in the raft log (the replica could be the leader itself or diff --git a/pkg/raft/util.go b/pkg/raft/util.go index dfad989b062b..2f86a7d651ff 100644 --- a/pkg/raft/util.go +++ b/pkg/raft/util.go @@ -200,6 +200,10 @@ func describeMessageWithIndent(indent string, m pb.Message, f EntryFormatter) st return buf.String() } +func DescribeTarget(id pb.PeerID) string { + return describeTarget(id) +} + func describeTarget(id pb.PeerID) string { switch id { case None: diff --git a/pkg/testutils/lint/passes/fmtsafe/functions.go b/pkg/testutils/lint/passes/fmtsafe/functions.go index 4890f40faecb..87c6314f597f 100644 --- a/pkg/testutils/lint/passes/fmtsafe/functions.go +++ b/pkg/testutils/lint/passes/fmtsafe/functions.go @@ -120,6 +120,8 @@ var requireConstFmt = map[string]bool{ "(*github.com/cockroachdb/cockroach/pkg/kv/kvserver.raftLogger).Fatalf": true, "(*github.com/cockroachdb/cockroach/pkg/kv/kvserver.raftLogger).Panicf": true, + "(*github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace.traceValue).logf": true, + "(*github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2.LogTracker).errorf": true, "(github.com/cockroachdb/cockroach/pkg/raft/raftlogger.Logger).Debugf": true, diff --git a/pkg/testutils/lint/passes/redactcheck/redactcheck.go b/pkg/testutils/lint/passes/redactcheck/redactcheck.go index 025db3047678..00e764fdeb6e 100644 --- a/pkg/testutils/lint/passes/redactcheck/redactcheck.go +++ b/pkg/testutils/lint/passes/redactcheck/redactcheck.go @@ -138,8 +138,12 @@ func runAnalyzer(pass *analysis.Pass) (interface{}, error) { "ID": {}, }, "github.com/cockroachdb/cockroach/pkg/raft/raftpb": { - "Epoch": {}, - "PeerID": {}, + "Epoch": {}, + "PeerID": {}, + "MessageType": {}, + "EntryType": {}, + "ConfChangeType": {}, + "ConfChangeTransition": {}, }, "github.com/cockroachdb/cockroach/pkg/repstream/streampb": { "StreamID": {}, @@ -225,6 +229,10 @@ func runAnalyzer(pass *analysis.Pass) (interface{}, error) { "WorkKind": {}, "QueueKind": {}, }, + "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb": { + "TraceID": {}, + "SpanID": {}, + }, "github.com/cockroachdb/cockroach/pkg/util/hlc": { "ClockTimestamp": {}, "LegacyTimestamp": {}, diff --git a/pkg/util/tracing/tracingpb/recorded_span.go b/pkg/util/tracing/tracingpb/recorded_span.go index 90dfb675345b..381b521e960a 100644 --- a/pkg/util/tracing/tracingpb/recorded_span.go +++ b/pkg/util/tracing/tracingpb/recorded_span.go @@ -22,9 +22,13 @@ const ( // TraceID is a probabilistically-unique id, shared by all spans in a trace. type TraceID uint64 +func (TraceID) SafeValue() {} + // SpanID is a probabilistically-unique span id. type SpanID uint64 +func (SpanID) SafeValue() {} + // Recording represents a group of RecordedSpans rooted at a fixed root span, as // returned by GetRecording. Spans are sorted by StartTime. type Recording []RecordedSpan
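End to end, the feature is opt-in: an operator raises kv.raft.max_concurrent_traces (as the roachtest above does via SET CLUSTER SETTING), and any write issued under a verbose recording span then has its raft lifecycle logged. Below is a sketch of how a test or tool might exercise this, modeled on TestRaftTracing; the exampleTraceOneWrite name and the db/st handles are assumed to be available and are not part of the patch:

```go
package example

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

// exampleTraceOneWrite enables raft tracing and captures the raft lifecycle
// of a single write in a client-side recording.
func exampleTraceOneWrite(ctx context.Context, st *cluster.Settings, db *kv.DB) error {
	// Allow up to 10 concurrently traced entries per store; the setting
	// defaults to 0, which disables raft tracing.
	rafttrace.MaxConcurrentRaftTraces.Override(ctx, &st.SV, 10)

	// Writes issued under a verbose recording span are eligible for tracing.
	ctx, finish := tracing.ContextWithRecordingSpan(ctx, tracing.NewTracer(), "traced-write")
	if err := db.Put(ctx, "example-key", "example-value"); err != nil {
		return err
	}
	// The recording includes messages such as "registering local trace",
	// the MsgApp fan-out to followers, and the MsgStorageAppendResp.
	fmt.Println(finish().String())
	return nil
}
```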