Skip to content

Commit

Permalink
sstable: adapt EstimateDiskUsage to be agnostic to index block impl
Browse files Browse the repository at this point in the history
Adapt Reader.EstimateDiskUsage to be generic over the index block
iterator type.
  • Loading branch information
jbowens committed Sep 7, 2024
1 parent da5055a commit 376d455
Showing 1 changed file with 30 additions and 35 deletions.
65 changes: 30 additions & 35 deletions sstable/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,12 @@ func (r *Reader) CommonProperties() *CommonProperties {
// data blocks overlapped and add that same fraction of the metadata blocks to the
// estimate.
func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
return estimateDiskUsage[rowblk.IndexIter, *rowblk.IndexIter](r, start, end)
}

func estimateDiskUsage[I any, PI indexBlockIterator[I]](
r *Reader, start, end []byte,
) (uint64, error) {
if r.err != nil {
return 0, r.err
}
Expand All @@ -717,57 +723,51 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
// Iterators over the bottom-level index blocks containing start and end.
// These may be different in case of partitioned index but will both point
// to the same blockIter over the single index in the unpartitioned case.
var startIdxIter, endIdxIter *rowblk.Iter
var startIdxIter, endIdxIter PI
if r.Properties.IndexPartitions == 0 {
iter, err := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
if err != nil {
startIdxIter = new(I)
if err := startIdxIter.InitHandle(r.Compare, r.Split, indexH, NoTransforms); err != nil {
return 0, err
}
startIdxIter = iter
endIdxIter = iter
endIdxIter = startIdxIter
} else {
topIter, err := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
if err != nil {
var topIter PI = new(I)
if err := topIter.InitHandle(r.Compare, r.Split, indexH, NoTransforms); err != nil {
return 0, err
}

kv := topIter.SeekGE(start, base.SeekGEFlagsNone)
if kv == nil {
// The range falls completely after this file, or an error occurred.
return 0, topIter.Error()
if !topIter.SeekGE(start) {
// The range falls completely after this file.
return 0, nil
}
startIdxBH, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
startIndexBH, err := topIter.BlockHandleWithProperties()
if err != nil {
return 0, errCorruptIndexEntry(err)
}
startIdxBlock, err := r.readBlock(context.Background(), startIdxBH.Handle,
startIdxBlock, err := r.readBlock(context.Background(), startIndexBH.Handle,
nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
if err != nil {
return 0, err
}
defer startIdxBlock.Release()
startIdxIter, err = rowblk.NewIter(r.Compare, r.Split, startIdxBlock.Get(), NoTransforms)
startIdxIter = new(I)
err = startIdxIter.InitHandle(r.Compare, r.Split, startIdxBlock, NoTransforms)
if err != nil {
return 0, err
}

kv = topIter.SeekGE(end, base.SeekGEFlagsNone)
if kv == nil {
if err := topIter.Error(); err != nil {
return 0, err
}
} else {
endIdxBH, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
if topIter.SeekGE(end) {
endIndexBH, err := topIter.BlockHandleWithProperties()
if err != nil {
return 0, errCorruptIndexEntry(err)
}
endIdxBlock, err := r.readBlock(context.Background(),
endIdxBH.Handle, nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
endIndexBH.Handle, nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
if err != nil {
return 0, err
}
defer endIdxBlock.Release()
endIdxIter, err = rowblk.NewIter(r.Compare, r.Split, endIdxBlock.Get(), NoTransforms)
endIdxIter = new(I)
err = endIdxIter.InitHandle(r.Compare, r.Split, endIdxBlock, NoTransforms)
if err != nil {
return 0, err
}
Expand All @@ -776,12 +776,11 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
// startIdxIter should not be nil at this point, while endIdxIter can be if the
// range spans past the end of the file.

kv := startIdxIter.SeekGE(start, base.SeekGEFlagsNone)
if kv == nil {
// The range falls completely after this file, or an error occurred.
return 0, startIdxIter.Error()
if !startIdxIter.SeekGE(start) {
// The range falls completely after this file.
return 0, nil
}
startBH, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
startBH, err := startIdxIter.BlockHandleWithProperties()
if err != nil {
return 0, errCorruptIndexEntry(err)
}
Expand All @@ -804,15 +803,11 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
// The range spans beyond this file. Include data blocks through the last.
return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
}
kv = endIdxIter.SeekGE(end, base.SeekGEFlagsNone)
if kv == nil {
if err := endIdxIter.Error(); err != nil {
return 0, err
}
if !endIdxIter.SeekGE(end) {
// The range spans beyond this file. Include data blocks through the last.
return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
}
endBH, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
endBH, err := endIdxIter.BlockHandleWithProperties()
if err != nil {
return 0, errCorruptIndexEntry(err)
}
Expand Down

0 comments on commit 376d455

Please sign in to comment.