Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions pkg/ccl/changefeedccl/changefeedbase/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,16 @@ func TestAsTerminalError(t *testing.T) {

// Regardless of the state of the node drain, or the type of error,
// context error takes precedence.
require.Regexp(t, context.Canceled,
changefeedbase.AsTerminalError(canceledCtx, nodeIsDraining, errors.New("ignored")))
require.Regexp(t, context.Canceled,
changefeedbase.AsTerminalError(canceledCtx, nodeIsNotDraining, errors.New("ignored")))
require.Regexp(t, context.Canceled.Error(),
changefeedbase.AsTerminalError(canceledCtx, nodeIsDraining, errors.New("ignored")).Error())
require.Regexp(t, context.Canceled.Error(),
changefeedbase.AsTerminalError(canceledCtx, nodeIsNotDraining, errors.New("ignored")).Error())
})

t.Run("node drain marked as job retry", func(t *testing.T) {
cause := errors.New("some error happened")
termErr := changefeedbase.AsTerminalError(context.Background(), nodeIsDraining, cause)
require.Regexp(t, cause.Error(), termErr)
require.Contains(t, cause.Error(), termErr.Error())
require.True(t, jobs.IsRetryJobError(termErr))
})

Expand All @@ -58,19 +58,19 @@ func TestAsTerminalError(t *testing.T) {
cause := changefeedbase.WithTerminalError(
changefeedbase.MarkRetryableError(errors.New("confusing error")))
termErr := changefeedbase.AsTerminalError(context.Background(), nodeIsNotDraining, cause)
require.Regexp(t, cause.Error(), termErr)
require.Contains(t, cause.Error(), termErr.Error())
})

t.Run("assertion failures are terminal", func(t *testing.T) {
// Assertion failures are terminal, even if marked as retry-able.
cause := changefeedbase.MarkRetryableError(errors.AssertionFailedf("though shall not pass"))
termErr := changefeedbase.AsTerminalError(context.Background(), nodeIsNotDraining, cause)
require.Regexp(t, cause.Error(), termErr)
require.Contains(t, cause.Error(), termErr.Error())
})

t.Run("gc error is terminal", func(t *testing.T) {
cause := changefeedbase.MarkRetryableError(&kvpb.BatchTimestampBeforeGCError{})
termErr := changefeedbase.AsTerminalError(context.Background(), nodeIsNotDraining, cause)
require.Regexp(t, cause.Error(), termErr)
require.Contains(t, cause.Error(), termErr.Error())
})
}
6 changes: 5 additions & 1 deletion pkg/kv/kvpb/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -1365,7 +1365,11 @@ func (e *BatchTimestampBeforeGCError) Error() string {
}

func (e *BatchTimestampBeforeGCError) SafeFormatError(p errors.Printer) (next error) {
p.Printf("batch timestamp %v must be after replica GC threshold %v", e.Timestamp, e.Threshold)
p.Printf(
"batch timestamp %v must be after replica GC threshold %v (r%d: %s)",
e.Timestamp, e.Threshold, e.RangeID,
roachpb.RSpan{Key: []byte(e.StartKey), EndKey: []byte(e.EndKey)},
)
return nil
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvpb/errors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,11 @@ message BatchTimestampBeforeGCError {
// that has been marked as excluded from a backup via
// `ALTER TABLE ... SET (exclude_data_from_backup = true)`.
optional bool data_excluded_from_backup = 3 [(gogoproto.nullable) = false];
optional int64 range_id = 4 [(gogoproto.nullable) = false,
(gogoproto.customname) = "RangeID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.RangeID"];
optional bytes start_key = 5 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.Key"];
optional bytes end_key = 6 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.Key"];
}

// A MVCCHistoryMutationError indicates that MVCC history was unexpectedly
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvpb/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func TestErrorRedaction(t *testing.T) {
},
{
err: &BatchTimestampBeforeGCError{},
expect: "batch timestamp 0,0 must be after replica GC threshold 0,0",
expect: "batch timestamp 0,0 must be after replica GC threshold 0,0 (r0: ‹/Min›)",
},
{
err: &TxnAlreadyEncounteredErrorError{},
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/kvserverbase/forced_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ func CheckForcedErr(
ForcedError: kvpb.NewError(&kvpb.BatchTimestampBeforeGCError{
Timestamp: wts,
Threshold: *replicaState.GCThreshold,
RangeID: replicaState.Desc.RangeID,
StartKey: replicaState.Desc.StartKey.AsRawKey(),
EndKey: replicaState.Desc.EndKey.AsRawKey(),
}),
}
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -2011,10 +2011,14 @@ func (r *Replica) checkTSAboveGCThresholdRLocked(
if threshold.Less(ts) {
return nil
}
desc := r.descRLocked()
return &kvpb.BatchTimestampBeforeGCError{
Timestamp: ts,
Threshold: threshold,
DataExcludedFromBackup: r.excludeReplicaFromBackupRLocked(ctx, rspan),
RangeID: desc.RangeID,
StartKey: desc.StartKey.AsRawKey(),
EndKey: desc.EndKey.AsRawKey(),
}
}

Expand Down