Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 4 additions & 25 deletions pkg/kv/kvserver/consistency_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,31 +55,10 @@ const consistencyCheckRateBurstFactor = 8
// churn on timers.
const consistencyCheckRateMinWait = 100 * time.Millisecond

// consistencyCheckAsyncConcurrency is the maximum number of asynchronous
// consistency checks to run concurrently per store below Raft. The
// server.consistency_check.max_rate limit is shared among these, so running too
// many at the same time will cause them to time out. The rate is multiplied by
// 10 (permittedRangeScanSlowdown) to obtain the per-check timeout. 7 gives
// reasonable headroom, and also handles clusters with high replication factor
// and/or many nodes -- recall that each node runs a separate consistency queue
// which can schedule checks on other nodes, e.g. a 7-node cluster with a
// replication factor of 7 could run 7 concurrent checks on every node.
//
// Note that checksum calculations below Raft are not tied to the caller's
// context, and may continue to run even after the caller has given up on them,
// which may cause them to build up. However, we make a best-effort attempt to
// cancel the running task on the receiving end if the incoming request aborts.
//
// CHECK_STATS checks do not count towards this limit, as they are cheap and the
// DistSender will parallelize them across all ranges (notably when calling
// crdb_internal.check_consistency()).
const consistencyCheckAsyncConcurrency = 7

// consistencyCheckAsyncTimeout is a below-Raft timeout for asynchronous
// consistency check calculations. These are not tied to the caller's context,
// and thus may continue to run even if the caller has given up on them, so we
// give them an upper timeout to prevent them from running forever.
const consistencyCheckAsyncTimeout = time.Hour
// consistencyCheckSyncTimeout is the max amount of time the consistency check
// computation and the checksum collection request will wait for each other
// before giving up.
const consistencyCheckSyncTimeout = 5 * time.Second

var testingAggressiveConsistencyChecks = envutil.EnvOrDefaultBool("COCKROACH_CONSISTENCY_AGGRESSIVE", false)

Expand Down
133 changes: 70 additions & 63 deletions pkg/kv/kvserver/replica_consistency.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@ import (
// Up to 22.1, the consistency check initiator used to synchronously collect the
// first replica's checksum before all others, so checksum collection requests
// could arrive late if the first one was slow. Since 22.2, all requests are
// parallel and likely arrive quickly.
// parallel and likely arrive quickly. Thus, in 23.1 the checksum task waits a
// short amount of time until the collection request arrives, and otherwise
// doesn't start.
//
// TODO(pavelkalinnikov): Consider removing GC behaviour in 23.1+, when all the
// incoming requests are from 22.2+ nodes (hence arrive timely).
// We still need the delayed GC in order to help a late-arriving participant
// learn that the other one gave up, and fail fast instead of waiting.
Comment thread
pav-kv marked this conversation as resolved.
const replicaChecksumGCInterval = time.Hour

// fatalOnStatsMismatch, if true, turns stats mismatches into fatal errors. A
Expand Down Expand Up @@ -422,7 +424,7 @@ func (r *Replica) getReplicaChecksum(id uuid.UUID, now time.Time) (*replicaCheck
c := r.mu.checksums[id]
if c == nil {
c = &replicaChecksum{
started: make(chan context.CancelFunc, 1), // allow an async send
started: make(chan context.CancelFunc), // require send/recv sync
result: make(chan CollectChecksumResponse, 1), // allow an async send
}
r.mu.checksums[id] = c
Expand Down Expand Up @@ -496,13 +498,13 @@ func (r *Replica) getChecksum(ctx context.Context, id uuid.UUID) (CollectChecksu
}

// checksumInitialWait returns the amount of time to wait until the checksum
// computation has started. It is set to min of 5s and 10% of the remaining time
// in the passed-in context (if it has a deadline).
// computation has started. It is set to min of consistencyCheckSyncTimeout and
// 10% of the remaining time in the passed-in context (if it has a deadline).
//
// If it takes longer, chances are that the replica is being restored from
// snapshots, or otherwise too busy to handle this request soon.
func (*Replica) checksumInitialWait(ctx context.Context) time.Duration {
wait := 5 * time.Second
wait := consistencyCheckSyncTimeout
if d, ok := ctx.Deadline(); ok {
if dur := time.Duration(timeutil.Until(d).Nanoseconds() / 10); dur < wait {
wait = dur
Expand Down Expand Up @@ -747,72 +749,79 @@ func (r *Replica) computeChecksumPostApply(
}

// Compute SHA asynchronously and store it in a map by UUID. Concurrent checks
// share the rate limit in r.store.consistencyLimiter, so we also limit the
// number of concurrent checks via r.store.consistencySem.
// share the rate limit in r.store.consistencyLimiter, so if too many run at
// the same time, chances are they will time out.
//
// Don't use the proposal's context for this, as it is likely to be canceled
// very soon.
// Each node's consistency queue runs a check for one range at a time, which
// it broadcasts to all replicas, so the average number of incoming in-flight
// collection requests per node is equal to the replication factor (typ. 3-7).
// Abandoned tasks are canceled eagerly within a few seconds, so there is very
// limited room for running above this figure. Thus we don't limit the number
// of concurrent tasks here.
//
// NB: CHECK_STATS checks are cheap and the DistSender will parallelize them
// across all ranges (notably when calling crdb_internal.check_consistency()).
const taskName = "kvserver.Replica: computing checksum"
sem := r.store.consistencySem
if cc.Mode == roachpb.ChecksumMode_CHECK_STATS {
// Stats-only checks are cheap, and the DistSender parallelizes these across
// ranges (in particular when calling crdb_internal.check_consistency()), so
// they don't count towards the semaphore limit.
sem = nil
}
stopper := r.store.Stopper()
// Don't use the proposal's context, as it is likely to be canceled very soon.
taskCtx, taskCancel := stopper.WithCancelOnQuiesce(r.AnnotateCtx(context.Background()))
if err := stopper.RunAsyncTaskEx(taskCtx, stop.TaskOpts{
TaskName: taskName,
Sem: sem,
WaitForSem: false,
TaskName: taskName,
}, func(ctx context.Context) {
defer taskCancel()
// There is only one writer to c.started (this task), so this doesn't block.
// But if by mistake there is another writer, one of us closes the channel
// eventually, and other send/close ops will crash. This is by design.
c.started <- taskCancel
close(c.started)

if err := contextutil.RunWithTimeout(ctx, taskName, consistencyCheckAsyncTimeout,
defer snap.Close()
defer r.gcReplicaChecksum(cc.ChecksumID, c)
// Wait until the CollectChecksum request handler joins in and learns about
// the starting computation, and then start it.
if err := contextutil.RunWithTimeout(ctx, taskName, consistencyCheckSyncTimeout,
func(ctx context.Context) error {
defer snap.Close()
var snapshot *roachpb.RaftSnapshotData
if cc.SaveSnapshot {
snapshot = &roachpb.RaftSnapshotData{}
// There is only one writer to c.started (this task), but if by mistake
// there is another writer, one of us closes the channel eventually, and
// other writes to c.started will crash. By design.
defer close(c.started)
select {
case <-ctx.Done():
return ctx.Err()
case c.started <- taskCancel:
return nil
}

result, err := r.sha512(ctx, desc, snap, snapshot, cc.Mode, r.store.consistencyLimiter)
if err != nil {
result = nil
}
r.computeChecksumDone(c, result, snapshot)
r.gcReplicaChecksum(cc.ChecksumID, c)
return err
},
); err != nil {
log.Errorf(ctx, "checksum computation failed: %v", err)
log.Errorf(ctx, "checksum collection did not join: %v", err)
} else {
var snapshot *roachpb.RaftSnapshotData
if cc.SaveSnapshot {
snapshot = &roachpb.RaftSnapshotData{}
}
result, err := r.sha512(ctx, desc, snap, snapshot, cc.Mode, r.store.consistencyLimiter)
if err != nil {
log.Errorf(ctx, "checksum computation failed: %v", err)
result = nil
}
r.computeChecksumDone(c, result, snapshot)
}

var shouldFatal bool
for _, rDesc := range cc.Terminate {
if rDesc.StoreID == r.store.StoreID() && rDesc.ReplicaID == r.replicaID {
shouldFatal = true
break
}
}
if !shouldFatal {
return
}

if shouldFatal {
// This node should fatal as a result of a previous consistency
// check (i.e. this round is carried out only to obtain a diff).
// If we fatal too early, the diff won't make it back to the lease-
// holder and thus won't be printed to the logs. Since we're already
// in a goroutine that's about to end, simply sleep for a few seconds
// and then terminate.
auxDir := r.store.engine.GetAuxiliaryDir()
_ = r.store.engine.MkdirAll(auxDir)
path := base.PreventedStartupFile(auxDir)
// This node should fatal as a result of a previous consistency check (i.e.
// this round is carried out only to obtain a diff). If we fatal too early,
// the diff won't make it back to the leaseholder and thus won't be printed
// to the logs. Since we're already in a goroutine that's about to end,
// simply sleep for a few seconds and then terminate.
auxDir := r.store.engine.GetAuxiliaryDir()
_ = r.store.engine.MkdirAll(auxDir)
path := base.PreventedStartupFile(auxDir)

const attentionFmt = `ATTENTION:
const attentionFmt = `ATTENTION:

this node is terminating because a replica inconsistency was detected between %s
and its other replicas. Please check your cluster-wide log files for more
Expand All @@ -825,19 +834,17 @@ A checkpoints directory to aid (expert) debugging should be present in:
A file preventing this node from restarting was placed at:
%s
`
preventStartupMsg := fmt.Sprintf(attentionFmt, r, auxDir, path)
if err := fs.WriteFile(r.store.engine, path, []byte(preventStartupMsg)); err != nil {
log.Warningf(ctx, "%v", err)
}

if p := r.store.cfg.TestingKnobs.ConsistencyTestingKnobs.OnBadChecksumFatal; p != nil {
p(*r.store.Ident)
} else {
time.Sleep(10 * time.Second)
log.Fatalf(r.AnnotateCtx(context.Background()), attentionFmt, r, auxDir, path)
}
preventStartupMsg := fmt.Sprintf(attentionFmt, r, auxDir, path)
if err := fs.WriteFile(r.store.engine, path, []byte(preventStartupMsg)); err != nil {
log.Warningf(ctx, "%v", err)
}

if p := r.store.cfg.TestingKnobs.ConsistencyTestingKnobs.OnBadChecksumFatal; p != nil {
p(*r.store.Ident)
} else {
time.Sleep(10 * time.Second)
log.Fatalf(r.AnnotateCtx(context.Background()), attentionFmt, r, auxDir, path)
}
}); err != nil {
taskCancel()
snap.Close()
Expand Down
55 changes: 40 additions & 15 deletions pkg/kv/kvserver/replica_consistency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

func TestReplicaChecksumVersion(t *testing.T) {
Expand All @@ -54,8 +55,10 @@ func TestReplicaChecksumVersion(t *testing.T) {
} else {
cc.Version = 1
}
taskErr := tc.repl.computeChecksumPostApply(ctx, cc)
var g errgroup.Group
g.Go(func() error { return tc.repl.computeChecksumPostApply(ctx, cc) })
rc, err := tc.repl.getChecksum(ctx, cc.ChecksumID)
taskErr := g.Wait()
if !matchingVersion {
require.ErrorContains(t, taskErr, "incompatible versions")
require.ErrorContains(t, err, "checksum task failed to start")
Expand All @@ -79,13 +82,12 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) {
defer stopper.Stop(ctx)
tc.Start(ctx, t, stopper)

requireChecksumTaskNotStarted := func(id uuid.UUID) {
require.ErrorContains(t,
tc.repl.computeChecksumPostApply(context.Background(), kvserverpb.ComputeChecksum{
ChecksumID: id,
Mode: roachpb.ChecksumMode_CHECK_FULL,
Version: batcheval.ReplicaChecksumVersion,
}), "checksum collection request gave up")
startChecksumTask := func(ctx context.Context, id uuid.UUID) error {
return tc.repl.computeChecksumPostApply(ctx, kvserverpb.ComputeChecksum{
ChecksumID: id,
Mode: roachpb.ChecksumMode_CHECK_FULL,
Version: batcheval.ReplicaChecksumVersion,
})
}

// Checksum computation failed to start.
Expand All @@ -99,28 +101,38 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) {
// Checksum computation started, but failed.
id = uuid.FastMakeV4()
c, _ = tc.repl.getReplicaChecksum(id, timeutil.Now())
c.started <- func() {}
close(c.started)
close(c.result)
var g errgroup.Group
g.Go(func() error {
c.started <- func() {}
close(c.started)
close(c.result)
return nil
})
rc, err = tc.repl.getChecksum(ctx, id)
require.ErrorContains(t, err, "no checksum found")
require.Nil(t, rc.Checksum)
require.NoError(t, g.Wait())

// The initial wait for the task start expires. This will take 10ms.
id = uuid.FastMakeV4()
rc, err = tc.repl.getChecksum(ctx, id)
require.ErrorContains(t, err, "checksum computation did not start")
require.Nil(t, rc.Checksum)
requireChecksumTaskNotStarted(id)
require.ErrorContains(t, startChecksumTask(context.Background(), id),
"checksum collection request gave up")

// The computation has started, but the request context timed out.
id = uuid.FastMakeV4()
c, _ = tc.repl.getReplicaChecksum(id, timeutil.Now())
c.started <- func() {}
close(c.started)
g.Go(func() error {
c.started <- func() {}
close(c.started)
return nil
})
rc, err = tc.repl.getChecksum(ctx, id)
require.ErrorIs(t, err, context.DeadlineExceeded)
require.Nil(t, rc.Checksum)
require.NoError(t, g.Wait())

// Context is canceled during the initial waiting.
id = uuid.FastMakeV4()
Expand All @@ -129,7 +141,20 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) {
rc, err = tc.repl.getChecksum(ctx, id)
require.ErrorIs(t, err, context.Canceled)
require.Nil(t, rc.Checksum)
requireChecksumTaskNotStarted(id)
require.ErrorContains(t, startChecksumTask(context.Background(), id),
"checksum collection request gave up")

// The task failed to start because the checksum collection request did not
// join. Later, when it joins, it finds out that the task gave up.
id = uuid.FastMakeV4()
c, _ = tc.repl.getReplicaChecksum(id, timeutil.Now())
require.NoError(t, startChecksumTask(context.Background(), id))
// TODO(pavelkalinnikov): Avoid this long wait in the test.
time.Sleep(2 * consistencyCheckSyncTimeout) // give the task time to give up
_, ok := <-c.started
require.False(t, ok) // ensure the task gave up
rc, err = tc.repl.getChecksum(context.Background(), id)
require.ErrorContains(t, err, "checksum task failed to start")
}

// TestReplicaChecksumSHA512 checks that a given dataset produces the expected
Expand Down
4 changes: 0 additions & 4 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,6 @@ type Store struct {
scanner *replicaScanner // Replica scanner
consistencyQueue *consistencyQueue // Replica consistency check queue
consistencyLimiter *quotapool.RateLimiter // Rate limits consistency checks
consistencySem *quotapool.IntPool // Limit concurrent consistency checks
metrics *StoreMetrics
intentResolver *intentresolver.IntentResolver
recoveryMgr txnrecovery.Manager
Expand Down Expand Up @@ -2090,9 +2089,6 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
rate := consistencyCheckRate.Get(&s.ClusterSettings().SV)
s.consistencyLimiter.UpdateLimit(quotapool.Limit(rate), rate*consistencyCheckRateBurstFactor)
})
s.consistencySem = quotapool.NewIntPool("concurrent async consistency checks",
consistencyCheckAsyncConcurrency)
s.stopper.AddCloser(s.consistencySem.Closer("stopper"))

// Set the started flag (for unittests).
atomic.StoreInt32(&s.started, 1)
Expand Down