From c49b1c39d68ce3fc37b8d6ca72c32332c8e8493d Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 24 Aug 2022 13:56:11 +0000 Subject: [PATCH 1/9] kvserver: cancel checksum collection context on quiescing Control request cancelation in the event of store stopper quiescing higher up the stack for convenience. Release justification: part of a performance improvement PR Release note: None --- pkg/kv/kvserver/replica_consistency.go | 3 --- pkg/kv/kvserver/replica_consistency_test.go | 11 ----------- pkg/kv/kvserver/stores_server.go | 2 ++ 3 files changed, 2 insertions(+), 14 deletions(-) diff --git a/pkg/kv/kvserver/replica_consistency.go b/pkg/kv/kvserver/replica_consistency.go index 5b9d0f3c4004..8f6fd205849f 100644 --- a/pkg/kv/kvserver/replica_consistency.go +++ b/pkg/kv/kvserver/replica_consistency.go @@ -521,9 +521,6 @@ func (r *Replica) checksumWait( ) (bool, error) { // Wait select { - case <-r.store.Stopper().ShouldQuiesce(): - return false, - errors.Errorf("store quiescing while waiting for compute checksum (ID = %s)", id) case <-ctx.Done(): return false, errors.Wrapf(ctx.Err(), "while waiting for compute checksum (ID = %s)", id) diff --git a/pkg/kv/kvserver/replica_consistency_test.go b/pkg/kv/kvserver/replica_consistency_test.go index 1d91a6321040..c554e52fc79b 100644 --- a/pkg/kv/kvserver/replica_consistency_test.go +++ b/pkg/kv/kvserver/replica_consistency_test.go @@ -113,17 +113,6 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) { t.Fatal(err) } require.Nil(t, rc.Checksum) - - // Need to reset the context, since we deadlined it above. - ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond) - defer cancel() - // Next condition, node should quiesce. - tc.repl.store.Stopper().Quiesce(ctx) - rc, err = tc.repl.getChecksum(ctx, uuid.FastMakeV4()) - if !testutils.IsError(err, "store quiescing") { - t.Fatal(err) - } - require.Nil(t, rc.Checksum) } // TestReplicaChecksumSHA512 checks that a given dataset produces the expected diff --git a/pkg/kv/kvserver/stores_server.go b/pkg/kv/kvserver/stores_server.go index 73fd82dfe8c8..8a358277f2ef 100644 --- a/pkg/kv/kvserver/stores_server.go +++ b/pkg/kv/kvserver/stores_server.go @@ -56,6 +56,8 @@ func (is Server) CollectChecksum( resp := &CollectChecksumResponse{} err := is.execStoreCommand(ctx, req.StoreRequestHeader, func(ctx context.Context, s *Store) error { + ctx, cancel := s.stopper.WithCancelOnQuiesce(ctx) + defer cancel() r, err := s.GetReplica(req.RangeID) if err != nil { return err From 0aed4a25cc3a22c3c2da451e5d81b82ac3f6d4c7 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Thu, 1 Sep 2022 10:22:06 +0000 Subject: [PATCH 2/9] kvserver: cancel checksum computation on quiescing The checksum computation can take long, so if the store quiesces, it's better to cancel it. 
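As an aside (not part of the patch itself): the pattern this commit applies with stop.Stopper.WithCancelOnQuiesce can be sketched with just the standard library. In the sketch below, the function name, the plain quiesce channel standing in for the Stopper, and the timings are all hypothetical; it only illustrates tying a long-running computation to a quiesce signal via context cancellation.

package main

import (
	"context"
	"fmt"
	"time"
)

// quiesceAwareTask stands in for the checksum computation: it derives a
// context that is canceled when quiesce fires, so the expensive work stops
// as soon as the "store" begins shutting down.
func quiesceAwareTask(parent context.Context, quiesce <-chan struct{}) error {
	ctx, cancel := context.WithCancel(parent)
	defer cancel()
	go func() {
		select {
		case <-quiesce: // mirrors what Stopper.WithCancelOnQuiesce does in the patch
			cancel()
		case <-ctx.Done():
		}
	}()
	for i := 0; i < 100; i++ { // simulated long computation, checked in batches
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(10 * time.Millisecond):
			// ... hash the next batch of keys ...
		}
	}
	return nil
}

func main() {
	quiesce := make(chan struct{})
	go func() {
		time.Sleep(50 * time.Millisecond)
		close(quiesce) // the store starts quiescing
	}()
	fmt.Println(quiesceAwareTask(context.Background(), quiesce)) // context canceled
}

The actual change applies the same idea one level up: the task context returned by WithCancelOnQuiesce is passed to RunAsyncTaskEx, and the deferred cancel releases the quiesce registration once the task finishes.
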
Release justification: part of a performance improvement PR Release note: None --- pkg/kv/kvserver/replica_consistency.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/kv/kvserver/replica_consistency.go b/pkg/kv/kvserver/replica_consistency.go index 8f6fd205849f..8b1d811ec050 100644 --- a/pkg/kv/kvserver/replica_consistency.go +++ b/pkg/kv/kvserver/replica_consistency.go @@ -745,7 +745,6 @@ func (*Replica) sha512( } func (r *Replica) computeChecksumPostApply(ctx context.Context, cc kvserverpb.ComputeChecksum) { - stopper := r.store.Stopper() now := timeutil.Now() r.mu.Lock() var notify chan struct{} @@ -798,7 +797,7 @@ func (r *Replica) computeChecksumPostApply(ctx context.Context, cc kvserverpb.Co // // Don't use the proposal's context for this, as it likely to be canceled very // soon. - const taskName = "storage.Replica: computing checksum" + const taskName = "kvserver.Replica: computing checksum" sem := r.store.consistencySem if cc.Mode == roachpb.ChecksumMode_CHECK_STATS { // Stats-only checks are cheap, and the DistSender parallelizes these across @@ -806,11 +805,15 @@ func (r *Replica) computeChecksumPostApply(ctx context.Context, cc kvserverpb.Co // they don't count towards the semaphore limit. sem = nil } - if err := stopper.RunAsyncTaskEx(r.AnnotateCtx(context.Background()), stop.TaskOpts{ + stopper := r.store.Stopper() + taskCtx, taskCancel := stopper.WithCancelOnQuiesce(r.AnnotateCtx(context.Background())) + if err := stopper.RunAsyncTaskEx(taskCtx, stop.TaskOpts{ TaskName: taskName, Sem: sem, WaitForSem: false, }, func(ctx context.Context) { + defer taskCancel() + if err := contextutil.RunWithTimeout(ctx, taskName, consistencyCheckAsyncTimeout, func(ctx context.Context) error { defer snap.Close() @@ -875,7 +878,8 @@ A file preventing this node from restarting was placed at: } }); err != nil { - defer snap.Close() + taskCancel() + snap.Close() log.Errorf(ctx, "could not run async checksum computation (ID = %s): %v", cc.ChecksumID, err) // Set checksum to nil. r.computeChecksumDone(ctx, cc.ChecksumID, nil, nil) From 324aabe1d5842bc39efa23dd207e61403970544a Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 24 Aug 2022 21:27:33 +0000 Subject: [PATCH 3/9] kvserver: return CollectChecksumResponse directly Release justification: part of a performance improvement PR Release note: None --- pkg/kv/kvserver/replica_consistency.go | 16 ++++++++-------- pkg/kv/kvserver/stores_server.go | 3 +-- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/pkg/kv/kvserver/replica_consistency.go b/pkg/kv/kvserver/replica_consistency.go index 8b1d811ec050..abdd7d24c044 100644 --- a/pkg/kv/kvserver/replica_consistency.go +++ b/pkg/kv/kvserver/replica_consistency.go @@ -439,10 +439,10 @@ func (r *Replica) gcOldChecksumEntriesLocked(now time.Time) { } } -// getChecksum waits for the result of ComputeChecksum and returns it. -// It returns false if there is no checksum being computed for the id, -// or it has already been GCed. -func (r *Replica) getChecksum(ctx context.Context, id uuid.UUID) (replicaChecksum, error) { +// getChecksum waits for the result of ComputeChecksum and returns it. Returns +// an error if there is no checksum being computed for the ID, it has already +// been GC-ed, or an error happened during the computation. 
+func (r *Replica) getChecksum(ctx context.Context, id uuid.UUID) (CollectChecksumResponse, error) { now := timeutil.Now() r.mu.Lock() r.gcOldChecksumEntriesLocked(now) @@ -462,14 +462,14 @@ func (r *Replica) getChecksum(ctx context.Context, id uuid.UUID) (replicaChecksu // Wait for the checksum to compute or at least to start. computed, err := r.checksumInitialWait(ctx, id, c.notify) if err != nil { - return replicaChecksum{}, err + return CollectChecksumResponse{}, err } // If the checksum started, but has not completed commit // to waiting the full deadline. if !computed { _, err = r.checksumWait(ctx, id, c.notify, nil) if err != nil { - return replicaChecksum{}, err + return CollectChecksumResponse{}, err } } @@ -483,9 +483,9 @@ func (r *Replica) getChecksum(ctx context.Context, id uuid.UUID) (replicaChecksu // The latter case can occur when there's a version mismatch or, more generally, // when the (async) checksum computation fails. if !ok || c.Checksum == nil { - return replicaChecksum{}, errors.Errorf("no checksum found (ID = %s)", id) + return CollectChecksumResponse{}, errors.Errorf("no checksum found (ID = %s)", id) } - return c, nil + return c.CollectChecksumResponse, nil } // Waits for the checksum to be available or for the checksum to start computing. diff --git a/pkg/kv/kvserver/stores_server.go b/pkg/kv/kvserver/stores_server.go index 8a358277f2ef..d49f21f74b2d 100644 --- a/pkg/kv/kvserver/stores_server.go +++ b/pkg/kv/kvserver/stores_server.go @@ -62,11 +62,10 @@ func (is Server) CollectChecksum( if err != nil { return err } - c, err := r.getChecksum(ctx, req.ChecksumID) + ccr, err := r.getChecksum(ctx, req.ChecksumID) if err != nil { return err } - ccr := c.CollectChecksumResponse if !bytes.Equal(req.Checksum, ccr.Checksum) { // If this check is false, then this request is the replica carrying out // the consistency check. The message is spurious, but we want to leave the From 70ffed8b57148bbd0b1edb4ccf330c7d47773ffb Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 24 Aug 2022 22:29:58 +0000 Subject: [PATCH 4/9] kvserver: use require.* helpers in tests Release justification: part of a performance improvement PR Release note: None --- pkg/kv/kvserver/replica_consistency_test.go | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/pkg/kv/kvserver/replica_consistency_test.go b/pkg/kv/kvserver/replica_consistency_test.go index c554e52fc79b..c184f55d330e 100644 --- a/pkg/kv/kvserver/replica_consistency_test.go +++ b/pkg/kv/kvserver/replica_consistency_test.go @@ -56,9 +56,7 @@ func TestReplicaChecksumVersion(t *testing.T) { tc.repl.computeChecksumPostApply(ctx, cc) rc, err := tc.repl.getChecksum(ctx, cc.ChecksumID) if !matchingVersion { - if !testutils.IsError(err, "no checksum found") { - t.Fatal(err) - } + require.ErrorContains(t, err, "no checksum found") require.Nil(t, rc.Checksum) } else { require.NoError(t, err) @@ -87,10 +85,9 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) { tc.repl.mu.checksums[id] = replicaChecksum{notify: notify} tc.repl.mu.Unlock() rc, err := tc.repl.getChecksum(ctx, id) - if !testutils.IsError(err, "no checksum found") { - t.Fatal(err) - } + require.ErrorContains(t, err, "no checksum found") require.Nil(t, rc.Checksum) + // Next condition, the initial wait expires and checksum is not started, // this will take 10ms. 
id = uuid.FastMakeV4() @@ -98,10 +95,9 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) { tc.repl.mu.checksums[id] = replicaChecksum{notify: make(chan struct{})} tc.repl.mu.Unlock() rc, err = tc.repl.getChecksum(ctx, id) - if !testutils.IsError(err, "checksum computation did not start") { - t.Fatal(err) - } + require.ErrorContains(t, err, "checksum computation did not start") require.Nil(t, rc.Checksum) + // Next condition, initial wait expired and we found the started flag, // so next step is for context deadline. id = uuid.FastMakeV4() @@ -109,9 +105,7 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) { tc.repl.mu.checksums[id] = replicaChecksum{notify: make(chan struct{}), started: true} tc.repl.mu.Unlock() rc, err = tc.repl.getChecksum(ctx, id) - if !testutils.IsError(err, "context deadline exceeded") { - t.Fatal(err) - } + require.ErrorIs(t, err, context.DeadlineExceeded) require.Nil(t, rc.Checksum) } From f9bca26ad2ab3382e6057f86117f201be3ca43db Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Thu, 1 Sep 2022 11:44:13 +0000 Subject: [PATCH 5/9] kvserver: return error for better testability Return error from Replica.computeChecksumPostApply for a better introspection in tests, and to avoid lengthy log messages in favor of compositing them. Release justification: part of a performance improvement PR Release note: None --- pkg/kv/kvserver/replica_application_result.go | 8 +++++++- pkg/kv/kvserver/replica_consistency.go | 14 +++++++------- pkg/kv/kvserver/replica_consistency_test.go | 4 +++- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/pkg/kv/kvserver/replica_application_result.go b/pkg/kv/kvserver/replica_application_result.go index 5707d7b15173..9c3baccfa656 100644 --- a/pkg/kv/kvserver/replica_application_result.go +++ b/pkg/kv/kvserver/replica_application_result.go @@ -20,6 +20,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/errors" ) // replica_application_*.go files provide concrete implementations of @@ -334,7 +336,11 @@ func (r *Replica) handleVersionResult(ctx context.Context, version *roachpb.Vers } func (r *Replica) handleComputeChecksumResult(ctx context.Context, cc *kvserverpb.ComputeChecksum) { - r.computeChecksumPostApply(ctx, *cc) + err := r.computeChecksumPostApply(ctx, *cc) + // Don't log errors caused by the store quiescing, they are expected. 
+ if err != nil && !errors.Is(err, stop.ErrUnavailable) { + log.Errorf(ctx, "failed to start ComputeChecksum task %s: %v", cc.ChecksumID, err) + } } func (r *Replica) handleChangeReplicasResult( diff --git a/pkg/kv/kvserver/replica_consistency.go b/pkg/kv/kvserver/replica_consistency.go index abdd7d24c044..e807b0213c7c 100644 --- a/pkg/kv/kvserver/replica_consistency.go +++ b/pkg/kv/kvserver/replica_consistency.go @@ -744,7 +744,9 @@ func (*Replica) sha512( return &result, nil } -func (r *Replica) computeChecksumPostApply(ctx context.Context, cc kvserverpb.ComputeChecksum) { +func (r *Replica) computeChecksumPostApply( + ctx context.Context, cc kvserverpb.ComputeChecksum, +) error { now := timeutil.Now() r.mu.Lock() var notify chan struct{} @@ -766,11 +768,9 @@ func (r *Replica) computeChecksumPostApply(ctx context.Context, cc kvserverpb.Co desc := *r.mu.state.Desc r.mu.Unlock() - if cc.Version != batcheval.ReplicaChecksumVersion { + if req, have := cc.Version, uint32(batcheval.ReplicaChecksumVersion); req != have { r.computeChecksumDone(ctx, cc.ChecksumID, nil, nil) - log.Infof(ctx, "incompatible ComputeChecksum versions (requested: %d, have: %d)", - cc.Version, batcheval.ReplicaChecksumVersion) - return + return errors.Errorf("incompatible versions (requested: %d, have: %d)", req, have) } // Caller is holding raftMu, so an engine snapshot is automatically @@ -880,8 +880,8 @@ A file preventing this node from restarting was placed at: }); err != nil { taskCancel() snap.Close() - log.Errorf(ctx, "could not run async checksum computation (ID = %s): %v", cc.ChecksumID, err) - // Set checksum to nil. r.computeChecksumDone(ctx, cc.ChecksumID, nil, nil) + return err } + return nil } diff --git a/pkg/kv/kvserver/replica_consistency_test.go b/pkg/kv/kvserver/replica_consistency_test.go index c184f55d330e..b51839a2d80b 100644 --- a/pkg/kv/kvserver/replica_consistency_test.go +++ b/pkg/kv/kvserver/replica_consistency_test.go @@ -53,12 +53,14 @@ func TestReplicaChecksumVersion(t *testing.T) { } else { cc.Version = 1 } - tc.repl.computeChecksumPostApply(ctx, cc) + taskErr := tc.repl.computeChecksumPostApply(ctx, cc) rc, err := tc.repl.getChecksum(ctx, cc.ChecksumID) if !matchingVersion { + require.ErrorContains(t, taskErr, "incompatible versions") require.ErrorContains(t, err, "no checksum found") require.Nil(t, rc.Checksum) } else { + require.NoError(t, taskErr) require.NoError(t, err) require.NotNil(t, rc.Checksum) } From d3bc24a13d2c93397168433d9cc3a6aebe178983 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 31 Aug 2022 13:19:39 +0000 Subject: [PATCH 6/9] kvserver: don't rewrite entries in Replica.checksums This refactoring makes replica.checksums map store *replicaChecksum pointers instead of values. This way we can modify the entries directly without doing another map roundtrip, which was error-prone and required handling situations when an entry was in the map and then disappeared. We still need to lock Replica.mu for reading/writing the entries, but we can avoid this too by putting sync primitives in the entry itself (see the follow-up commit). 
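Not part of the patch, but to make the motivation concrete, here is a small self-contained Go sketch contrasting value entries with pointer entries in a map; the type and key names are illustrative, not the real ones.

package main

import "fmt"

// tracker is a stand-in for replicaChecksum.
type tracker struct{ started bool }

func main() {
	// Value entries: mutating requires a read-modify-write-back, and the entry
	// may have been GC-ed from the map between the read and the write.
	byValue := map[string]tracker{"id": {}}
	if c, ok := byValue["id"]; ok {
		c.started = true
		byValue["id"] = c // second map access; easy to forget
	}

	// Pointer entries (the approach taken here): the entry from a single
	// lookup is mutated in place, and every holder of the pointer sees it.
	byPointer := map[string]*tracker{"id": {}}
	if c := byPointer["id"]; c != nil {
		c.started = true
	}

	fmt.Println(byValue["id"].started, byPointer["id"].started) // true true
}

Storing pointers is also what enables the follow-up commit: channels and other synchronization state can live inside the entry and be shared by the computation task and the collection request without further map traffic under Replica.mu.
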
Release justification: part of a performance improvement PR Release note: None --- pkg/kv/kvserver/replica.go | 2 +- pkg/kv/kvserver/replica_consistency.go | 81 +++++++++------------ pkg/kv/kvserver/replica_consistency_test.go | 6 +- pkg/kv/kvserver/replica_init.go | 2 +- 4 files changed, 40 insertions(+), 51 deletions(-) diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index b840fd16d5ba..06b65bf23497 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -552,7 +552,7 @@ type Replica struct { lastUpdateTimes lastUpdateTimesMap // Computed checksum at a snapshot UUID. - checksums map[uuid.UUID]replicaChecksum + checksums map[uuid.UUID]*replicaChecksum // proposalQuota is the quota pool maintained by the lease holder where // incoming writes acquire quota from a fixed quota pool before going diff --git a/pkg/kv/kvserver/replica_consistency.go b/pkg/kv/kvserver/replica_consistency.go index e807b0213c7c..f3d23318b277 100644 --- a/pkg/kv/kvserver/replica_consistency.go +++ b/pkg/kv/kvserver/replica_consistency.go @@ -446,15 +446,15 @@ func (r *Replica) getChecksum(ctx context.Context, id uuid.UUID) (CollectChecksu now := timeutil.Now() r.mu.Lock() r.gcOldChecksumEntriesLocked(now) - c, ok := r.mu.checksums[id] - if !ok { + c := r.mu.checksums[id] + if c == nil { + c = &replicaChecksum{notify: make(chan struct{})} // TODO(tbg): we need to unconditionally set a gcTimestamp or this // request can simply get stuck forever or cancel anyway and leak an // entry in r.mu.checksums. if d, dOk := ctx.Deadline(); dOk { c.gcTimestamp = d } - c.notify = make(chan struct{}) r.mu.checksums[id] = c } r.mu.Unlock() @@ -467,8 +467,7 @@ func (r *Replica) getChecksum(ctx context.Context, id uuid.UUID) (CollectChecksu // If the checksum started, but has not completed commit // to waiting the full deadline. if !computed { - _, err = r.checksumWait(ctx, id, c.notify, nil) - if err != nil { + if _, err = r.checksumWait(ctx, id, c.notify, nil); err != nil { return CollectChecksumResponse{}, err } } @@ -476,13 +475,13 @@ func (r *Replica) getChecksum(ctx context.Context, id uuid.UUID) (CollectChecksu if log.V(1) { log.Infof(ctx, "waited for compute checksum for %s", timeutil.Since(now)) } - r.mu.RLock() - c, ok = r.mu.checksums[id] - r.mu.RUnlock() - // If the checksum wasn't found or the checksum could not be computed, error out. - // The latter case can occur when there's a version mismatch or, more generally, - // when the (async) checksum computation fails. - if !ok || c.Checksum == nil { + + // If the checksum could not be computed, error out. This can occur when the + // async checksum computation task fails, e.g. if there is a version mismatch. + // + // Note: there is only one writer to replicaChecksum.CollectChecksumResponse, + // and we have synchronized with it above, so we don't have to lock r.mu here. + if c.Checksum == nil { return CollectChecksumResponse{}, errors.Errorf("no checksum found (ID = %s)", id) } return c.CollectChecksumResponse, nil @@ -543,30 +542,23 @@ func (r *Replica) checksumWait( // computeChecksumDone adds the computed checksum, sets a deadline for GCing the // checksum, and sends out a notification. func (r *Replica) computeChecksumDone( - ctx context.Context, id uuid.UUID, result *replicaHash, snapshot *roachpb.RaftSnapshotData, + c *replicaChecksum, result *replicaHash, snapshot *roachpb.RaftSnapshotData, ) { + // TODO(pavelkalinnikov): Communicate through the replicaChecksum directly, + // without using r.mu. E.g. 
send the CollectChecksumResponse through c.notify. r.mu.Lock() defer r.mu.Unlock() - if c, ok := r.mu.checksums[id]; ok { - if result != nil { - c.Checksum = result.SHA512[:] - delta := result.PersistedMS - delta.Subtract(result.RecomputedMS) - c.Delta = enginepb.MVCCStatsDelta(delta) - c.Persisted = result.PersistedMS - } - c.gcTimestamp = timeutil.Now().Add(replicaChecksumGCInterval) - c.Snapshot = snapshot - r.mu.checksums[id] = c - // Notify - close(c.notify) - } else { - // ComputeChecksum adds an entry into the map, and the entry can - // only be GCed once the gcTimestamp is set above. Something - // really bad happened. - log.Errorf(ctx, "no map entry for checksum (ID = %s)", id) + if result != nil { + c.Checksum = result.SHA512[:] + delta := result.PersistedMS + delta.Subtract(result.RecomputedMS) + c.Delta = enginepb.MVCCStatsDelta(delta) + c.Persisted = result.PersistedMS } + c.gcTimestamp = timeutil.Now().Add(replicaChecksumGCInterval) + c.Snapshot = snapshot + close(c.notify) // notify the receiver that the computation is done } type replicaHash struct { @@ -748,28 +740,25 @@ func (r *Replica) computeChecksumPostApply( ctx context.Context, cc kvserverpb.ComputeChecksum, ) error { now := timeutil.Now() + r.mu.Lock() - var notify chan struct{} - if c, ok := r.mu.checksums[cc.ChecksumID]; !ok { - // There is no record of this ID. Make a new notification. - notify = make(chan struct{}) - } else if !c.started { - // A CollectChecksumRequest is waiting on the existing notification. - notify = c.notify - } else { + c := r.mu.checksums[cc.ChecksumID] + // If there is no record of this ID, make a new one. + if c == nil { + c = &replicaChecksum{notify: make(chan struct{})} + r.mu.checksums[cc.ChecksumID] = c + } + if c.started { log.Fatalf(ctx, "attempted to apply ComputeChecksum command with duplicated checksum ID %s", cc.ChecksumID) } - + c.started = true r.gcOldChecksumEntriesLocked(now) - - // Create an entry with checksum == nil and gcTimestamp unset. - r.mu.checksums[cc.ChecksumID] = replicaChecksum{started: true, notify: notify} desc := *r.mu.state.Desc r.mu.Unlock() if req, have := cc.Version, uint32(batcheval.ReplicaChecksumVersion); req != have { - r.computeChecksumDone(ctx, cc.ChecksumID, nil, nil) + r.computeChecksumDone(c, nil, nil) return errors.Errorf("incompatible versions (requested: %d, have: %d)", req, have) } @@ -826,7 +815,7 @@ func (r *Replica) computeChecksumPostApply( if err != nil { result = nil } - r.computeChecksumDone(ctx, cc.ChecksumID, result, snapshot) + r.computeChecksumDone(c, result, snapshot) return err }, ); err != nil { @@ -880,7 +869,7 @@ A file preventing this node from restarting was placed at: }); err != nil { taskCancel() snap.Close() - r.computeChecksumDone(ctx, cc.ChecksumID, nil, nil) + r.computeChecksumDone(c, nil, nil) return err } return nil diff --git a/pkg/kv/kvserver/replica_consistency_test.go b/pkg/kv/kvserver/replica_consistency_test.go index b51839a2d80b..71f913d26d28 100644 --- a/pkg/kv/kvserver/replica_consistency_test.go +++ b/pkg/kv/kvserver/replica_consistency_test.go @@ -84,7 +84,7 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) { // Simple condition, the checksum is notified, but not computed. 
tc.repl.mu.Lock() - tc.repl.mu.checksums[id] = replicaChecksum{notify: notify} + tc.repl.mu.checksums[id] = &replicaChecksum{notify: notify} tc.repl.mu.Unlock() rc, err := tc.repl.getChecksum(ctx, id) require.ErrorContains(t, err, "no checksum found") @@ -94,7 +94,7 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) { // this will take 10ms. id = uuid.FastMakeV4() tc.repl.mu.Lock() - tc.repl.mu.checksums[id] = replicaChecksum{notify: make(chan struct{})} + tc.repl.mu.checksums[id] = &replicaChecksum{notify: make(chan struct{})} tc.repl.mu.Unlock() rc, err = tc.repl.getChecksum(ctx, id) require.ErrorContains(t, err, "checksum computation did not start") @@ -104,7 +104,7 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) { // so next step is for context deadline. id = uuid.FastMakeV4() tc.repl.mu.Lock() - tc.repl.mu.checksums[id] = replicaChecksum{notify: make(chan struct{}), started: true} + tc.repl.mu.checksums[id] = &replicaChecksum{notify: make(chan struct{}), started: true} tc.repl.mu.Unlock() rc, err = tc.repl.getChecksum(ctx, id) require.ErrorIs(t, err, context.DeadlineExceeded) diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index e6b8c4e48591..c25f4fde2817 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -100,7 +100,7 @@ func newUnloadedReplica( return kvserverbase.SplitByLoadMergeDelay.Get(&store.cfg.Settings.SV) }) r.mu.proposals = map[kvserverbase.CmdIDKey]*ProposalData{} - r.mu.checksums = map[uuid.UUID]replicaChecksum{} + r.mu.checksums = map[uuid.UUID]*replicaChecksum{} r.mu.proposalBuf.Init((*replicaProposer)(r), tracker.NewLockfreeTracker(), r.Clock(), r.ClusterSettings()) r.mu.proposalBuf.testing.allowLeaseProposalWhenNotLeader = store.cfg.TestingKnobs.AllowLeaseRequestProposalsWhenNotLeader r.mu.proposalBuf.testing.allowLeaseTransfersWhenTargetMayNeedSnapshot = store.cfg.TestingKnobs.AllowLeaseTransfersWhenTargetMayNeedSnapshot From b321448bfa2b2ff4bd9c18a8b8f8fe827d09e152 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 31 Aug 2022 18:58:04 +0000 Subject: [PATCH 7/9] kvserver: refactor consistency check synchronization The replicaChecksum type helps bridging the checksum computation task and checksum collection request, for a certain computation ID. The code for the lifecycle of replicaChecksum is scattered across replica_consistency.go, and is difficult to understand. This commit simplifies the semantics of replicaChecksum by using Go channels, and stating the invariant on their state. It also minimizes the extensive Replica mutex locking (which is used by nearly every part of the system) by using channels for the communication between the task and the handler. It also resolves the TODO/bug in which some replicaChecksum entries could stay in the map forever if the request does not have a deadline. Release justification: part of a performance improvement PR Release note: None --- pkg/kv/kvserver/replica_consistency.go | 221 +++++++++----------- pkg/kv/kvserver/replica_consistency_test.go | 26 +-- 2 files changed, 117 insertions(+), 130 deletions(-) diff --git a/pkg/kv/kvserver/replica_consistency.go b/pkg/kv/kvserver/replica_consistency.go index f3d23318b277..b74717db3f53 100644 --- a/pkg/kv/kvserver/replica_consistency.go +++ b/pkg/kv/kvserver/replica_consistency.go @@ -69,15 +69,16 @@ var fatalOnStatsMismatch = envutil.EnvOrDefaultBool("COCKROACH_ENFORCE_CONSISTEN // replicaChecksum contains progress on a replica checksum computation. 
type replicaChecksum struct { - CollectChecksumResponse - // started is true if the checksum computation has started. - started bool - // If gcTimestamp is nonzero, GC this checksum after gcTimestamp. gcTimestamp - // is zero if and only if the checksum computation is in progress. + // started is closed when the checksum computation has started. + started chan struct{} + // result passes a single checksum computation result from the task. + // INVARIANT: result is written to or closed only if started is closed. + result chan CollectChecksumResponse + // A non-zero gcTimestamp means this tracker is "inactive", i.e. either the + // computation task completed/failed, or the checksum collection request + // returned. A tracker is deleted from the state when both participants have + // learnt about it, or gcTimestamp passes, whichever happens first. gcTimestamp time.Time - // This channel is closed after the checksum is computed, and is used - // as a notification. - notify chan struct{} } // CheckConsistency runs a consistency check on the range. It first applies a @@ -439,116 +440,103 @@ func (r *Replica) gcOldChecksumEntriesLocked(now time.Time) { } } -// getChecksum waits for the result of ComputeChecksum and returns it. Returns -// an error if there is no checksum being computed for the ID, it has already -// been GC-ed, or an error happened during the computation. -func (r *Replica) getChecksum(ctx context.Context, id uuid.UUID) (CollectChecksumResponse, error) { - now := timeutil.Now() +// getReplicaChecksum returns replicaChecksum tracker for the given ID. +func (r *Replica) getReplicaChecksum(id uuid.UUID, now time.Time) *replicaChecksum { r.mu.Lock() + defer r.mu.Unlock() r.gcOldChecksumEntriesLocked(now) c := r.mu.checksums[id] if c == nil { - c = &replicaChecksum{notify: make(chan struct{})} - // TODO(tbg): we need to unconditionally set a gcTimestamp or this - // request can simply get stuck forever or cancel anyway and leak an - // entry in r.mu.checksums. - if d, dOk := ctx.Deadline(); dOk { - c.gcTimestamp = d + c = &replicaChecksum{ + started: make(chan struct{}), + result: make(chan CollectChecksumResponse, 1), // allow an async send } r.mu.checksums[id] = c } - r.mu.Unlock() - - // Wait for the checksum to compute or at least to start. - computed, err := r.checksumInitialWait(ctx, id, c.notify) - if err != nil { - return CollectChecksumResponse{}, err - } - // If the checksum started, but has not completed commit - // to waiting the full deadline. - if !computed { - if _, err = r.checksumWait(ctx, id, c.notify, nil); err != nil { - return CollectChecksumResponse{}, err - } - } - - if log.V(1) { - log.Infof(ctx, "waited for compute checksum for %s", timeutil.Since(now)) - } + return c +} - // If the checksum could not be computed, error out. This can occur when the - // async checksum computation task fails, e.g. if there is a version mismatch. - // - // Note: there is only one writer to replicaChecksum.CollectChecksumResponse, - // and we have synchronized with it above, so we don't have to lock r.mu here. - if c.Checksum == nil { - return CollectChecksumResponse{}, errors.Errorf("no checksum found (ID = %s)", id) +// gcReplicaChecksum schedules GC to remove the given replicaChecksum from the +// state after replicaChecksumGCInterval passes from now, or removes immediately +// if it is no longer active. 
+// +// Each user of replicaChecksum (at most two during its lifetime: sender and +// receiver; in any order) must call this method exactly once when they finish +// working on this tracker. +// +// The guarantee: both parties see the same tracker iff neither of them arrives +// at it (by calling getReplicaChecksum) later than GC timeout past the moment +// when the other left it (by calling gcReplicaChecksum). +func (r *Replica) gcReplicaChecksum(id uuid.UUID, rc *replicaChecksum) { + // TODO(pavelkalinnikov): Avoid locking, use atomics. + r.mu.Lock() + defer r.mu.Unlock() + // If the tracker is inactive (GC is already scheduled) then the counterparty + // has abandoned this tracker, and will not come back to it. Remove it then. + if !rc.gcTimestamp.IsZero() { + delete(r.mu.checksums, id) + } else { // otherwise give the counterparty some time to see this tracker + rc.gcTimestamp = timeutil.Now().Add(replicaChecksumGCInterval) } - return c.CollectChecksumResponse, nil } -// Waits for the checksum to be available or for the checksum to start computing. -// If we waited for 10% of the deadline and it has not started, then it's -// unlikely to start because this replica is most likely being restored from -// snapshots. -func (r *Replica) checksumInitialWait( - ctx context.Context, id uuid.UUID, notify chan struct{}, -) (bool, error) { - d, dOk := ctx.Deadline() - // The max wait time should be 5 seconds, so we dont end up waiting for - // minutes for a huge range. - maxInitialWait := 5 * time.Second - var initialWait <-chan time.Time - if dOk { - duration := time.Duration(timeutil.Until(d).Nanoseconds() / 10) - if duration > maxInitialWait { - duration = maxInitialWait - } - initialWait = time.After(duration) - } else { - initialWait = time.After(maxInitialWait) +// getChecksum waits for the result of ComputeChecksum and returns it. Returns +// an error if there is no checksum being computed for the ID, it has already +// been GC-ed, or an error happened during the computation. +func (r *Replica) getChecksum(ctx context.Context, id uuid.UUID) (CollectChecksumResponse, error) { + now := timeutil.Now() + c := r.getReplicaChecksum(id, now) + defer r.gcReplicaChecksum(id, c) + + // Wait for the checksum computation to start. + select { + case <-ctx.Done(): + return CollectChecksumResponse{}, + errors.Wrapf(ctx.Err(), "while waiting for compute checksum (ID = %s)", id) + case <-time.After(r.checksumInitialWait(ctx)): + return CollectChecksumResponse{}, + errors.Errorf("checksum computation did not start in time for (ID = %s)", id) + case <-c.started: + // Happy case, the computation has started. } - return r.checksumWait(ctx, id, notify, initialWait) -} -// checksumWait waits for the checksum to be available or for the computation -// to start within the initialWait time. The bool return flag is used to -// indicate if a checksum is available (true) or if the initial wait has expired -// and the caller should wait more, since the checksum computation started. -func (r *Replica) checksumWait( - ctx context.Context, id uuid.UUID, notify chan struct{}, initialWait <-chan time.Time, -) (bool, error) { - // Wait + // Wait for the computation result. 
select { case <-ctx.Done(): - return false, + return CollectChecksumResponse{}, errors.Wrapf(ctx.Err(), "while waiting for compute checksum (ID = %s)", id) - case <-initialWait: - { - r.mu.Lock() - started := r.mu.checksums[id].started - r.mu.Unlock() - if !started { - return false, - errors.Errorf("checksum computation did not start in time for (ID = %s)", id) - } - return false, nil + case c, ok := <-c.result: + if log.V(1) { + log.Infof(ctx, "waited for compute checksum for %s", timeutil.Since(now)) + } + if !ok || c.Checksum == nil { + return CollectChecksumResponse{}, errors.Errorf("no checksum found (ID = %s)", id) } - case <-notify: - return true, nil + return c, nil } } -// computeChecksumDone adds the computed checksum, sets a deadline for GCing the -// checksum, and sends out a notification. -func (r *Replica) computeChecksumDone( - c *replicaChecksum, result *replicaHash, snapshot *roachpb.RaftSnapshotData, -) { - // TODO(pavelkalinnikov): Communicate through the replicaChecksum directly, - // without using r.mu. E.g. send the CollectChecksumResponse through c.notify. - r.mu.Lock() - defer r.mu.Unlock() +// checksumInitialWait returns the amount of time to wait until the checksum +// computation has started. It is set to min of 5s and 10% of the remaining time +// in the passed-in context (if it has a deadline). +// +// If it takes longer, chances are that the replica is being restored from +// snapshots, or otherwise too busy to handle this request soon. +func (*Replica) checksumInitialWait(ctx context.Context) time.Duration { + wait := 5 * time.Second + if d, ok := ctx.Deadline(); ok { + if dur := time.Duration(timeutil.Until(d).Nanoseconds() / 10); dur < wait { + wait = dur + } + } + return wait +} +// computeChecksumDone sends the checksum computation result to the receiver. +func (*Replica) computeChecksumDone( + rc *replicaChecksum, result *replicaHash, snapshot *roachpb.RaftSnapshotData, +) { + c := CollectChecksumResponse{Snapshot: snapshot} if result != nil { c.Checksum = result.SHA512[:] delta := result.PersistedMS @@ -556,9 +544,12 @@ func (r *Replica) computeChecksumDone( c.Delta = enginepb.MVCCStatsDelta(delta) c.Persisted = result.PersistedMS } - c.gcTimestamp = timeutil.Now().Add(replicaChecksumGCInterval) - c.Snapshot = snapshot - close(c.notify) // notify the receiver that the computation is done + + // Sending succeeds because the channel is buffered, and there is at most one + // computeChecksumDone per replicaChecksum. In case of a bug, another writer + // closes the channel, so this send panics instead of deadlocking. By design. + rc.result <- c + close(rc.result) } type replicaHash struct { @@ -739,29 +730,23 @@ func (*Replica) sha512( func (r *Replica) computeChecksumPostApply( ctx context.Context, cc kvserverpb.ComputeChecksum, ) error { - now := timeutil.Now() - - r.mu.Lock() - c := r.mu.checksums[cc.ChecksumID] - // If there is no record of this ID, make a new one. - if c == nil { - c = &replicaChecksum{notify: make(chan struct{})} - r.mu.checksums[cc.ChecksumID] = c - } - if c.started { - log.Fatalf(ctx, "attempted to apply ComputeChecksum command with duplicated checksum ID %s", - cc.ChecksumID) - } - c.started = true - r.gcOldChecksumEntriesLocked(now) - desc := *r.mu.state.Desc - r.mu.Unlock() + // Note: all exit paths must call gcReplicaChecksum. 
+ c := r.getReplicaChecksum(cc.ChecksumID, timeutil.Now()) + // The close panics if there was another attempt to start computation with + // this ID, but it does not happen since post-apply triggers are invoked at + // most once per Raft log entry per process, and the ChecksumID is unique. + close(c.started) if req, have := cc.Version, uint32(batcheval.ReplicaChecksumVersion); req != have { r.computeChecksumDone(c, nil, nil) + r.gcReplicaChecksum(cc.ChecksumID, c) return errors.Errorf("incompatible versions (requested: %d, have: %d)", req, have) } + // Capture the current range descriptor, as it may change by the time the + // async task below runs. + desc := *r.Desc() + // Caller is holding raftMu, so an engine snapshot is automatically // Raft-consistent (i.e. not in the middle of an AddSSTable). snap := r.store.engine.NewSnapshot() @@ -816,6 +801,7 @@ func (r *Replica) computeChecksumPostApply( result = nil } r.computeChecksumDone(c, result, snapshot) + r.gcReplicaChecksum(cc.ChecksumID, c) return err }, ); err != nil { @@ -870,6 +856,7 @@ A file preventing this node from restarting was placed at: taskCancel() snap.Close() r.computeChecksumDone(c, nil, nil) + r.gcReplicaChecksum(cc.ChecksumID, c) return err } return nil diff --git a/pkg/kv/kvserver/replica_consistency_test.go b/pkg/kv/kvserver/replica_consistency_test.go index 71f913d26d28..1c9b8bf04d00 100644 --- a/pkg/kv/kvserver/replica_consistency_test.go +++ b/pkg/kv/kvserver/replica_consistency_test.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/quotapool" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/stretchr/testify/require" ) @@ -78,14 +79,11 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) { defer stopper.Stop(ctx) tc.Start(ctx, t, stopper) - id := uuid.FastMakeV4() - notify := make(chan struct{}) - close(notify) - // Simple condition, the checksum is notified, but not computed. - tc.repl.mu.Lock() - tc.repl.mu.checksums[id] = &replicaChecksum{notify: notify} - tc.repl.mu.Unlock() + id := uuid.FastMakeV4() + c := tc.repl.getReplicaChecksum(id, timeutil.Now()) + close(c.started) + close(c.result) rc, err := tc.repl.getChecksum(ctx, id) require.ErrorContains(t, err, "no checksum found") require.Nil(t, rc.Checksum) @@ -93,9 +91,6 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) { // Next condition, the initial wait expires and checksum is not started, // this will take 10ms. id = uuid.FastMakeV4() - tc.repl.mu.Lock() - tc.repl.mu.checksums[id] = &replicaChecksum{notify: make(chan struct{})} - tc.repl.mu.Unlock() rc, err = tc.repl.getChecksum(ctx, id) require.ErrorContains(t, err, "checksum computation did not start") require.Nil(t, rc.Checksum) @@ -103,9 +98,14 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) { // Next condition, initial wait expired and we found the started flag, // so next step is for context deadline. id = uuid.FastMakeV4() - tc.repl.mu.Lock() - tc.repl.mu.checksums[id] = &replicaChecksum{notify: make(chan struct{}), started: true} - tc.repl.mu.Unlock() + c = tc.repl.getReplicaChecksum(id, timeutil.Now()) + close(c.started) + rc, err = tc.repl.getChecksum(ctx, id) + require.ErrorIs(t, err, context.DeadlineExceeded) + require.Nil(t, rc.Checksum) + + // Context is canceled during the initial waiting. 
+ id = uuid.FastMakeV4() rc, err = tc.repl.getChecksum(ctx, id) require.ErrorIs(t, err, context.DeadlineExceeded) require.Nil(t, rc.Checksum) From 301dd7d62ad962598db1b36d10f01a1d4de86220 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Mon, 22 Aug 2022 17:36:44 +0000 Subject: [PATCH 8/9] kvserver: stop/prevent consistency check on request cancel Consistency checks are initiated by ComputeChecksum command in the Raft log, and run until completion under a background context. The result is collected by the initiator via the CollectChecksum long poll. The task is synchronized with the collection handler via the map of replicaChecksum structs. When the CollectChecksum handler exits due to a canceled context (for example, the request timed out, or the remote caller crashed), the background task continues to run. If it was not running, it may start in the future. In both cases, the consistency checks pool (which has a limited size and processing rate) spends resources on running dangling checks, and rejects useful ones. This commit makes sure that abandoned checksum computation tasks are: - stopped if the waiting collection request is canceled - never started if there was a recent collection request that gave up When starting, the checksum computation task first checks whether the corresponding collection request has previously been abandoned. If so, the task terminates early. Otherwise it starts and sends a cancel func through the channel that it used to notify the collection handler, so that it can abort the task when it abandons the request. Release justification: performance and stability improvement Release note (bug fix): A consistency check is now skipped/stopped when the collection request is canceled before/while running the check computation. Previously such checks would start and run until completion, and, due to the limited size of the worker pool, prevent the useful checks from running. --- pkg/kv/kvserver/replica_consistency.go | 51 +++++++++++++-------- pkg/kv/kvserver/replica_consistency_test.go | 37 +++++++++++---- 2 files changed, 60 insertions(+), 28 deletions(-) diff --git a/pkg/kv/kvserver/replica_consistency.go b/pkg/kv/kvserver/replica_consistency.go index b74717db3f53..5f408c53bbed 100644 --- a/pkg/kv/kvserver/replica_consistency.go +++ b/pkg/kv/kvserver/replica_consistency.go @@ -69,8 +69,10 @@ var fatalOnStatsMismatch = envutil.EnvOrDefaultBool("COCKROACH_ENFORCE_CONSISTEN // replicaChecksum contains progress on a replica checksum computation. type replicaChecksum struct { - // started is closed when the checksum computation has started. - started chan struct{} + // started is closed when the checksum computation has started. If the start + // was successful, passes a function that can be used by the receiver to stop + // the computation, otherwise is closed immediately. + started chan context.CancelFunc // result passes a single checksum computation result from the task. // INVARIANT: result is written to or closed only if started is closed. result chan CollectChecksumResponse @@ -440,20 +442,21 @@ func (r *Replica) gcOldChecksumEntriesLocked(now time.Time) { } } -// getReplicaChecksum returns replicaChecksum tracker for the given ID. -func (r *Replica) getReplicaChecksum(id uuid.UUID, now time.Time) *replicaChecksum { +// getReplicaChecksum returns replicaChecksum tracker for the given ID, and +// whether it is still active (i.e. has a zero GC timestamp). 
+func (r *Replica) getReplicaChecksum(id uuid.UUID, now time.Time) (*replicaChecksum, bool) { r.mu.Lock() defer r.mu.Unlock() r.gcOldChecksumEntriesLocked(now) c := r.mu.checksums[id] if c == nil { c = &replicaChecksum{ - started: make(chan struct{}), + started: make(chan context.CancelFunc, 1), // allow an async send result: make(chan CollectChecksumResponse, 1), // allow an async send } r.mu.checksums[id] = c } - return c + return c, c.gcTimestamp.IsZero() } // gcReplicaChecksum schedules GC to remove the given replicaChecksum from the @@ -485,10 +488,11 @@ func (r *Replica) gcReplicaChecksum(id uuid.UUID, rc *replicaChecksum) { // been GC-ed, or an error happened during the computation. func (r *Replica) getChecksum(ctx context.Context, id uuid.UUID) (CollectChecksumResponse, error) { now := timeutil.Now() - c := r.getReplicaChecksum(id, now) + c, _ := r.getReplicaChecksum(id, now) defer r.gcReplicaChecksum(id, c) // Wait for the checksum computation to start. + var taskCancel context.CancelFunc select { case <-ctx.Done(): return CollectChecksumResponse{}, @@ -496,13 +500,17 @@ func (r *Replica) getChecksum(ctx context.Context, id uuid.UUID) (CollectChecksu case <-time.After(r.checksumInitialWait(ctx)): return CollectChecksumResponse{}, errors.Errorf("checksum computation did not start in time for (ID = %s)", id) - case <-c.started: + case taskCancel = <-c.started: // Happy case, the computation has started. } + if taskCancel == nil { // but it may have started with an error + return CollectChecksumResponse{}, errors.Errorf("checksum task failed to start (ID = %s)", id) + } // Wait for the computation result. select { case <-ctx.Done(): + taskCancel() return CollectChecksumResponse{}, errors.Wrapf(ctx.Err(), "while waiting for compute checksum (ID = %s)", id) case c, ok := <-c.result: @@ -729,17 +737,19 @@ func (*Replica) sha512( func (r *Replica) computeChecksumPostApply( ctx context.Context, cc kvserverpb.ComputeChecksum, -) error { +) (err error) { // Note: all exit paths must call gcReplicaChecksum. - c := r.getReplicaChecksum(cc.ChecksumID, timeutil.Now()) - // The close panics if there was another attempt to start computation with - // this ID, but it does not happen since post-apply triggers are invoked at - // most once per Raft log entry per process, and the ChecksumID is unique. - close(c.started) - + c, active := r.getReplicaChecksum(cc.ChecksumID, timeutil.Now()) + defer func() { + if err != nil { + close(c.started) // send nothing to signal that the task failed to start + r.gcReplicaChecksum(cc.ChecksumID, c) + } + }() + if !active { + return errors.New("checksum collection request gave up") + } if req, have := cc.Version, uint32(batcheval.ReplicaChecksumVersion); req != have { - r.computeChecksumDone(c, nil, nil) - r.gcReplicaChecksum(cc.ChecksumID, c) return errors.Errorf("incompatible versions (requested: %d, have: %d)", req, have) } @@ -787,6 +797,11 @@ func (r *Replica) computeChecksumPostApply( WaitForSem: false, }, func(ctx context.Context) { defer taskCancel() + // There is only one writer to c.started (this task), so this doesn't block. + // But if by mistake there is another writer, one of us closes the channel + // eventually, and other send/close ops will crash. This is by design. 
+ c.started <- taskCancel + close(c.started) if err := contextutil.RunWithTimeout(ctx, taskName, consistencyCheckAsyncTimeout, func(ctx context.Context) error { @@ -855,8 +870,6 @@ A file preventing this node from restarting was placed at: }); err != nil { taskCancel() snap.Close() - r.computeChecksumDone(c, nil, nil) - r.gcReplicaChecksum(cc.ChecksumID, c) return err } return nil diff --git a/pkg/kv/kvserver/replica_consistency_test.go b/pkg/kv/kvserver/replica_consistency_test.go index 1c9b8bf04d00..638e5b2bd1e9 100644 --- a/pkg/kv/kvserver/replica_consistency_test.go +++ b/pkg/kv/kvserver/replica_consistency_test.go @@ -58,7 +58,7 @@ func TestReplicaChecksumVersion(t *testing.T) { rc, err := tc.repl.getChecksum(ctx, cc.ChecksumID) if !matchingVersion { require.ErrorContains(t, taskErr, "incompatible versions") - require.ErrorContains(t, err, "no checksum found") + require.ErrorContains(t, err, "checksum task failed to start") require.Nil(t, rc.Checksum) } else { require.NoError(t, taskErr) @@ -79,26 +79,44 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) { defer stopper.Stop(ctx) tc.Start(ctx, t, stopper) - // Simple condition, the checksum is notified, but not computed. + requireChecksumTaskNotStarted := func(id uuid.UUID) { + require.ErrorContains(t, + tc.repl.computeChecksumPostApply(context.Background(), kvserverpb.ComputeChecksum{ + ChecksumID: id, + Mode: roachpb.ChecksumMode_CHECK_FULL, + Version: batcheval.ReplicaChecksumVersion, + }), "checksum collection request gave up") + } + + // Checksum computation failed to start. id := uuid.FastMakeV4() - c := tc.repl.getReplicaChecksum(id, timeutil.Now()) + c, _ := tc.repl.getReplicaChecksum(id, timeutil.Now()) close(c.started) - close(c.result) rc, err := tc.repl.getChecksum(ctx, id) + require.ErrorContains(t, err, "checksum task failed to start") + require.Nil(t, rc.Checksum) + + // Checksum computation started, but failed. + id = uuid.FastMakeV4() + c, _ = tc.repl.getReplicaChecksum(id, timeutil.Now()) + c.started <- func() {} + close(c.started) + close(c.result) + rc, err = tc.repl.getChecksum(ctx, id) require.ErrorContains(t, err, "no checksum found") require.Nil(t, rc.Checksum) - // Next condition, the initial wait expires and checksum is not started, - // this will take 10ms. + // The initial wait for the task start expires. This will take 10ms. id = uuid.FastMakeV4() rc, err = tc.repl.getChecksum(ctx, id) require.ErrorContains(t, err, "checksum computation did not start") require.Nil(t, rc.Checksum) + requireChecksumTaskNotStarted(id) - // Next condition, initial wait expired and we found the started flag, - // so next step is for context deadline. + // The computation has started, but the request context timed out. 
id = uuid.FastMakeV4() - c = tc.repl.getReplicaChecksum(id, timeutil.Now()) + c, _ = tc.repl.getReplicaChecksum(id, timeutil.Now()) + c.started <- func() {} close(c.started) rc, err = tc.repl.getChecksum(ctx, id) require.ErrorIs(t, err, context.DeadlineExceeded) @@ -109,6 +127,7 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) { rc, err = tc.repl.getChecksum(ctx, id) require.ErrorIs(t, err, context.DeadlineExceeded) require.Nil(t, rc.Checksum) + requireChecksumTaskNotStarted(id) } // TestReplicaChecksumSHA512 checks that a given dataset produces the expected From c7e129f2950bce50a869827575d15560d5b14ba1 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Tue, 23 Aug 2022 11:40:59 +0000 Subject: [PATCH 9/9] kvserver: send all consistency requests in parallel Currently, the replica initiating the consistency check sends a collection request to itself first, and only then to other replicas in parallel. This results in substantial asynchrony on the receiving replica, between the incoming CollectChecksum request and the checksum computation task started by the ComputeChecksum message. The current solution to that is keeping the checksum computation results in memory for replicaChecksumGCInterval to return them to late arriving requests. The reason why the first checksum collection blocks the others is that it computes the "master checksum", which is then added to all other requests. However, this field is only used by the receiving end to log an inconsistency error. The actual killing of this replica happens on the second phase of the protocol, after the initiating replica commits another Raft message with the Terminate field populated. So, there is no strong reason to keep this blocking behaviour. If the initiating replica fails to compute its local checksum, it does not send requests to other replicas. This is problematic because the checksum tasks will be run on all replicas, which opens the possibility for accumulating many such dangling checks. This commit makes all the checksum collection requests parallel. Benefits: - There is less asynchrony between the sender and receiver, so we can drop the GC (in follow-up commits), and require an incoming request before starting the checksum computation task. - All the outgoing collection requests are now explicitly canceled if the local computation fails. This way, the cancelation signal has more chance to propagate to all replicas and cancel the tasks that were started anyway. Release justification: performance and stability improvement Release note (bug fix): Consistency checks are now sent to all replicas in parallel, previously it would be blocked on processing the local replica first. This a) reduces the latency of one check 2x, and b) allows better propagation of the cancelation signal which results in fewer abandoned tasks on remote replicas, and more resources spent on useful checks. 
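The fan-out/fan-in with cancellation described above can be reduced to a standard-library sketch (this is not the CockroachDB code: the real implementation uses the Stopper, per-replica RPCs, and a WaitGroup; collectAll and the replica names here are hypothetical):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type result struct {
	replica string
	err     error
}

// collectAll queries every replica in parallel. A failure on the local
// replica cancels the remaining collection requests, so remote checksum
// tasks are abandoned as early as possible.
func collectAll(ctx context.Context, local string, replicas []string,
	collect func(context.Context, string) error,
) []result {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	ch := make(chan result, len(replicas)) // buffered: senders never block
	for _, r := range replicas {
		r := r
		go func() { ch <- result{r, collect(ctx, r)} }()
	}

	results := make([]result, 0, len(replicas))
	for range replicas {
		res := <-ch
		results = append(results, res)
		if res.replica == local && res.err != nil {
			cancel() // computing own checksum failed: give up on the rest
		}
	}
	return results
}

func main() {
	replicas := []string{"n1", "n2", "n3"}
	results := collectAll(context.Background(), "n1", replicas,
		func(ctx context.Context, r string) error {
			if r == "n1" {
				return errors.New("local checksum failed")
			}
			select { // remote collections observe the cancelation
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(time.Second):
				return nil
			}
		})
	fmt.Println(results)
}

As in the patch, the result channel is buffered to the number of replicas so that every goroutine can deliver its result and terminate even after the collector has decided to cancel.
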
--- pkg/kv/kvserver/api.proto | 9 ++- pkg/kv/kvserver/consistency_queue.go | 9 ++- pkg/kv/kvserver/replica_consistency.go | 101 ++++++++++++------------- pkg/kv/kvserver/stores_server.go | 16 +--- 4 files changed, 61 insertions(+), 74 deletions(-) diff --git a/pkg/kv/kvserver/api.proto b/pkg/kv/kvserver/api.proto index 9c50cd5ce62c..18453f87763c 100644 --- a/pkg/kv/kvserver/api.proto +++ b/pkg/kv/kvserver/api.proto @@ -36,15 +36,18 @@ message CollectChecksumRequest { bytes checksum_id = 3 [(gogoproto.nullable) = false, (gogoproto.customname) = "ChecksumID", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"]; - bytes checksum = 4; + reserved 4; + // If true then the response must include the snapshot of the data from which + // the checksum is computed. + bool with_snapshot = 5; } message CollectChecksumResponse { // The checksum is the sha512 hash of the requested computation. It is empty // if the computation failed. bytes checksum = 1; - // snapshot is set if the roachpb.ComputeChecksumRequest had snapshot = true - // and the response checksum is different from the request checksum. + // snapshot is set if the with_snapshot in CollectChecksumRequest is true. For + // example, it can be set by the caller when it has detected an inconsistency. // // TODO(tschottdorf): with larger ranges, this is no longer tenable. // See https://github.com/cockroachdb/cockroach/issues/21128. diff --git a/pkg/kv/kvserver/consistency_queue.go b/pkg/kv/kvserver/consistency_queue.go index 771966df1a11..0a40db7aa21c 100644 --- a/pkg/kv/kvserver/consistency_queue.go +++ b/pkg/kv/kvserver/consistency_queue.go @@ -66,8 +66,9 @@ const consistencyCheckRateMinWait = 100 * time.Millisecond // replication factor of 7 could run 7 concurrent checks on every node. // // Note that checksum calculations below Raft are not tied to the caller's -// context (especially on followers), and will continue to run even after the -// caller has given up on them, which may cause them to build up. +// context, and may continue to run even after the caller has given up on them, +// which may cause them to build up. Although we do best effort to cancel the +// running task on the receiving end when the incoming request is aborted. // // CHECK_STATS checks do not count towards this limit, as they are cheap and the // DistSender will parallelize them across all ranges (notably when calling @@ -76,8 +77,8 @@ const consistencyCheckAsyncConcurrency = 7 // consistencyCheckAsyncTimeout is a below-Raft timeout for asynchronous // consistency check calculations. These are not tied to the caller's context, -// and thus will continue to run even after the caller has given up on them, so -// we give them an upper timeout to prevent them from running forever. +// and thus may continue to run even if the caller has given up on them, so we +// give them an upper timeout to prevent them from running forever. const consistencyCheckAsyncTimeout = time.Hour var testingAggressiveConsistencyChecks = envutil.EnvOrDefaultBool("COCKROACH_CONSISTENCY_AGGRESSIVE", false) diff --git a/pkg/kv/kvserver/replica_consistency.go b/pkg/kv/kvserver/replica_consistency.go index 5f408c53bbed..f442b1e680bb 100644 --- a/pkg/kv/kvserver/replica_consistency.go +++ b/pkg/kv/kvserver/replica_consistency.go @@ -15,7 +15,6 @@ import ( "crypto/sha512" "encoding/binary" "fmt" - "sort" "sync" "time" @@ -47,10 +46,15 @@ import ( // How long to keep consistency checker checksums in-memory for collection. 
// Typically a long-poll waits for the result of the computation, so it's almost -// immediately collected. However, the consistency checker synchronously -// collects the first replica's checksum before all others, so if the first one -// is slow the checksum may not be collected right away, and that first -// consistency check can take a long time due to rate limiting and range size. +// immediately collected. +// +// Up to 22.1, the consistency check initiator used to synchronously collect the +// first replica's checksum before all others, so checksum collection requests +// could arrive late if the first one was slow. Since 22.2, all requests are +// parallel and likely arrive quickly. +// +// TODO(pavelkalinnikov): Consider removing GC behaviour in 23.1+, when all the +// incoming requests are from 22.2+ nodes (hence arrive timely). const replicaChecksumGCInterval = time.Hour // fatalOnStatsMismatch, if true, turns stats mismatches into fatal errors. A @@ -87,10 +91,10 @@ type replicaChecksum struct { // ComputeChecksum through Raft and then issues CollectChecksum commands to the // other replicas. These are inspected and a CheckConsistencyResponse is assembled. // -// When args.Mode is CHECK_VIA_QUEUE and an inconsistency is detected and no -// diff was requested, the consistency check will be re-run to collect a diff, -// which is then printed before calling `log.Fatal`. This behavior should be -// lifted to the consistency checker queue in the future. +// When req.Mode is CHECK_VIA_QUEUE and an inconsistency is detected, the +// consistency check will be re-run to collect a diff, which is then printed +// before calling `log.Fatal`. This behavior should be lifted to the consistency +// checker queue in the future. func (r *Replica) CheckConsistency( ctx context.Context, req roachpb.CheckConsistencyRequest, ) (roachpb.CheckConsistencyResponse, *roachpb.Error) { @@ -328,7 +332,7 @@ type ConsistencyCheckResult struct { } func (r *Replica) collectChecksumFromReplica( - ctx context.Context, replica roachpb.ReplicaDescriptor, id uuid.UUID, checksum []byte, + ctx context.Context, replica roachpb.ReplicaDescriptor, id uuid.UUID, withSnap bool, ) (CollectChecksumResponse, error) { conn, err := r.store.cfg.NodeDialer.Dial(ctx, replica.NodeID, rpc.DefaultClass) if err != nil { @@ -340,7 +344,7 @@ func (r *Replica) collectChecksumFromReplica( StoreRequestHeader: StoreRequestHeader{NodeID: replica.NodeID, StoreID: replica.StoreID}, RangeID: r.RangeID, ChecksumID: id, - Checksum: checksum, + WithSnapshot: withSnap, } resp, err := client.CollectChecksum(ctx, req) if err != nil { @@ -364,70 +368,61 @@ func (r *Replica) runConsistencyCheck( } ccRes := res.(*roachpb.ComputeChecksumResponse) - var orderedReplicas []roachpb.ReplicaDescriptor - { - desc := r.Desc() - localReplica, err := r.GetReplicaDescriptor() - if err != nil { - return nil, errors.Wrap(err, "could not get replica descriptor") - } - - // Move the local replica to the front (which makes it the "master" - // we're comparing against). - orderedReplicas = append(orderedReplicas, desc.Replicas().Descriptors()...) 
- - sort.Slice(orderedReplicas, func(i, j int) bool { - return orderedReplicas[i] == localReplica - }) + replSet := r.Desc().Replicas() + localReplica, found := replSet.GetReplicaDescriptorByID(r.replicaID) + if !found { + return nil, errors.New("could not get local replica descriptor") } + replicas := replSet.Descriptors() + + resultCh := make(chan ConsistencyCheckResult, len(replicas)) + results := make([]ConsistencyCheckResult, 0, len(replicas)) - resultCh := make(chan ConsistencyCheckResult, len(orderedReplicas)) - var results []ConsistencyCheckResult var wg sync.WaitGroup + ctx, cancel := context.WithCancel(ctx) + + defer close(resultCh) // close the channel when + defer wg.Wait() // writers have terminated + defer cancel() // but cancel them first + // P.S. Have you noticed the Haiku? - for _, replica := range orderedReplicas { + for _, replica := range replicas { wg.Add(1) replica := replica // per-iteration copy for the goroutine if err := r.store.Stopper().RunAsyncTask(ctx, "storage.Replica: checking consistency", func(ctx context.Context) { defer wg.Done() - - var masterChecksum []byte - if len(results) > 0 { - masterChecksum = results[0].Response.Checksum - } - resp, err := r.collectChecksumFromReplica(ctx, replica, ccRes.ChecksumID, masterChecksum) + resp, err := r.collectChecksumFromReplica(ctx, replica, ccRes.ChecksumID, req.Snapshot) resultCh <- ConsistencyCheckResult{ Replica: replica, Response: resp, Err: err, } - }); err != nil { + }, + ); err != nil { + // If we can't start tasks, the node is likely draining. Return the error + // verbatim, after all the started tasks are stopped. wg.Done() - // If we can't start tasks, the node is likely draining. Just return the error verbatim. return nil, err } + } - // Collect the master result eagerly so that we can send a SHA in the - // remaining requests (this is used for logging inconsistencies on the - // remote nodes only). - if len(results) == 0 { - wg.Wait() - result := <-resultCh + // Collect the results from all replicas, while the tasks are running. + for result := range resultCh { + results = append(results, result) + if result.Replica.IsSame(localReplica) { + // If we can't compute the local checksum, give up. This will cancel all + // the outstanding requests, and wait for the tasks above to terminate. if err := result.Err; err != nil { - // If we can't compute the local checksum, give up. return nil, errors.Wrap(err, "computing own checksum") } - results = append(results, result) + // Put the local replica first in the list. + results[0], results[len(results)-1] = results[len(results)-1], results[0] + } + // If it was the last request, don't wait on the channel anymore. + if len(results) == len(replicas) { + break } - } - - wg.Wait() - close(resultCh) - - // Collect the remaining results. - for result := range resultCh { - results = append(results, result) } return results, nil diff --git a/pkg/kv/kvserver/stores_server.go b/pkg/kv/kvserver/stores_server.go index d49f21f74b2d..f3154aca6842 100644 --- a/pkg/kv/kvserver/stores_server.go +++ b/pkg/kv/kvserver/stores_server.go @@ -11,7 +11,6 @@ package kvserver import ( - "bytes" "context" "time" @@ -19,7 +18,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/retry" - "github.com/cockroachdb/redact" ) // Server implements PerReplicaServer. 
@@ -53,7 +51,7 @@ func (is Server) execStoreCommand( func (is Server) CollectChecksum( ctx context.Context, req *CollectChecksumRequest, ) (*CollectChecksumResponse, error) { - resp := &CollectChecksumResponse{} + var resp *CollectChecksumResponse err := is.execStoreCommand(ctx, req.StoreRequestHeader, func(ctx context.Context, s *Store) error { ctx, cancel := s.stopper.WithCancelOnQuiesce(ctx) @@ -66,17 +64,7 @@ func (is Server) CollectChecksum( if err != nil { return err } - if !bytes.Equal(req.Checksum, ccr.Checksum) { - // If this check is false, then this request is the replica carrying out - // the consistency check. The message is spurious, but we want to leave the - // snapshot (if present) intact. - if len(req.Checksum) > 0 { - log.Errorf(ctx, "consistency check failed on range r%d: expected checksum %x, got %x", - req.RangeID, redact.Safe(req.Checksum), redact.Safe(ccr.Checksum)) - // Leave resp.Snapshot alone so that the caller will receive what's - // in it (if anything). - } - } else { + if !req.WithSnapshot { ccr.Snapshot = nil } resp = &ccr