Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions docvalues.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type docValueReader struct {
curChunkHeader []MetaData
curChunkData []byte // compressed data cache
uncompressed []byte // temp buf for snappy decompression
bytesRead uint64
}

func (di *docValueReader) size() int {
Expand Down Expand Up @@ -96,6 +97,10 @@ func (s *SegmentBase) loadFieldDocValueReader(field string,
chunkOffsetsLen := binary.BigEndian.Uint64(s.mem[fieldDvLocEnd-16 : fieldDvLocEnd-8])
// acquire position of chunk offsets
chunkOffsetsPosition = (fieldDvLocEnd - 16) - chunkOffsetsLen

// 16 bytes since it corresponds to the length
// of chunk offsets and the position of the offsets
s.bytesRead += uint64(16)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some commentary here on why 16 here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

} else {
return nil, fmt.Errorf("loadFieldDocValueReader: fieldDvLoc too small: %d-%d", fieldDvLocEnd, fieldDvLocStart)
}
Expand All @@ -116,13 +121,28 @@ func (s *SegmentBase) loadFieldDocValueReader(field string,
fdvIter.chunkOffsets[i] = loc
offset += uint64(read)
}

s.bytesRead += offset
// set the data offset
fdvIter.dvDataLoc = fieldDvLocStart

return fdvIter, nil
}

// Implements the segment.DiskStatsReporter interface
// The purpose of this implementation is to get
// the bytes read from the disk (pertaining to the
// docvalues) while querying.
// loadDvChunk retrieves the next chunk of docvalues,
// and the bytes read off disk for that chunk are
// accounted for as well.
func (di *docValueReader) BytesRead() uint64 {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commentary indicating that these methods are an interface implementation and the reference to that interface here would be useful.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

return di.bytesRead
}

// SetBytesRead resets the tracked count of bytes read off disk for
// this docValueReader (counterpart of BytesRead; part of the
// segment.DiskStatsReporter interface implementation).
func (di *docValueReader) SetBytesRead(val uint64) {
	di.bytesRead = val
}

func (di *docValueReader) loadDvChunk(chunkNumber uint64, s *SegmentBase) error {
// advance to the chunk where the docValues
// reside for the given docNum
Expand All @@ -145,7 +165,7 @@ func (di *docValueReader) loadDvChunk(chunkNumber uint64, s *SegmentBase) error
return fmt.Errorf("failed to read the chunk")
}
chunkMetaLoc := destChunkDataLoc + uint64(read)

di.bytesRead += uint64(read)
offset := uint64(0)
if cap(di.curChunkHeader) < int(numDocs) {
di.curChunkHeader = make([]MetaData, int(numDocs))
Expand All @@ -161,6 +181,7 @@ func (di *docValueReader) loadDvChunk(chunkNumber uint64, s *SegmentBase) error

compressedDataLoc := chunkMetaLoc + offset
dataLength := curChunkEnd - compressedDataLoc
di.bytesRead += uint64(dataLength + offset)
di.curChunkData = s.mem[compressedDataLoc : compressedDataLoc+dataLength]
di.curChunkNum = chunkNumber
di.uncompressed = di.uncompressed[:0]
Expand Down Expand Up @@ -295,6 +316,7 @@ func (s *SegmentBase) VisitDocValues(localDocNum uint64, fields []string,
if err != nil {
return dvs, err
}
s.bytesRead += dvr.BytesRead()
}

_ = dvr.visitDocValues(localDocNum, visitor)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/RoaringBitmap/roaring v0.9.4
github.com/blevesearch/bleve_index_api v1.0.1
github.com/blevesearch/mmap-go v1.0.4
github.com/blevesearch/scorch_segment_api/v2 v2.1.0
github.com/blevesearch/scorch_segment_api/v2 v2.1.1
github.com/blevesearch/vellum v1.0.8
github.com/golang/snappy v0.0.1
github.com/spf13/cobra v0.0.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ github.com/blevesearch/bleve_index_api v1.0.1 h1:nx9++0hnyiGOHJwQQYfsUGzpRdEVE5L
github.com/blevesearch/bleve_index_api v1.0.1/go.mod h1:fiwKS0xLEm+gBRgv5mumf0dhgFr2mDgZah1pqv1c1M4=
github.com/blevesearch/mmap-go v1.0.4 h1:OVhDhT5B/M1HNPpYPBKIEJaD0F3Si+CrEKULGCDPWmc=
github.com/blevesearch/mmap-go v1.0.4/go.mod h1:EWmEAOmdAS9z/pi/+Toxu99DnsbhG1TIxUoRmJw/pSs=
github.com/blevesearch/scorch_segment_api/v2 v2.1.0 h1:NFwteOpZEvJk5Vg0H6gD0hxupsG3JYocE4DBvsA2GZI=
github.com/blevesearch/scorch_segment_api/v2 v2.1.0/go.mod h1:uch7xyyO/Alxkuxa+CGs79vw0QY8BENSBjg6Mw5L5DE=
github.com/blevesearch/scorch_segment_api/v2 v2.1.1 h1:J8UDudUpDJz21d/hCMIshCeRordwnDTftgXcSDMUx40=
github.com/blevesearch/scorch_segment_api/v2 v2.1.1/go.mod h1:uch7xyyO/Alxkuxa+CGs79vw0QY8BENSBjg6Mw5L5DE=
github.com/blevesearch/vellum v1.0.8 h1:iMGh4lfxza4BnWO/UJTMPlI3HsK9YawjPv+TteVa9ck=
github.com/blevesearch/vellum v1.0.8/go.mod h1:+cpRi/tqq49xUYSQN2P7A5zNSNrS+MscLeeaZ3J46UA=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
Expand Down
13 changes: 13 additions & 0 deletions intDecoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type chunkedIntDecoder struct {
curChunkBytes []byte
data []byte
r *memUvarintReader
numBytesRead uint64
}

// newChunkedIntDecoder expects an optional or reset chunkedIntDecoder for better reuse.
Expand Down Expand Up @@ -55,10 +56,20 @@ func newChunkedIntDecoder(buf []byte, offset uint64, rv *chunkedIntDecoder) *chu
rv.chunkOffsets[i], read = binary.Uvarint(buf[offset+n : offset+n+binary.MaxVarintLen64])
n += uint64(read)
}
rv.numBytesRead += n
rv.dataStartOffset = offset + n
return rv
}

// bytesRead returns the number of on-disk bytes this decoder has
// consumed so far while fetching query-time data encoded by the
// intcoder (e.g. the freqNorm and location details of a term in a
// document): the chunk-offsets header read in newChunkedIntDecoder
// plus each chunk body fetched by loadChunk.
func (d *chunkedIntDecoder) bytesRead() uint64 {
	return d.numBytesRead
}

func (d *chunkedIntDecoder) loadChunk(chunk int) error {
if d.startOffset == termNotEncoded {
d.r = newMemUvarintReader([]byte(nil))
Expand All @@ -75,6 +86,7 @@ func (d *chunkedIntDecoder) loadChunk(chunk int) error {
start += s
end += e
d.curChunkBytes = d.data[start:end]
d.numBytesRead += uint64(len(d.curChunkBytes))
if d.r == nil {
d.r = newMemUvarintReader(d.curChunkBytes)
} else {
Expand All @@ -89,6 +101,7 @@ func (d *chunkedIntDecoder) reset() {
d.dataStartOffset = 0
d.chunkOffsets = d.chunkOffsets[:0]
d.curChunkBytes = d.curChunkBytes[:0]
d.numBytesRead = 0
d.data = d.data[:0]
if d.r != nil {
d.r.Reset([]byte(nil))
Expand Down
40 changes: 40 additions & 0 deletions posting.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ type PostingsList struct {
normBits1Hit uint64

chunkSize uint64
bytesRead uint64
}

// represents an immutable, empty postings list
Expand Down Expand Up @@ -208,11 +209,13 @@ func (p *PostingsList) iterator(includeFreq, includeNorm, includeLocs bool,
// initialize freq chunk reader
if rv.includeFreqNorm {
rv.freqNormReader = newChunkedIntDecoder(p.sb.mem, p.freqOffset, rv.freqNormReader)
rv.bytesRead += rv.freqNormReader.bytesRead()
}

// initialize the loc chunk reader
if rv.includeLocs {
rv.locReader = newChunkedIntDecoder(p.sb.mem, p.locOffset, rv.locReader)
rv.bytesRead += rv.locReader.bytesRead()
}

rv.all = p.postings.Iterator()
Expand Down Expand Up @@ -244,6 +247,18 @@ func (p *PostingsList) Count() uint64 {
return n - e
}

// Implements the segment.DiskStatsReporter interface.
// SetBytesRead resets the count of bytes read from the
// postings lists stored on disk while querying
// (accumulated in PostingsList.read).
func (p *PostingsList) SetBytesRead(val uint64) {
	p.bytesRead = val
}

// BytesRead returns the cumulative number of bytes read off disk
// for this postings list (segment.DiskStatsReporter interface).
func (p *PostingsList) BytesRead() uint64 {
	return p.bytesRead
}

func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error {
rv.postingsOffset = postingsOffset

Expand All @@ -268,6 +283,8 @@ func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error {

roaringBytes := d.sb.mem[postingsOffset+n : postingsOffset+n+postingsLen]

rv.bytesRead += (n + postingsLen)

if rv.postings == nil {
rv.postings = roaring.NewBitmap()
}
Expand Down Expand Up @@ -316,6 +333,8 @@ type PostingsIterator struct {

includeFreqNorm bool
includeLocs bool

bytesRead uint64
}

var emptyPostingsIterator = &PostingsIterator{}
Expand All @@ -331,19 +350,40 @@ func (i *PostingsIterator) Size() int {
return sizeInBytes
}

// Implements the segment.DiskStatsReporter interface
// The purpose of this implementation is to get
// the bytes read from the disk which includes
// the freqNorm and location specific information
// of a hit
func (i *PostingsIterator) SetBytesRead(val uint64) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commentary for the interface implementations..

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

i.bytesRead = val
}

func (i *PostingsIterator) BytesRead() uint64 {
return i.bytesRead
}

func (i *PostingsIterator) loadChunk(chunk int) error {
if i.includeFreqNorm {
err := i.freqNormReader.loadChunk(chunk)
if err != nil {
return err
}

// assign the bytes read at this point, since
// the postingsIterator tracks only the chunk loaded
// and the cumulative count is maintained correctly
// by the underlying intDecoder
i.bytesRead = i.freqNormReader.bytesRead()

}

if i.includeLocs {
err := i.locReader.loadChunk(chunk)
if err != nil {
return err
}
i.bytesRead = i.locReader.bytesRead()
}

i.currChunk = uint32(chunk)
Expand Down
4 changes: 4 additions & 0 deletions read.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ func (s *SegmentBase) getDocStoredMetaAndCompressed(docNum uint64) ([]byte, []by
meta := s.mem[storedOffset+n : storedOffset+n+metaLen]
data := s.mem[storedOffset+n+metaLen : storedOffset+n+metaLen+dataLen]

s.bytesRead += (metaLen + dataLen)

return meta, data
}

Expand All @@ -39,5 +41,7 @@ func (s *SegmentBase) getDocStoredOffsets(docNum uint64) (
dataLen, read := binary.Uvarint(s.mem[storedOffset+n : storedOffset+n+binary.MaxVarintLen64])
n += uint64(read)

s.bytesRead += n

return indexOffset, storedOffset, n, metaLen, dataLen
}
24 changes: 24 additions & 0 deletions segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ type SegmentBase struct {
fieldDvReaders map[uint16]*docValueReader // naive chunk cache per field
fieldDvNames []string // field names cached in fieldDvReaders
size uint64
bytesRead uint64

m sync.Mutex
fieldFSTs map[uint16]*vellum.FST
Expand Down Expand Up @@ -210,9 +211,26 @@ func (s *Segment) loadConfig() error {

numDocsOffset := storedIndexOffset - 8
s.numDocs = binary.BigEndian.Uint64(s.mm[numDocsOffset : numDocsOffset+8])

// 8*4 + 4*3 = 44 bytes being accounted from all the offsets
// above being read from the file
s.bytesRead += 44
return nil
}

// Implements the segment.DiskStatsReporter interface
// Only the persistedSegment type implements the
// interface, as the intention is to retrieve the bytes
// read from the on-disk segment as part of the current
// query.
func (s *Segment) SetBytesRead(val uint64) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that I'm looking at this - feels a bit distasteful to allow another library to just overwrite this parameter.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with you @abhinavdangeti,
@Thejas-bhat ,

Why do we need this API? This would be inherent task within the merge process.
Meaning the the merge process would take care of tracking the prev_bytes from segments and carrying it further to the newly formed segment.

Wouldn't that suffice the requirement?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm.. it may not possible without making disk format changes.

s.SegmentBase.bytesRead = val
}

func (s *Segment) BytesRead() uint64 {
return s.bytesRead + s.SegmentBase.bytesRead
}

func (s *SegmentBase) loadFields() error {
// NOTE for now we assume the fields index immediately precedes
// the footer, and if this changes, need to adjust accordingly (or
Expand All @@ -224,6 +242,9 @@ func (s *SegmentBase) loadFields() error {
for s.fieldsIndexOffset+(8*fieldID) < fieldsIndexEnd {
addr := binary.BigEndian.Uint64(s.mem[s.fieldsIndexOffset+(8*fieldID) : s.fieldsIndexOffset+(8*fieldID)+8])

// accounting the address of the dictLoc being read from file
s.bytesRead += 8

dictLoc, read := binary.Uvarint(s.mem[addr:fieldsIndexEnd])
n := uint64(read)
s.dictLocs = append(s.dictLocs, dictLoc)
Expand All @@ -233,6 +254,7 @@ func (s *SegmentBase) loadFields() error {
n += uint64(read)

name := string(s.mem[addr+n : addr+n+nameLen])
s.bytesRead += (n + nameLen)
s.fieldsInv = append(s.fieldsInv, name)
s.fieldsMap[name] = uint16(fieldID + 1)

Expand Down Expand Up @@ -267,6 +289,7 @@ func (sb *SegmentBase) dictionary(field string) (rv *Dictionary, err error) {
// read the length of the vellum data
vellumLen, read := binary.Uvarint(sb.mem[dictStart : dictStart+binary.MaxVarintLen64])
fstBytes := sb.mem[dictStart+uint64(read) : dictStart+uint64(read)+vellumLen]
sb.bytesRead += (uint64(read) + vellumLen)
rv.fst, err = vellum.Load(fstBytes)
if err != nil {
sb.m.Unlock()
Expand Down Expand Up @@ -556,6 +579,7 @@ func (s *SegmentBase) loadDvReaders() error {
}
read += uint64(n)

s.bytesRead += read
fieldDvReader, err := s.loadFieldDocValueReader(field, fieldLocStart, fieldLocEnd)
if err != nil {
return err
Expand Down