diff --git a/pkg/kv/kvserver/consistency_queue.go b/pkg/kv/kvserver/consistency_queue.go index 0a40db7aa21c..3f40424fee89 100644 --- a/pkg/kv/kvserver/consistency_queue.go +++ b/pkg/kv/kvserver/consistency_queue.go @@ -55,31 +55,10 @@ const consistencyCheckRateBurstFactor = 8 // churn on timers. const consistencyCheckRateMinWait = 100 * time.Millisecond -// consistencyCheckAsyncConcurrency is the maximum number of asynchronous -// consistency checks to run concurrently per store below Raft. The -// server.consistency_check.max_rate limit is shared among these, so running too -// many at the same time will cause them to time out. The rate is multiplied by -// 10 (permittedRangeScanSlowdown) to obtain the per-check timeout. 7 gives -// reasonable headroom, and also handles clusters with high replication factor -// and/or many nodes -- recall that each node runs a separate consistency queue -// which can schedule checks on other nodes, e.g. a 7-node cluster with a -// replication factor of 7 could run 7 concurrent checks on every node. -// -// Note that checksum calculations below Raft are not tied to the caller's -// context, and may continue to run even after the caller has given up on them, -// which may cause them to build up. Although we do best effort to cancel the -// running task on the receiving end when the incoming request is aborted. -// -// CHECK_STATS checks do not count towards this limit, as they are cheap and the -// DistSender will parallelize them across all ranges (notably when calling -// crdb_internal.check_consistency()). -const consistencyCheckAsyncConcurrency = 7 - -// consistencyCheckAsyncTimeout is a below-Raft timeout for asynchronous -// consistency check calculations. These are not tied to the caller's context, -// and thus may continue to run even if the caller has given up on them, so we -// give them an upper timeout to prevent them from running forever. 
-const consistencyCheckAsyncTimeout = time.Hour +// consistencyCheckSyncTimeout is the max amount of time the consistency check +// computation and the checksum collection request will wait for each other +// before giving up. +const consistencyCheckSyncTimeout = 5 * time.Second var testingAggressiveConsistencyChecks = envutil.EnvOrDefaultBool("COCKROACH_CONSISTENCY_AGGRESSIVE", false) diff --git a/pkg/kv/kvserver/replica_consistency.go b/pkg/kv/kvserver/replica_consistency.go index ebe4ec6e78f3..130caa6cf415 100644 --- a/pkg/kv/kvserver/replica_consistency.go +++ b/pkg/kv/kvserver/replica_consistency.go @@ -51,10 +51,12 @@ import ( // Up to 22.1, the consistency check initiator used to synchronously collect the // first replica's checksum before all others, so checksum collection requests // could arrive late if the first one was slow. Since 22.2, all requests are -// parallel and likely arrive quickly. +// parallel and likely arrive quickly. Thus, in 23.1 the checksum task waits a +// short amount of time until the collection request arrives, and otherwise +// doesn't start. // -// TODO(pavelkalinnikov): Consider removing GC behaviour in 23.1+, when all the -// incoming requests are from 22.2+ nodes (hence arrive timely). +// We still need the delayed GC in order to help a late arriving participant to +// learn that the other one gave up, and fail fast instead of waiting. const replicaChecksumGCInterval = time.Hour // fatalOnStatsMismatch, if true, turns stats mismatches into fatal errors. 
A @@ -422,7 +424,7 @@ func (r *Replica) getReplicaChecksum(id uuid.UUID, now time.Time) (*replicaCheck c := r.mu.checksums[id] if c == nil { c = &replicaChecksum{ - started: make(chan context.CancelFunc, 1), // allow an async send + started: make(chan context.CancelFunc), // require send/recv sync result: make(chan CollectChecksumResponse, 1), // allow an async send } r.mu.checksums[id] = c @@ -496,13 +498,13 @@ func (r *Replica) getChecksum(ctx context.Context, id uuid.UUID) (CollectChecksu } // checksumInitialWait returns the amount of time to wait until the checksum -// computation has started. It is set to min of 5s and 10% of the remaining time -// in the passed-in context (if it has a deadline). +// computation has started. It is set to min of consistencyCheckSyncTimeout and +// 10% of the remaining time in the passed-in context (if it has a deadline). // // If it takes longer, chances are that the replica is being restored from // snapshots, or otherwise too busy to handle this request soon. func (*Replica) checksumInitialWait(ctx context.Context) time.Duration { - wait := 5 * time.Second + wait := consistencyCheckSyncTimeout if d, ok := ctx.Deadline(); ok { if dur := time.Duration(timeutil.Until(d).Nanoseconds() / 10); dur < wait { wait = dur @@ -747,72 +749,79 @@ func (r *Replica) computeChecksumPostApply( } // Compute SHA asynchronously and store it in a map by UUID. Concurrent checks - // share the rate limit in r.store.consistencyLimiter, so we also limit the - // number of concurrent checks via r.store.consistencySem. + // share the rate limit in r.store.consistencyLimiter, so if too many run at + // the same time, chances are they will time out. // - // Don't use the proposal's context for this, as it likely to be canceled very - // soon. 
+ // Each node's consistency queue runs a check for one range at a time, which + // it broadcasts to all replicas, so the average number of incoming in-flight + // collection requests per node is equal to the replication factor (typ. 3-7). + // Abandoned tasks are canceled eagerly within a few seconds, so there is very + // limited room for running above this figure. Thus we don't limit the number + // of concurrent tasks here. + // + // NB: CHECK_STATS checks are cheap and the DistSender will parallelize them + // across all ranges (notably when calling crdb_internal.check_consistency()). const taskName = "kvserver.Replica: computing checksum" - sem := r.store.consistencySem - if cc.Mode == roachpb.ChecksumMode_CHECK_STATS { - // Stats-only checks are cheap, and the DistSender parallelizes these across - // ranges (in particular when calling crdb_internal.check_consistency()), so - // they don't count towards the semaphore limit. - sem = nil - } stopper := r.store.Stopper() + // Don't use the proposal's context, as it is likely to be canceled very soon. taskCtx, taskCancel := stopper.WithCancelOnQuiesce(r.AnnotateCtx(context.Background())) if err := stopper.RunAsyncTaskEx(taskCtx, stop.TaskOpts{ - TaskName: taskName, - Sem: sem, - WaitForSem: false, + TaskName: taskName, }, func(ctx context.Context) { defer taskCancel() - // There is only one writer to c.started (this task), so this doesn't block. - // But if by mistake there is another writer, one of us closes the channel - // eventually, and other send/close ops will crash. This is by design. - c.started <- taskCancel - close(c.started) - - if err := contextutil.RunWithTimeout(ctx, taskName, consistencyCheckAsyncTimeout, + defer snap.Close() + defer r.gcReplicaChecksum(cc.ChecksumID, c) + // Wait until the CollectChecksum request handler joins in and learns about + // the starting computation, and then start it. 
+ if err := contextutil.RunWithTimeout(ctx, taskName, consistencyCheckSyncTimeout, func(ctx context.Context) error { - defer snap.Close() - var snapshot *roachpb.RaftSnapshotData - if cc.SaveSnapshot { - snapshot = &roachpb.RaftSnapshotData{} + // There is only one writer to c.started (this task), but if by mistake + // there is another writer, one of us closes the channel eventually, and + // other writes to c.started will crash. By design. + defer close(c.started) + select { + case <-ctx.Done(): + return ctx.Err() + case c.started <- taskCancel: + return nil } - - result, err := r.sha512(ctx, desc, snap, snapshot, cc.Mode, r.store.consistencyLimiter) - if err != nil { - result = nil - } - r.computeChecksumDone(c, result, snapshot) - r.gcReplicaChecksum(cc.ChecksumID, c) - return err }, ); err != nil { - log.Errorf(ctx, "checksum computation failed: %v", err) + log.Errorf(ctx, "checksum collection did not join: %v", err) + } else { + var snapshot *roachpb.RaftSnapshotData + if cc.SaveSnapshot { + snapshot = &roachpb.RaftSnapshotData{} + } + result, err := r.sha512(ctx, desc, snap, snapshot, cc.Mode, r.store.consistencyLimiter) + if err != nil { + log.Errorf(ctx, "checksum computation failed: %v", err) + result = nil + } + r.computeChecksumDone(c, result, snapshot) + } var shouldFatal bool for _, rDesc := range cc.Terminate { if rDesc.StoreID == r.store.StoreID() && rDesc.ReplicaID == r.replicaID { shouldFatal = true + break } } + if !shouldFatal { + return + } - if shouldFatal { - // This node should fatal as a result of a previous consistency - // check (i.e. this round is carried out only to obtain a diff). - // If we fatal too early, the diff won't make it back to the lease- - // holder and thus won't be printed to the logs. Since we're already - // in a goroutine that's about to end, simply sleep for a few seconds - // and then terminate. 
- auxDir := r.store.engine.GetAuxiliaryDir() - _ = r.store.engine.MkdirAll(auxDir) - path := base.PreventedStartupFile(auxDir) + // This node should fatal as a result of a previous consistency check (i.e. + // this round is carried out only to obtain a diff). If we fatal too early, + // the diff won't make it back to the leaseholder and thus won't be printed + // to the logs. Since we're already in a goroutine that's about to end, + // simply sleep for a few seconds and then terminate. + auxDir := r.store.engine.GetAuxiliaryDir() + _ = r.store.engine.MkdirAll(auxDir) + path := base.PreventedStartupFile(auxDir) - const attentionFmt = `ATTENTION: + const attentionFmt = `ATTENTION: this node is terminating because a replica inconsistency was detected between %s and its other replicas. Please check your cluster-wide log files for more @@ -825,19 +834,17 @@ A checkpoints directory to aid (expert) debugging should be present in: A file preventing this node from restarting was placed at: %s ` - preventStartupMsg := fmt.Sprintf(attentionFmt, r, auxDir, path) - if err := fs.WriteFile(r.store.engine, path, []byte(preventStartupMsg)); err != nil { - log.Warningf(ctx, "%v", err) - } - - if p := r.store.cfg.TestingKnobs.ConsistencyTestingKnobs.OnBadChecksumFatal; p != nil { - p(*r.store.Ident) - } else { - time.Sleep(10 * time.Second) - log.Fatalf(r.AnnotateCtx(context.Background()), attentionFmt, r, auxDir, path) - } + preventStartupMsg := fmt.Sprintf(attentionFmt, r, auxDir, path) + if err := fs.WriteFile(r.store.engine, path, []byte(preventStartupMsg)); err != nil { + log.Warningf(ctx, "%v", err) } + if p := r.store.cfg.TestingKnobs.ConsistencyTestingKnobs.OnBadChecksumFatal; p != nil { + p(*r.store.Ident) + } else { + time.Sleep(10 * time.Second) + log.Fatalf(r.AnnotateCtx(context.Background()), attentionFmt, r, auxDir, path) + } }); err != nil { taskCancel() snap.Close() diff --git a/pkg/kv/kvserver/replica_consistency_test.go b/pkg/kv/kvserver/replica_consistency_test.go 
index 7c5ad7db42ed..98bb6be6fdf2 100644 --- a/pkg/kv/kvserver/replica_consistency_test.go +++ b/pkg/kv/kvserver/replica_consistency_test.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" ) func TestReplicaChecksumVersion(t *testing.T) { @@ -54,8 +55,10 @@ func TestReplicaChecksumVersion(t *testing.T) { } else { cc.Version = 1 } - taskErr := tc.repl.computeChecksumPostApply(ctx, cc) + var g errgroup.Group + g.Go(func() error { return tc.repl.computeChecksumPostApply(ctx, cc) }) rc, err := tc.repl.getChecksum(ctx, cc.ChecksumID) + taskErr := g.Wait() if !matchingVersion { require.ErrorContains(t, taskErr, "incompatible versions") require.ErrorContains(t, err, "checksum task failed to start") @@ -79,13 +82,12 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) { defer stopper.Stop(ctx) tc.Start(ctx, t, stopper) - requireChecksumTaskNotStarted := func(id uuid.UUID) { - require.ErrorContains(t, - tc.repl.computeChecksumPostApply(context.Background(), kvserverpb.ComputeChecksum{ - ChecksumID: id, - Mode: roachpb.ChecksumMode_CHECK_FULL, - Version: batcheval.ReplicaChecksumVersion, - }), "checksum collection request gave up") + startChecksumTask := func(ctx context.Context, id uuid.UUID) error { + return tc.repl.computeChecksumPostApply(ctx, kvserverpb.ComputeChecksum{ + ChecksumID: id, + Mode: roachpb.ChecksumMode_CHECK_FULL, + Version: batcheval.ReplicaChecksumVersion, + }) } // Checksum computation failed to start. @@ -99,28 +101,38 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) { // Checksum computation started, but failed. 
id = uuid.FastMakeV4() c, _ = tc.repl.getReplicaChecksum(id, timeutil.Now()) - c.started <- func() {} - close(c.started) - close(c.result) + var g errgroup.Group + g.Go(func() error { + c.started <- func() {} + close(c.started) + close(c.result) + return nil + }) rc, err = tc.repl.getChecksum(ctx, id) require.ErrorContains(t, err, "no checksum found") require.Nil(t, rc.Checksum) + require.NoError(t, g.Wait()) // The initial wait for the task start expires. This will take 10ms. id = uuid.FastMakeV4() rc, err = tc.repl.getChecksum(ctx, id) require.ErrorContains(t, err, "checksum computation did not start") require.Nil(t, rc.Checksum) - requireChecksumTaskNotStarted(id) + require.ErrorContains(t, startChecksumTask(context.Background(), id), + "checksum collection request gave up") // The computation has started, but the request context timed out. id = uuid.FastMakeV4() c, _ = tc.repl.getReplicaChecksum(id, timeutil.Now()) - c.started <- func() {} - close(c.started) + g.Go(func() error { + c.started <- func() {} + close(c.started) + return nil + }) rc, err = tc.repl.getChecksum(ctx, id) require.ErrorIs(t, err, context.DeadlineExceeded) require.Nil(t, rc.Checksum) + require.NoError(t, g.Wait()) // Context is canceled during the initial waiting. id = uuid.FastMakeV4() @@ -129,7 +141,20 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) { rc, err = tc.repl.getChecksum(ctx, id) require.ErrorIs(t, err, context.Canceled) require.Nil(t, rc.Checksum) - requireChecksumTaskNotStarted(id) + require.ErrorContains(t, startChecksumTask(context.Background(), id), + "checksum collection request gave up") + + // The task failed to start because the checksum collection request did not + // join. Later, when it joins, it finds out that the task gave up. + id = uuid.FastMakeV4() + c, _ = tc.repl.getReplicaChecksum(id, timeutil.Now()) + require.NoError(t, startChecksumTask(context.Background(), id)) + // TODO(pavelkalinnikov): Avoid this long wait in the test. 
+ time.Sleep(2 * consistencyCheckSyncTimeout) // give the task time to give up + _, ok := <-c.started + require.False(t, ok) // ensure the task gave up + rc, err = tc.repl.getChecksum(context.Background(), id) + require.ErrorContains(t, err, "checksum task failed to start") } // TestReplicaChecksumSHA512 checks that a given dataset produces the expected diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 6ac5f567acb0..fceda5df42db 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -744,7 +744,6 @@ type Store struct { scanner *replicaScanner // Replica scanner consistencyQueue *consistencyQueue // Replica consistency check queue consistencyLimiter *quotapool.RateLimiter // Rate limits consistency checks - consistencySem *quotapool.IntPool // Limit concurrent consistency checks metrics *StoreMetrics intentResolver *intentresolver.IntentResolver recoveryMgr txnrecovery.Manager @@ -2090,9 +2089,6 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { rate := consistencyCheckRate.Get(&s.ClusterSettings().SV) s.consistencyLimiter.UpdateLimit(quotapool.Limit(rate), rate*consistencyCheckRateBurstFactor) }) - s.consistencySem = quotapool.NewIntPool("concurrent async consistency checks", - consistencyCheckAsyncConcurrency) - s.stopper.AddCloser(s.consistencySem.Closer("stopper")) // Set the started flag (for unittests). atomic.StoreInt32(&s.started, 1)