Skip to content

Commit

Permalink
kvserver: store.Enqueue shouldn't start verbose tracing
Browse files Browse the repository at this point in the history
Previously in store.Enqueue, we unconditionally started verbose
tracing. This is expensive and unnecessary since most traces were never
processed. Additionally, with the new raft-level logging for verbose
traces, this resulted in unnecessary traces being created and logged to
the main cockroach.log. This change removes the automatic tracing inside
Enqueue and instead converts tests and callers to explicitly start a
span prior to calling the queue.

As part of this change, the Enqueue method no longer returns a Recording.

Epic: none

Release note: None
  • Loading branch information
andrewbaptist committed Oct 16, 2024
1 parent 6c381b6 commit b64a8cf
Show file tree
Hide file tree
Showing 20 changed files with 127 additions and 91 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/multiregionccl/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ SET CLUSTER SETTING kv.allocator.min_lease_transfer_interval = '5m'
return errors.New(`could not find replica`)
}
for _, queueName := range []string{"split", "replicate", "raftsnapshot"} {
_, processErr, err := store.Enqueue(
processErr, err := store.Enqueue(
ctx, queueName, repl, true /* skipShouldQueue */, false, /* async */
)
if processErr != nil {
Expand Down
12 changes: 7 additions & 5 deletions pkg/kv/kvserver/client_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -441,11 +442,12 @@ func TestTransferLeaseDuringJointConfigWithDeadIncomingVoter(t *testing.T) {
})

// Run the range through the replicate queue on n1.
trace, processErr, err := store0.Enqueue(
ctx, "replicate", repl0, true /* skipShouldQueue */, false /* async */)
traceCtx, rec := tracing.ContextWithRecordingSpan(ctx, store0.GetStoreConfig().Tracer(), "trace-enqueue")
processErr, err := store0.Enqueue(
traceCtx, "replicate", repl0, true /* skipShouldQueue */, false /* async */)
require.NoError(t, err)
require.NoError(t, processErr)
formattedTrace := trace.String()
formattedTrace := rec().String()
expectedMessages := []string{
`transitioning out of joint configuration`,
`leaseholder .* is being removed through an atomic replication change, transferring lease to`,
Expand Down Expand Up @@ -550,7 +552,7 @@ func internalTransferLeaseFailureDuringJointConfig(t *testing.T, isManual bool)
atomic.StoreInt64(&scratchRangeID, 0)
store := tc.GetFirstStoreFromServer(t, 0)
repl := store.LookupReplica(roachpb.RKey(scratchStartKey))
_, _, err = store.Enqueue(
_, err = store.Enqueue(
ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, err)
Expand Down Expand Up @@ -1310,7 +1312,7 @@ func TestLeasesDontThrashWhenNodeBecomesSuspect(t *testing.T) {
repl := tc.GetFirstStoreFromServer(t, i).LookupReplica(roachpb.RKey(key))
require.NotNil(t, repl)
// We don't know who the leaseholder might be, so ignore errors.
_, _, _ = tc.GetFirstStoreFromServer(t, i).Enqueue(
_, _ = tc.GetFirstStoreFromServer(t, i).Enqueue(
ctx, "lease", repl, true /* skipShouldQueue */, false, /* async */
)
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/client_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -178,15 +179,16 @@ func TestMigrateWithInflightSnapshot(t *testing.T) {
repl, err := store.GetReplica(desc.RangeID)
require.NoError(t, err)
testutils.SucceedsSoon(t, func() error {
trace, processErr, err := store.Enqueue(ctx, "raftsnapshot", repl, true /* skipShouldQueue */, false /* async */)
traceCtx, rec := tracing.ContextWithRecordingSpan(ctx, store.GetStoreConfig().Tracer(), "trace-enqueue")
processErr, err := store.Enqueue(traceCtx, "raftsnapshot", repl, true /* skipShouldQueue */, false /* async */)
if err != nil {
return err
}
if processErr != nil {
return processErr
}
const msg = `skipping snapshot; replica is likely a LEARNER in the process of being added: (n2,s2):2LEARNER`
formattedTrace := trace.String()
formattedTrace := rec().String()
if !strings.Contains(formattedTrace, msg) {
return errors.Errorf(`expected "%s" in trace got:\n%s`, msg, formattedTrace)
}
Expand Down
18 changes: 13 additions & 5 deletions pkg/kv/kvserver/client_protectedts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -146,8 +147,10 @@ ORDER BY raw_start_key ASC LIMIT 1`)
testutils.SucceedsSoon(t, func() error {
upsertUntilBackpressure()
s, repl := getStoreAndReplica()
trace, _, err := s.Enqueue(ctx, "mvccGC", repl, false /* skipShouldQueue */, false /* async */)
traceCtx, rec := tracing.ContextWithRecordingSpan(ctx, s.GetStoreConfig().Tracer(), "trace-enqueue")
_, err := s.Enqueue(traceCtx, "mvccGC", repl, false /* skipShouldQueue */, false /* async */)
require.NoError(t, err)
trace := rec()
if !processedRegexp.MatchString(trace.String()) {
return errors.Errorf("%q does not match %q", trace.String(), processedRegexp)
}
Expand Down Expand Up @@ -195,14 +198,17 @@ ORDER BY raw_start_key ASC LIMIT 1`)
s, repl := getStoreAndReplica()
// The protectedts record will prevent us from aging the MVCC garbage bytes
// past the oldest record so shouldQueue should be false. Verify that.
trace, _, err := s.Enqueue(ctx, "mvccGC", repl, false /* skipShouldQueue */, false /* async */)
traceCtx, rec := tracing.ContextWithRecordingSpan(ctx, s.GetStoreConfig().Tracer(), "trace-enqueue")
_, err = s.Enqueue(traceCtx, "mvccGC", repl, false /* skipShouldQueue */, false /* async */)
require.NoError(t, err)
require.Regexp(t, "(?s)shouldQueue=false", trace.String())
require.Regexp(t, "(?s)shouldQueue=false", rec().String())

// If we skipShouldQueue then gc will run but it should only run up to the
// timestamp of our record at the latest.
trace, _, err = s.Enqueue(ctx, "mvccGC", repl, true /* skipShouldQueue */, false /* async */)
traceCtx, rec = tracing.ContextWithRecordingSpan(ctx, s.GetStoreConfig().Tracer(), "trace-enqueue")
_, err = s.Enqueue(traceCtx, "mvccGC", repl, true /* skipShouldQueue */, false /* async */)
require.NoError(t, err)
trace := rec()
require.Regexp(t, "(?s)handled \\d+ incoming point keys; deleted \\d+", trace.String())
thresh := thresholdFromTrace(trace)
require.Truef(t, thresh.Less(ptsRec.Timestamp), "threshold: %v, protected %v %q", thresh, ptsRec.Timestamp, trace)
Expand Down Expand Up @@ -237,8 +243,10 @@ ORDER BY raw_start_key ASC LIMIT 1`)
// happens up to the protected timestamp of the new record.
require.NoError(t, ptsWithDB.Release(ctx, ptsRec.ID.GetUUID()))
testutils.SucceedsSoon(t, func() error {
trace, _, err = s.Enqueue(ctx, "mvccGC", repl, false /* skipShouldQueue */, false /* async */)
traceCtx, rec := tracing.ContextWithRecordingSpan(ctx, s.GetStoreConfig().Tracer(), "trace-enqueue")
_, err = s.Enqueue(traceCtx, "mvccGC", repl, false /* skipShouldQueue */, false /* async */)
require.NoError(t, err)
trace := rec()
if !processedRegexp.MatchString(trace.String()) {
return errors.Errorf("%q does not match %q", trace.String(), processedRegexp)
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/kv/kvserver/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -1141,9 +1142,11 @@ func TestStoreRangeSplitWithTracing(t *testing.T) {
repl := store.LookupReplica(splitKeyAddr)
targetRange.Store(int32(repl.RangeID))

recording, processErr, enqueueErr := store.Enqueue(
ctx, "split", repl, true /* skipShouldQueue */, false, /* async */
traceCtx, rec := tracing.ContextWithRecordingSpan(ctx, store.GetStoreConfig().Tracer(), "trace-enqueue")
processErr, enqueueErr := store.Enqueue(
traceCtx, "split", repl, true /* skipShouldQueue */, false, /* async */
)
recording := rec()
require.NoError(t, enqueueErr)
require.NoError(t, processErr)

Expand Down Expand Up @@ -4356,7 +4359,7 @@ func TestLBSplitUnsafeKeys(t *testing.T) {
// Update the split key override so that the split queue will enqueue and
// process the range. Remove it afterwards to avoid retrying the LHS.
splitKeyOverride.Store(splitKey)
_, processErr, enqueueErr := store.Enqueue(ctx, "split", repl, false /* shouldSkipQueue */, false /* async */)
processErr, enqueueErr := store.Enqueue(ctx, "split", repl, false /* shouldSkipQueue */, false /* async */)
splitKeyOverride.Store(roachpb.Key{})
require.NoError(t, enqueueErr)

Expand Down
1 change: 0 additions & 1 deletion pkg/kv/kvserver/kvserverbase/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ go_library(
"//pkg/util/quotapool",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing/tracingpb",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_cockroachdb_redact//:redact",
Expand Down
3 changes: 1 addition & 2 deletions pkg/kv/kvserver/kvserverbase/stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/errorutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
)

// StoresIterator is able to iterate over all stores on a given node.
Expand All @@ -30,7 +29,7 @@ type Store interface {
queue string,
rangeID roachpb.RangeID,
skipShouldQueue bool,
) (tracingpb.Recording, error)
) error

// SetQueueActive disables/enables the named queue.
SetQueueActive(active bool, queue string) error
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/lease_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,9 +466,9 @@ func TestLeaseQueueRaceReplicateQueue(t *testing.T) {
// replica in the replicate queue synchronously. The lease queue processing
// should block on the block mutex above, causing the replicate queue to
// return a AllocatorTokenErr trying to process the replica.
_, _, _ = repl.Store().Enqueue(ctx, "lease", repl, true /* skipShouldQueue */, true /* async */)
_, _ = repl.Store().Enqueue(ctx, "lease", repl, true /* skipShouldQueue */, true /* async */)
<-blocked
_, processErr, _ := repl.Store().Enqueue(ctx, "replicate", repl, true /* skipShouldQueue */, false /* async */)
processErr, _ := repl.Store().Enqueue(ctx, "replicate", repl, true /* skipShouldQueue */, false /* async */)
require.ErrorIs(t, processErr, plan.NewErrAllocatorToken("lease"))
}

Expand Down
56 changes: 32 additions & 24 deletions pkg/kv/kvserver/replica_learner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,8 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) {
// Manually enqueue the leaseholder replica into its store's raft snapshot
// queue. We expect it to pick up on the fact that the non-voter on its range
// needs a snapshot.
recording, pErr, err := leaseholderStore.Enqueue(
ctx, rec := tracing.ContextWithRecordingSpan(ctx, leaseholderStore.GetStoreConfig().Tracer(), "trace-enqueue")
pErr, err := leaseholderStore.Enqueue(
ctx,
"raftsnapshot",
leaseholderRepl,
Expand All @@ -1002,7 +1003,7 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) {
if err != nil {
return err
}
matched, err := regexp.MatchString("streamed snapshot.*to.*NON_VOTER", recording.String())
matched, err := regexp.MatchString("streamed snapshot.*to.*NON_VOTER", rec().String())
if err != nil {
return err
}
Expand Down Expand Up @@ -1266,13 +1267,14 @@ func TestReplicateQueueSeesLearnerOrJointConfig(t *testing.T) {
{
require.Equal(t, int64(0), getFirstStoreMetric(t, tc.Server(0), `queue.replicate.removelearnerreplica`))
store.TestingSetReplicateQueueActive(true)
trace, processErr, err := store.Enqueue(
ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
traceCtx, finish := tracing.ContextWithRecordingSpan(ctx, store.GetStoreConfig().Tracer(), "trace-enqueue")
processErr, err := store.Enqueue(
traceCtx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, err)
require.NoError(t, processErr)
action := "next replica action: remove learner"
require.NoError(t, testutils.MatchInOrder(trace.String(), []string{action}...))
require.NoError(t, testutils.MatchInOrder(finish().String(), []string{action}...))
require.Equal(t, int64(1), getFirstStoreMetric(t, tc.Server(0), `queue.replicate.removelearnerreplica`))

testutils.SucceedsSoon(t, func() error {
Expand All @@ -1294,13 +1296,14 @@ func TestReplicateQueueSeesLearnerOrJointConfig(t *testing.T) {
desc := tc.RemoveVotersOrFatal(t, scratchStartKey, tc.Target(2))
require.True(t, desc.Replicas().InAtomicReplicationChange(), desc)
store.TestingSetReplicateQueueActive(true)
trace, processErr, err := store.Enqueue(
ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
traceCtx, finish := tracing.ContextWithRecordingSpan(ctx, store.GetStoreConfig().Tracer(), "trace-enqueue")
processErr, err := store.Enqueue(
traceCtx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, err)
require.NoError(t, processErr)
action := "next replica action: finalize conf change"
require.NoError(t, testutils.MatchInOrder(trace.String(), []string{action}...))
require.NoError(t, testutils.MatchInOrder(finish().String(), []string{action}...))

testutils.SucceedsSoon(t, func() error {
desc = tc.LookupRangeOrFatal(t, scratchStartKey)
Expand Down Expand Up @@ -1334,13 +1337,14 @@ func TestReplicaGCQueueSeesLearnerOrJointConfig(t *testing.T) {
// Run the replicaGC queue.
checkNoGC := func() roachpb.RangeDescriptor {
store, repl := getFirstStoreReplica(t, tc.Server(1), scratchStartKey)
trace, processErr, err := store.Enqueue(
ctx, "replicaGC", repl, true /* skipShouldQueue */, false, /* async */
traceCtx, rec := tracing.ContextWithRecordingSpan(ctx, store.GetStoreConfig().Tracer(), "trace-enqueue")
processErr, err := store.Enqueue(
traceCtx, "replicaGC", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, err)
require.NoError(t, processErr)
const msg = `not gc'able, replica is still in range descriptor: (n2,s2):`
require.Contains(t, trace.String(), msg)
require.Contains(t, rec().String(), msg)
return tc.LookupRangeOrFatal(t, scratchStartKey)
}
desc := checkNoGC()
Expand Down Expand Up @@ -1396,8 +1400,9 @@ func TestRaftSnapshotQueueSeesLearner(t *testing.T) {
// raft to figure out that the replica needs a snapshot.
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
testutils.SucceedsSoon(t, func() error {
trace, processErr, err := store.Enqueue(
ctx, "raftsnapshot", repl, true /* skipShouldQueue */, false, /* async */
traceCtx, rec := tracing.ContextWithRecordingSpan(ctx, store.GetStoreConfig().Tracer(), "trace-enqueue")
processErr, err := store.Enqueue(
traceCtx, "raftsnapshot", repl, true /* skipShouldQueue */, false, /* async */
)
if err != nil {
return err
Expand All @@ -1406,7 +1411,7 @@ func TestRaftSnapshotQueueSeesLearner(t *testing.T) {
return processErr
}
const msg = `skipping snapshot; replica is likely a LEARNER in the process of being added: (n2,s2):2LEARNER`
formattedTrace := trace.String()
formattedTrace := rec().String()
if !strings.Contains(formattedTrace, msg) {
return errors.Errorf(`expected "%s" in trace got:\n%s`, msg, formattedTrace)
}
Expand Down Expand Up @@ -1548,16 +1553,17 @@ func TestLearnerReplicateQueueRace(t *testing.T) {
queue1ErrCh := make(chan error, 1)
go func() {
queue1ErrCh <- func() error {
trace, processErr, err := store.Enqueue(
ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
traceCtx, rec := tracing.ContextWithRecordingSpan(ctx, store.GetStoreConfig().Tracer(), "trace-enqueue")
processErr, err := store.Enqueue(
traceCtx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
)
if err != nil {
return err
}
if processErr == nil || !strings.Contains(processErr.Error(), `descriptor changed`) {
return errors.Wrap(processErr, `expected "descriptor changed" error got: %+v`)
}
formattedTrace := trace.String()
formattedTrace := rec().String()
expectedMessages := []string{
`could not promote .*?n3,s3.*? to voter, rolling back:.*?change replicas of r\d+ failed: descriptor changed`,
`learner to roll back not found`,
Expand Down Expand Up @@ -2044,7 +2050,7 @@ func TestMergeQueueDoesNotInterruptReplicationChange(t *testing.T) {
// ensure that the merge correctly notices that there is a snapshot in
// flight and ignores the range.
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchKey)
_, processErr, enqueueErr := store.Enqueue(
processErr, enqueueErr := store.Enqueue(
ctx, "merge", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, enqueueErr)
Expand Down Expand Up @@ -2090,12 +2096,13 @@ func TestMergeQueueSeesLearnerOrJointConfig(t *testing.T) {
})

store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
trace, processErr, err := store.Enqueue(
ctx, "merge", repl, true /* skipShouldQueue */, false, /* async */
traceCtx, rec := tracing.ContextWithRecordingSpan(ctx, store.GetStoreConfig().Tracer(), "trace-enqueue")
processErr, err := store.Enqueue(
traceCtx, "merge", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, err)
require.NoError(t, processErr)
formattedTrace := trace.String()
formattedTrace := rec().String()
expectedMessages := []string{
`removing learner replicas \[n2,s2\]`,
`merging to produce range: /Table/Max-/Max`,
Expand Down Expand Up @@ -2127,12 +2134,13 @@ func TestMergeQueueSeesLearnerOrJointConfig(t *testing.T) {
checkTransitioningOut := func() {
t.Helper()
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
trace, processErr, err := store.Enqueue(
ctx, "merge", repl, true /* skipShouldQueue */, false, /* async */
traceCtx, rec := tracing.ContextWithRecordingSpan(ctx, store.GetStoreConfig().Tracer(), "trace-enqueue")
processErr, err := store.Enqueue(
traceCtx, "merge", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, err)
require.NoError(t, processErr)
formattedTrace := trace.String()
formattedTrace := rec().String()
expectedMessages := []string{
`transitioning out of joint configuration`,
`merging to produce range: /Table/Max-/Max`,
Expand Down
3 changes: 1 addition & 2 deletions pkg/kv/kvserver/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -745,8 +745,7 @@ func (rq *replicateQueue) processOneChangeWithTracing(
ctx context.Context, repl *Replica, desc *roachpb.RangeDescriptor, conf *roachpb.SpanConfig,
) (requeue bool, _ error) {
processStart := timeutil.Now()
ctx, sp := tracing.EnsureChildSpan(ctx, rq.Tracer, "process replica",
tracing.WithRecording(tracingpb.RecordingVerbose))
ctx, sp := tracing.EnsureChildSpan(ctx, rq.Tracer, "process replica")
defer sp.Finish()

requeue, err := rq.processOneChange(ctx, repl, desc, conf,
Expand Down
Loading

0 comments on commit b64a8cf

Please sign in to comment.