Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: add tracing to raft #131850

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1510,6 +1510,7 @@ GO_TARGETS = [
"//pkg/kv/kvserver/raftentry:raftentry_test",
"//pkg/kv/kvserver/raftlog:raftlog",
"//pkg/kv/kvserver/raftlog:raftlog_test",
"//pkg/kv/kvserver/rafttrace:rafttrace",
"//pkg/kv/kvserver/raftutil:raftutil",
"//pkg/kv/kvserver/raftutil:raftutil_test",
"//pkg/kv/kvserver/rangefeed:rangefeed",
Expand Down
5 changes: 5 additions & 0 deletions pkg/cmd/roachtest/tests/admission_control_latency.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,11 @@ func (v variations) runTest(ctx context.Context, t test.Test, c cluster.Cluster)
`SET CLUSTER SETTING kv.lease.reject_on_leader_unknown.enabled = true`); err != nil {
t.Fatal(err)
}
// Enable raft tracing. Remove this once raft tracing is the default.
if _, err := db.ExecContext(ctx,
`SET CLUSTER SETTING kv.raft.max_concurrent_traces = '10'`); err != nil {
t.Fatal(err)
}
// This isn't strictly necessary, but it would be nice if this test passed at 10s (or lower).
if _, err := db.ExecContext(ctx,
`SET CLUSTER SETTING server.time_after_store_suspect = '10s'`); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ go_library(
"//pkg/kv/kvserver/multiqueue",
"//pkg/kv/kvserver/raftentry",
"//pkg/kv/kvserver/raftlog",
"//pkg/kv/kvserver/rafttrace",
"//pkg/kv/kvserver/rangefeed",
"//pkg/kv/kvserver/rditer",
"//pkg/kv/kvserver/readsummary",
Expand Down Expand Up @@ -437,6 +438,7 @@ go_test(
"//pkg/kv/kvserver/protectedts/ptutil",
"//pkg/kv/kvserver/raftentry",
"//pkg/kv/kvserver/raftlog",
"//pkg/kv/kvserver/rafttrace",
"//pkg/kv/kvserver/raftutil",
"//pkg/kv/kvserver/rangefeed",
"//pkg/kv/kvserver/rditer",
Expand Down
61 changes: 61 additions & 0 deletions pkg/kv/kvserver/client_raft_log_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
Expand All @@ -33,6 +34,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/vfs"
"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -132,6 +135,64 @@ func TestRaftLogQueue(t *testing.T) {
}
}

func TestRaftTracing(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// TODO(baptist): Remove this once we change the default to be enabled.
st := cluster.MakeTestingClusterSettings()
rafttrace.MaxConcurrentRaftTraces.Override(context.Background(), &st.SV, 10)

tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Settings: st,
RaftConfig: base.RaftConfig{
RangeLeaseDuration: 24 * time.Hour, // disable lease moves
RaftElectionTimeoutTicks: 1 << 30, // disable elections
},
},
})
defer tc.Stopper().Stop(context.Background())
store := tc.GetFirstStoreFromServer(t, 0)

// Write a single value to ensure we have a leader on n1.
key := tc.ScratchRange(t)
_, pErr := kv.SendWrapped(context.Background(), store.TestSender(), putArgs(key, []byte("value")))
require.NoError(t, pErr.GoError())
require.NoError(t, tc.WaitForSplitAndInitialization(key))
// Set to have 3 voters.
tc.AddVotersOrFatal(t, key, tc.Targets(1, 2)...)
tc.WaitForVotersOrFatal(t, key, tc.Targets(1, 2)...)

for i := 0; i < 100; i++ {
var finish func() tracingpb.Recording
ctx := context.Background()
if i == 50 {
// Trace a random request on a "client" tracer.
ctx, finish = tracing.ContextWithRecordingSpan(ctx, tracing.NewTracer(), "test")
}
_, pErr := kv.SendWrapped(ctx, store.TestSender(), putArgs(key, []byte(fmt.Sprintf("value-%d", i))))
require.NoError(t, pErr.GoError())
// Note that this is the clients span, there may be additional logs created after the span is returned.
if finish != nil {
// NB: It is hard to get all the messages in an expected order. We
// simply ensure some of the key messages are returned. Also note
// that we want to make sure that the logs are not reported against
// the tracing library, but the line that called into it.
expectedMessages := []string{
`replica_proposal_buf.* flushing proposal to Raft`,
`replica_proposal_buf.* registering local trace`,
`replica_raft.* 1->2 MsgApp`,
`replica_raft.* 1->3 MsgApp`,
`replica_raft.* AppendThread->1 MsgStorageAppendResp`,
`ack-ing replication success to the client`,
}
require.NoError(t, testutils.MatchInOrder(finish().String(), expectedMessages...))
}
}
}

// TestCrashWhileTruncatingSideloadedEntries emulates a process crash in the
// middle of applying a raft log truncation command that removes some entries
// from the sideloaded storage. The test expects that storage remains in a
Expand Down
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/kvserverpb/raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ message RaftHeartbeat {
bool lagging_followers_on_quiesce_accurate = 10;
}

// The traced entry from the leader along with the trace and span ID.
message TracedEntry {
uint64 index = 1 [(gogoproto.nullable) = false,
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/kv/kvpb.RaftIndex"];
uint64 trace_id = 2 [(gogoproto.nullable) = false, (gogoproto.customname) = "TraceID",
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb.TraceID"];
uint64 span_id = 3 [(gogoproto.nullable) = false, (gogoproto.customname) = "SpanID",
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb.SpanID"];
}

// RaftMessageRequest is the request used to send raft messages using our
// protobuf-based RPC codec. If a RaftMessageRequest has a non-empty number of
// heartbeats or heartbeat_resps, the contents of the message field is treated
Expand Down Expand Up @@ -103,6 +113,12 @@ message RaftMessageRequest {
// indices. Used only with RACv2.
kv.kvserver.kvflowcontrol.kvflowcontrolpb.AdmittedState admitted_state = 14 [(gogoproto.nullable) = false];

// TracedEntry is a mapping from Raft index to trace and span ids for this
// request. They are set by the leader and will begin tracing on the
// follower. Note that traces are not currently sent back to the leader, but
// instead logged locally.
repeated TracedEntry traced_entries = 15 [(gogoproto.nullable) = false];

reserved 10;
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func traceProposals(r *Replica, ids []kvserverbase.CmdIDKey, event string) {
r.mu.RLock()
for _, id := range ids {
if prop, ok := r.mu.proposals[id]; ok {
ctxs = append(ctxs, prop.ctx)
ctxs = append(ctxs, prop.Context())
}
}
r.mu.RUnlock()
Expand Down
22 changes: 22 additions & 0 deletions pkg/kv/kvserver/rafttrace/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "rafttrace",
srcs = ["rafttrace.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/raft",
"//pkg/raft/raftpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/util/log",
"//pkg/util/syncutil",
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
],
)
Loading
Loading