Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/batcheval/declare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ func TestRequestsSerializeWithAllKeys(t *testing.T) {
if err != nil {
t.Error(err)
}
if !allLatchSpans.Intersects(&otherLatchSpans) {
checker := spanset.NewSpanChecker(&allLatchSpans)
if !checker.Intersects(&otherLatchSpans) {
t.Errorf("%s does not serialize with declareAllKeys", method)
}
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_eval_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ func NewReplicaEvalContext(
rec = newEvalContextImpl(ctx, r, requiresClosedTSOlderThanStorageSnap, ah)
if util.RaceEnabled {
return &SpanSetReplicaEvalContext{
i: rec,
ss: *ss,
i: rec,
checker: *spanset.NewSpanChecker(ss),
}
}
return rec
Expand Down
18 changes: 9 additions & 9 deletions pkg/kv/kvserver/replica_eval_context_span.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
// ReplicaEvalContext which verifies that access to state is registered in the
// SpanSet if one is given.
type SpanSetReplicaEvalContext struct {
i batcheval.EvalContext
ss spanset.SpanSet
i batcheval.EvalContext
checker spanset.SpanChecker
}

var _ batcheval.EvalContext = &SpanSetReplicaEvalContext{}
Expand Down Expand Up @@ -103,7 +103,7 @@ func (rec *SpanSetReplicaEvalContext) IsFirstRange() bool {
// Desc returns the Replica's RangeDescriptor.
func (rec SpanSetReplicaEvalContext) Desc() *roachpb.RangeDescriptor {
desc := rec.i.Desc()
rec.ss.AssertAllowed(spanset.SpanReadOnly,
rec.checker.AssertAllowed(spanset.SpanReadOnly,
roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)},
)
return desc
Expand Down Expand Up @@ -144,7 +144,7 @@ func (rec SpanSetReplicaEvalContext) GetMaxSplitCPU(ctx context.Context) (float6
func (rec SpanSetReplicaEvalContext) CanCreateTxnRecord(
ctx context.Context, txnID uuid.UUID, txnKey []byte, txnMinTS hlc.Timestamp,
) (bool, kvpb.TransactionAbortedReason) {
rec.ss.AssertAllowed(spanset.SpanReadOnly,
rec.checker.AssertAllowed(spanset.SpanReadOnly,
roachpb.Span{Key: keys.TransactionKey(txnKey, txnID)},
)
return rec.i.CanCreateTxnRecord(ctx, txnID, txnKey, txnMinTS)
Expand All @@ -156,7 +156,7 @@ func (rec SpanSetReplicaEvalContext) CanCreateTxnRecord(
func (rec SpanSetReplicaEvalContext) MinTxnCommitTS(
ctx context.Context, txnID uuid.UUID, txnKey []byte,
) hlc.Timestamp {
rec.ss.AssertAllowed(spanset.SpanReadOnly,
rec.checker.AssertAllowed(spanset.SpanReadOnly,
roachpb.Span{Key: keys.TransactionKey(txnKey, txnID)},
)
return rec.i.MinTxnCommitTS(ctx, txnID, txnKey)
Expand All @@ -166,7 +166,7 @@ func (rec SpanSetReplicaEvalContext) MinTxnCommitTS(
// keys are garbage collected. Reads and writes at timestamps <= this time will
// not be served.
func (rec SpanSetReplicaEvalContext) GetGCThreshold() hlc.Timestamp {
rec.ss.AssertAllowed(spanset.SpanReadOnly,
rec.checker.AssertAllowed(spanset.SpanReadOnly,
roachpb.Span{Key: keys.RangeGCThresholdKey(rec.GetRangeID())},
)
return rec.i.GetGCThreshold()
Expand All @@ -190,7 +190,7 @@ func (rec SpanSetReplicaEvalContext) String() string {
func (rec SpanSetReplicaEvalContext) GetLastReplicaGCTimestamp(
ctx context.Context,
) (hlc.Timestamp, error) {
if err := rec.ss.CheckAllowed(spanset.SpanReadOnly,
if err := rec.checker.CheckAllowed(spanset.SpanReadOnly,
roachpb.Span{Key: keys.RangeLastReplicaGCTimestampKey(rec.GetRangeID())},
); err != nil {
return hlc.Timestamp{}, err
Expand Down Expand Up @@ -222,11 +222,11 @@ func (rec *SpanSetReplicaEvalContext) GetCurrentReadSummary(ctx context.Context)
// To capture a read summary over the range, all keys must be latched for
// writing to prevent any concurrent reads or writes.
desc := rec.i.Desc()
rec.ss.AssertAllowed(spanset.SpanReadWrite, roachpb.Span{
rec.checker.AssertAllowed(spanset.SpanReadWrite, roachpb.Span{
Key: keys.MakeRangeKeyPrefix(desc.StartKey),
EndKey: keys.MakeRangeKeyPrefix(desc.EndKey),
})
rec.ss.AssertAllowed(spanset.SpanReadWrite, roachpb.Span{
rec.checker.AssertAllowed(spanset.SpanReadWrite, roachpb.Span{
Key: desc.StartKey.AsRawKey(),
EndKey: desc.EndKey.AsRawKey(),
})
Expand Down
9 changes: 7 additions & 2 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2722,6 +2722,7 @@ func TestReplicaLatchingSplitDeclaresWrites(t *testing.T) {
0,
)
require.NoError(t, err)
checker := spanset.NewSpanChecker(&spans)
for _, tc := range []struct {
access spanset.SpanAccess
key roachpb.Key
Expand All @@ -2732,7 +2733,7 @@ func TestReplicaLatchingSplitDeclaresWrites(t *testing.T) {
{spanset.SpanReadWrite, roachpb.Key("b"), false},
{spanset.SpanReadWrite, roachpb.Key("d"), true},
} {
err := spans.CheckAllowed(tc.access, roachpb.Span{Key: tc.key})
err := checker.CheckAllowed(tc.access, roachpb.Span{Key: tc.key})
if tc.expectAccess {
require.NoError(t, err)
} else {
Expand Down Expand Up @@ -5344,7 +5345,11 @@ func TestAbortSpanError(t *testing.T) {
}

ec := newEvalContextImpl(ctx, tc.repl, false /* requireClosedTS */, kvpb.AdmissionHeader{})
rec := &SpanSetReplicaEvalContext{ec, *allSpans()}
ss := allSpans()
rec := &SpanSetReplicaEvalContext{
i: ec,
checker: *spanset.NewSpanChecker(ss),
}
pErr := checkIfTxnAborted(ctx, rec, tc.engine, txn)
if _, ok := pErr.GetDetail().(*kvpb.TransactionAbortedError); ok {
expected := txn.Clone()
Expand Down
86 changes: 45 additions & 41 deletions pkg/kv/kvserver/spanset/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
// MVCCIterator wraps an storage.MVCCIterator and ensures that it can
// only be used to access spans in a SpanSet.
type MVCCIterator struct {
i storage.MVCCIterator
spans *SpanSet
i storage.MVCCIterator
checker *SpanChecker

// spansOnly controls whether or not timestamps associated with the
// spans are considered when ensuring access. If set to true,
Expand All @@ -48,13 +48,13 @@ var _ storage.MVCCIterator = &MVCCIterator{}
// iterator against the given SpanSet. Timestamps associated with the spans
// in the spanset are not considered, only the span boundaries are checked.
func NewIterator(iter storage.MVCCIterator, spans *SpanSet) *MVCCIterator {
return &MVCCIterator{i: iter, spans: spans, spansOnly: true}
return &MVCCIterator{i: iter, checker: NewSpanChecker(spans), spansOnly: true}
}

// NewIteratorAt constructs an iterator that verifies access of the underlying
// iterator against the given SpanSet at the given timestamp.
func NewIteratorAt(iter storage.MVCCIterator, spans *SpanSet, ts hlc.Timestamp) *MVCCIterator {
return &MVCCIterator{i: iter, spans: spans, ts: ts}
return &MVCCIterator{i: iter, checker: NewSpanChecker(spans), ts: ts}
}

// Close is part of the storage.MVCCIterator interface.
Expand Down Expand Up @@ -137,9 +137,9 @@ func (i *MVCCIterator) checkAllowed(span roachpb.Span, errIfDisallowed bool) {
func (i *MVCCIterator) checkAllowedValidPos(span roachpb.Span, errIfDisallowed bool) {
var err error
if i.spansOnly {
err = i.spans.CheckAllowed(SpanReadOnly, span)
err = i.checker.CheckAllowed(SpanReadOnly, span)
} else {
err = i.spans.CheckAllowedAt(SpanReadOnly, span, i.ts)
err = i.checker.CheckAllowedAt(SpanReadOnly, span, i.ts)
}
if errIfDisallowed {
i.err = err
Expand Down Expand Up @@ -213,11 +213,11 @@ func (i *MVCCIterator) FindSplitKey(
start, end, minSplitKey roachpb.Key, targetSize int64,
) (storage.MVCCKey, error) {
if i.spansOnly {
if err := i.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: start, EndKey: end}); err != nil {
if err := i.checker.CheckAllowed(SpanReadOnly, roachpb.Span{Key: start, EndKey: end}); err != nil {
return storage.MVCCKey{}, err
}
} else {
if err := i.spans.CheckAllowedAt(SpanReadOnly, roachpb.Span{Key: start, EndKey: end}, i.ts); err != nil {
if err := i.checker.CheckAllowedAt(SpanReadOnly, roachpb.Span{Key: start, EndKey: end}, i.ts); err != nil {
return storage.MVCCKey{}, err
}
}
Expand All @@ -242,8 +242,9 @@ func (i *MVCCIterator) UnsafeLazyValue() pebble.LazyValue {
// EngineIterator wraps a storage.EngineIterator and ensures that it can
// only be used to access spans in a SpanSet.
type EngineIterator struct {
i storage.EngineIterator
spans *SpanSet
i storage.EngineIterator
//spans *SpanSet
checker *SpanChecker
spansOnly bool
ts hlc.Timestamp
}
Expand All @@ -261,10 +262,10 @@ func (i *EngineIterator) SeekEngineKeyGE(key storage.EngineKey) (valid bool, err
}
if key.IsMVCCKey() && !i.spansOnly {
mvccKey, _ := key.ToMVCCKey()
if err := i.spans.CheckAllowedAt(SpanReadOnly, roachpb.Span{Key: mvccKey.Key}, i.ts); err != nil {
if err := i.checker.CheckAllowedAt(SpanReadOnly, roachpb.Span{Key: mvccKey.Key}, i.ts); err != nil {
return false, err
}
} else if err = i.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: key.Key}); err != nil {
} else if err = i.checker.CheckAllowed(SpanReadOnly, roachpb.Span{Key: key.Key}); err != nil {
return false, err
}
return valid, err
Expand All @@ -278,10 +279,10 @@ func (i *EngineIterator) SeekEngineKeyLT(key storage.EngineKey) (valid bool, err
}
if key.IsMVCCKey() && !i.spansOnly {
mvccKey, _ := key.ToMVCCKey()
if err := i.spans.CheckAllowedAt(SpanReadOnly, roachpb.Span{Key: mvccKey.Key}, i.ts); err != nil {
if err := i.checker.CheckAllowedAt(SpanReadOnly, roachpb.Span{Key: mvccKey.Key}, i.ts); err != nil {
return false, err
}
} else if err = i.spans.CheckAllowed(SpanReadOnly, roachpb.Span{EndKey: key.Key}); err != nil {
} else if err = i.checker.CheckAllowed(SpanReadOnly, roachpb.Span{EndKey: key.Key}); err != nil {
return false, err
}
return valid, err
Expand Down Expand Up @@ -313,7 +314,7 @@ func (i *EngineIterator) SeekEngineKeyGEWithLimit(
if state != pebble.IterValid {
return state, err
}
if err = i.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: key.Key}); err != nil {
if err = i.checker.CheckAllowed(SpanReadOnly, roachpb.Span{Key: key.Key}); err != nil {
return pebble.IterExhausted, err
}
return state, err
Expand All @@ -327,7 +328,7 @@ func (i *EngineIterator) SeekEngineKeyLTWithLimit(
if state != pebble.IterValid {
return state, err
}
if err = i.spans.CheckAllowed(SpanReadOnly, roachpb.Span{EndKey: key.Key}); err != nil {
if err = i.checker.CheckAllowed(SpanReadOnly, roachpb.Span{EndKey: key.Key}); err != nil {
return pebble.IterExhausted, err
}
return state, err
Expand All @@ -354,11 +355,11 @@ func (i *EngineIterator) checkKeyAllowed() (valid bool, err error) {
}
if key.IsMVCCKey() && !i.spansOnly {
mvccKey, _ := key.ToMVCCKey()
if err := i.spans.CheckAllowedAt(SpanReadOnly, roachpb.Span{Key: mvccKey.Key}, i.ts); err != nil {
if err := i.checker.CheckAllowedAt(SpanReadOnly, roachpb.Span{Key: mvccKey.Key}, i.ts); err != nil {
// Invalid, but no error.
return false, nil // nolint:returnerrcheck
}
} else if err = i.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: key.Key}); err != nil {
} else if err = i.checker.CheckAllowed(SpanReadOnly, roachpb.Span{Key: key.Key}); err != nil {
// Invalid, but no error.
return false, nil // nolint:returnerrcheck
}
Expand Down Expand Up @@ -431,8 +432,8 @@ func (i *EngineIterator) Stats() storage.IteratorStats {
}

type spanSetReader struct {
r storage.Reader
spans *SpanSet
r storage.Reader
checker *SpanChecker

spansOnly bool
ts hlc.Timestamp
Expand Down Expand Up @@ -469,11 +470,11 @@ func (s spanSetReader) MVCCIterate(
f func(storage.MVCCKeyValue, storage.MVCCRangeKeyStack) error,
) error {
if s.spansOnly {
if err := s.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: start, EndKey: end}); err != nil {
if err := s.checker.CheckAllowed(SpanReadOnly, roachpb.Span{Key: start, EndKey: end}); err != nil {
return err
}
} else {
if err := s.spans.CheckAllowedAt(SpanReadOnly, roachpb.Span{Key: start, EndKey: end}, s.ts); err != nil {
if err := s.checker.CheckAllowedAt(SpanReadOnly, roachpb.Span{Key: start, EndKey: end}, s.ts); err != nil {
return err
}
}
Expand All @@ -488,9 +489,9 @@ func (s spanSetReader) NewMVCCIterator(
return nil, err
}
if s.spansOnly {
return NewIterator(mvccIter, s.spans), nil
return NewIterator(mvccIter, s.checker.spans), nil
}
return NewIteratorAt(mvccIter, s.spans, s.ts), nil
return NewIteratorAt(mvccIter, s.checker.spans, s.ts), nil
}

func (s spanSetReader) NewEngineIterator(
Expand All @@ -501,8 +502,9 @@ func (s spanSetReader) NewEngineIterator(
return nil, err
}
return &EngineIterator{
i: engineIter,
spans: s.spans,
i: engineIter,
//spans: s.checker.spans,
checker: s.checker,
spansOnly: s.spansOnly,
ts: s.ts,
}, nil
Expand All @@ -519,8 +521,8 @@ func (s spanSetReader) PinEngineStateForIterators(readCategory fs.ReadCategory)
}

type spanSetWriter struct {
w storage.Writer
spans *SpanSet
w storage.Writer
checker *SpanChecker

spansOnly bool
ts hlc.Timestamp
Expand All @@ -535,11 +537,11 @@ func (s spanSetWriter) ApplyBatchRepr(repr []byte, sync bool) error {

func (s spanSetWriter) checkAllowed(key roachpb.Key) error {
if s.spansOnly {
if err := s.spans.CheckAllowed(SpanReadWrite, roachpb.Span{Key: key}); err != nil {
if err := s.checker.CheckAllowed(SpanReadWrite, roachpb.Span{Key: key}); err != nil {
return err
}
} else {
if err := s.spans.CheckAllowedAt(SpanReadWrite, roachpb.Span{Key: key}, s.ts); err != nil {
if err := s.checker.CheckAllowedAt(SpanReadWrite, roachpb.Span{Key: key}, s.ts); err != nil {
return err
}
}
Expand All @@ -561,7 +563,7 @@ func (s spanSetWriter) ClearUnversioned(key roachpb.Key, opts storage.ClearOptio
}

func (s spanSetWriter) ClearEngineKey(key storage.EngineKey, opts storage.ClearOptions) error {
if err := s.spans.CheckAllowed(SpanReadWrite, roachpb.Span{Key: key.Key}); err != nil {
if err := s.checker.CheckAllowed(SpanReadWrite, roachpb.Span{Key: key.Key}); err != nil {
return err
}
return s.w.ClearEngineKey(key, opts)
Expand All @@ -575,11 +577,11 @@ func (s spanSetWriter) SingleClearEngineKey(key storage.EngineKey) error {

func (s spanSetWriter) checkAllowedRange(start, end roachpb.Key) error {
if s.spansOnly {
if err := s.spans.CheckAllowed(SpanReadWrite, roachpb.Span{Key: start, EndKey: end}); err != nil {
if err := s.checker.CheckAllowed(SpanReadWrite, roachpb.Span{Key: start, EndKey: end}); err != nil {
return err
}
} else {
if err := s.spans.CheckAllowedAt(SpanReadWrite, roachpb.Span{Key: start, EndKey: end}, s.ts); err != nil {
if err := s.checker.CheckAllowedAt(SpanReadWrite, roachpb.Span{Key: start, EndKey: end}, s.ts); err != nil {
return err
}
}
Expand Down Expand Up @@ -661,11 +663,11 @@ func (s spanSetWriter) ClearMVCCRangeKey(rangeKey storage.MVCCRangeKey) error {

func (s spanSetWriter) Merge(key storage.MVCCKey, value []byte) error {
if s.spansOnly {
if err := s.spans.CheckAllowed(SpanReadWrite, roachpb.Span{Key: key.Key}); err != nil {
if err := s.checker.CheckAllowed(SpanReadWrite, roachpb.Span{Key: key.Key}); err != nil {
return err
}
} else {
if err := s.spans.CheckAllowedAt(SpanReadWrite, roachpb.Span{Key: key.Key}, s.ts); err != nil {
if err := s.checker.CheckAllowedAt(SpanReadWrite, roachpb.Span{Key: key.Key}, s.ts); err != nil {
return err
}
}
Expand Down Expand Up @@ -697,7 +699,7 @@ func (s spanSetWriter) PutEngineKey(key storage.EngineKey, value []byte) error {
if !s.spansOnly {
panic("cannot do timestamp checking for putting EngineKey")
}
if err := s.spans.CheckAllowed(SpanReadWrite, roachpb.Span{Key: key.Key}); err != nil {
if err := s.checker.CheckAllowed(SpanReadWrite, roachpb.Span{Key: key.Key}); err != nil {
return err
}
return s.w.PutEngineKey(key, value)
Expand Down Expand Up @@ -731,17 +733,19 @@ var _ storage.ReadWriter = ReadWriter{}

func makeSpanSetReadWriter(rw storage.ReadWriter, spans *SpanSet) ReadWriter {
spans = addLockTableSpans(spans)
checker := NewSpanChecker(spans)
return ReadWriter{
spanSetReader: spanSetReader{r: rw, spans: spans, spansOnly: true},
spanSetWriter: spanSetWriter{w: rw, spans: spans, spansOnly: true},
spanSetReader: spanSetReader{r: rw, checker: checker, spansOnly: true},
spanSetWriter: spanSetWriter{w: rw, checker: checker, spansOnly: true},
}
}

func makeSpanSetReadWriterAt(rw storage.ReadWriter, spans *SpanSet, ts hlc.Timestamp) ReadWriter {
spans = addLockTableSpans(spans)
checker := NewSpanChecker(spans)
return ReadWriter{
spanSetReader: spanSetReader{r: rw, spans: spans, ts: ts},
spanSetWriter: spanSetWriter{w: rw, spans: spans, ts: ts},
spanSetReader: spanSetReader{r: rw, checker: checker, ts: ts},
spanSetWriter: spanSetWriter{w: rw, checker: checker, ts: ts},
}
}

Expand All @@ -752,7 +756,7 @@ func makeSpanSetReadWriterAt(rw storage.ReadWriter, spans *SpanSet, ts hlc.Times
// NewReader clones and does not retain the provided span set.
func NewReader(r storage.Reader, spans *SpanSet, ts hlc.Timestamp) storage.Reader {
spans = addLockTableSpans(spans)
return spanSetReader{r: r, spans: spans, ts: ts}
return spanSetReader{r: r, checker: NewSpanChecker(spans), ts: ts}
}

// NewReadWriterAt returns a storage.ReadWriter that asserts access of the
Expand Down
Loading