From 760343feb7b5d3c552655656f92aff5d5c306616 Mon Sep 17 00:00:00 2001 From: Thejas-bhat Date: Tue, 12 Jul 2022 21:36:53 +0530 Subject: [PATCH] Includes accounting of bytes read during query time from the persisted zap files --- docvalues.go | 26 ++++++++++++++++++++++++-- go.mod | 2 +- go.sum | 4 ++-- intDecoder.go | 13 +++++++++++++ posting.go | 40 ++++++++++++++++++++++++++++++++++++++++ read.go | 4 ++++ segment.go | 24 ++++++++++++++++++++++++ 7 files changed, 108 insertions(+), 5 deletions(-) diff --git a/docvalues.go b/docvalues.go index a530aa5a..6aa1cc2f 100644 --- a/docvalues.go +++ b/docvalues.go @@ -49,6 +49,7 @@ type docValueReader struct { curChunkHeader []MetaData curChunkData []byte // compressed data cache uncompressed []byte // temp buf for snappy decompression + bytesRead uint64 } func (di *docValueReader) size() int { @@ -96,6 +97,10 @@ func (s *SegmentBase) loadFieldDocValueReader(field string, chunkOffsetsLen := binary.BigEndian.Uint64(s.mem[fieldDvLocEnd-16 : fieldDvLocEnd-8]) // acquire position of chunk offsets chunkOffsetsPosition = (fieldDvLocEnd - 16) - chunkOffsetsLen + + // 16 bytes since it corresponds to the length + // of chunk offsets and the position of the offsets + s.bytesRead += uint64(16) } else { return nil, fmt.Errorf("loadFieldDocValueReader: fieldDvLoc too small: %d-%d", fieldDvLocEnd, fieldDvLocStart) } @@ -116,13 +121,28 @@ func (s *SegmentBase) loadFieldDocValueReader(field string, fdvIter.chunkOffsets[i] = loc offset += uint64(read) } - + s.bytesRead += offset // set the data offset fdvIter.dvDataLoc = fieldDvLocStart return fdvIter, nil } +// Implements the segment.DiskStatsReporter interface +// The purpose of this implementation is to get +// the bytes read from the disk (pertaining to the +// docvalues) while querying. +// the loadDvChunk retrieves the next chunk of docvalues +// and the bytes retrieved off the disk pertaining to that +// is accounted as well. 
+func (di *docValueReader) BytesRead() uint64 { + return di.bytesRead +} + +func (di *docValueReader) SetBytesRead(val uint64) { + di.bytesRead = val +} + func (di *docValueReader) loadDvChunk(chunkNumber uint64, s *SegmentBase) error { // advance to the chunk where the docValues // reside for the given docNum @@ -145,7 +165,7 @@ func (di *docValueReader) loadDvChunk(chunkNumber uint64, s *SegmentBase) error return fmt.Errorf("failed to read the chunk") } chunkMetaLoc := destChunkDataLoc + uint64(read) - + di.bytesRead += uint64(read) offset := uint64(0) if cap(di.curChunkHeader) < int(numDocs) { di.curChunkHeader = make([]MetaData, int(numDocs)) @@ -161,6 +181,7 @@ func (di *docValueReader) loadDvChunk(chunkNumber uint64, s *SegmentBase) error compressedDataLoc := chunkMetaLoc + offset dataLength := curChunkEnd - compressedDataLoc + di.bytesRead += uint64(dataLength + offset) di.curChunkData = s.mem[compressedDataLoc : compressedDataLoc+dataLength] di.curChunkNum = chunkNumber di.uncompressed = di.uncompressed[:0] @@ -295,6 +316,7 @@ func (s *SegmentBase) VisitDocValues(localDocNum uint64, fields []string, if err != nil { return dvs, err } + s.bytesRead += dvr.BytesRead() } _ = dvr.visitDocValues(localDocNum, visitor) diff --git a/go.mod b/go.mod index 6988efd6..e95e33a7 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/RoaringBitmap/roaring v0.9.4 github.com/blevesearch/bleve_index_api v1.0.1 github.com/blevesearch/mmap-go v1.0.4 - github.com/blevesearch/scorch_segment_api/v2 v2.1.0 + github.com/blevesearch/scorch_segment_api/v2 v2.1.1 github.com/blevesearch/vellum v1.0.8 github.com/golang/snappy v0.0.1 github.com/spf13/cobra v0.0.5 diff --git a/go.sum b/go.sum index 01c1dad0..2da6f6a8 100644 --- a/go.sum +++ b/go.sum @@ -8,8 +8,8 @@ github.com/blevesearch/bleve_index_api v1.0.1 h1:nx9++0hnyiGOHJwQQYfsUGzpRdEVE5L github.com/blevesearch/bleve_index_api v1.0.1/go.mod h1:fiwKS0xLEm+gBRgv5mumf0dhgFr2mDgZah1pqv1c1M4= github.com/blevesearch/mmap-go 
v1.0.4 h1:OVhDhT5B/M1HNPpYPBKIEJaD0F3Si+CrEKULGCDPWmc= github.com/blevesearch/mmap-go v1.0.4/go.mod h1:EWmEAOmdAS9z/pi/+Toxu99DnsbhG1TIxUoRmJw/pSs= -github.com/blevesearch/scorch_segment_api/v2 v2.1.0 h1:NFwteOpZEvJk5Vg0H6gD0hxupsG3JYocE4DBvsA2GZI= -github.com/blevesearch/scorch_segment_api/v2 v2.1.0/go.mod h1:uch7xyyO/Alxkuxa+CGs79vw0QY8BENSBjg6Mw5L5DE= +github.com/blevesearch/scorch_segment_api/v2 v2.1.1 h1:J8UDudUpDJz21d/hCMIshCeRordwnDTftgXcSDMUx40= +github.com/blevesearch/scorch_segment_api/v2 v2.1.1/go.mod h1:uch7xyyO/Alxkuxa+CGs79vw0QY8BENSBjg6Mw5L5DE= github.com/blevesearch/vellum v1.0.8 h1:iMGh4lfxza4BnWO/UJTMPlI3HsK9YawjPv+TteVa9ck= github.com/blevesearch/vellum v1.0.8/go.mod h1:+cpRi/tqq49xUYSQN2P7A5zNSNrS+MscLeeaZ3J46UA= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= diff --git a/intDecoder.go b/intDecoder.go index 2f777fc9..cc38bdad 100644 --- a/intDecoder.go +++ b/intDecoder.go @@ -26,6 +26,7 @@ type chunkedIntDecoder struct { curChunkBytes []byte data []byte r *memUvarintReader + numBytesRead uint64 } // newChunkedIntDecoder expects an optional or reset chunkedIntDecoder for better reuse. 
@@ -55,10 +56,20 @@ func newChunkedIntDecoder(buf []byte, offset uint64, rv *chunkedIntDecoder) *chu rv.chunkOffsets[i], read = binary.Uvarint(buf[offset+n : offset+n+binary.MaxVarintLen64]) n += uint64(read) } + rv.numBytesRead += n rv.dataStartOffset = offset + n return rv } +// A util function which fetches the query time +// specific bytes encoded by intcoder (for eg the +// freqNorm and location details of a term in document) +// the loadChunk retrieves the next chunk and the +// number of bytes retrieved in that operation is accounted +func (d *chunkedIntDecoder) bytesRead() uint64 { + return d.numBytesRead +} + func (d *chunkedIntDecoder) loadChunk(chunk int) error { if d.startOffset == termNotEncoded { d.r = newMemUvarintReader([]byte(nil)) @@ -75,6 +86,7 @@ func (d *chunkedIntDecoder) loadChunk(chunk int) error { start += s end += e d.curChunkBytes = d.data[start:end] + d.numBytesRead += uint64(len(d.curChunkBytes)) if d.r == nil { d.r = newMemUvarintReader(d.curChunkBytes) } else { @@ -89,6 +101,7 @@ func (d *chunkedIntDecoder) reset() { d.dataStartOffset = 0 d.chunkOffsets = d.chunkOffsets[:0] d.curChunkBytes = d.curChunkBytes[:0] + d.numBytesRead = 0 d.data = d.data[:0] if d.r != nil { d.r.Reset([]byte(nil)) diff --git a/posting.go b/posting.go index 484bd0e1..c05aa232 100644 --- a/posting.go +++ b/posting.go @@ -108,6 +108,7 @@ type PostingsList struct { normBits1Hit uint64 chunkSize uint64 + bytesRead uint64 } // represents an immutable, empty postings list @@ -208,11 +209,13 @@ func (p *PostingsList) iterator(includeFreq, includeNorm, includeLocs bool, // initialize freq chunk reader if rv.includeFreqNorm { rv.freqNormReader = newChunkedIntDecoder(p.sb.mem, p.freqOffset, rv.freqNormReader) + rv.bytesRead += rv.freqNormReader.bytesRead() } // initialize the loc chunk reader if rv.includeLocs { rv.locReader = newChunkedIntDecoder(p.sb.mem, p.locOffset, rv.locReader) + rv.bytesRead += rv.locReader.bytesRead() } rv.all = p.postings.Iterator() @@ -244,6 
+247,18 @@ func (p *PostingsList) Count() uint64 { return n - e } +// Implements the segment.DiskStatsReporter interface +// The purpose of this implementation is to get +// the bytes read from the postings lists stored +// on disk, while querying +func (p *PostingsList) SetBytesRead(val uint64) { + p.bytesRead = val +} + +func (p *PostingsList) BytesRead() uint64 { + return p.bytesRead +} + func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error { rv.postingsOffset = postingsOffset @@ -268,6 +283,8 @@ func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error { roaringBytes := d.sb.mem[postingsOffset+n : postingsOffset+n+postingsLen] + rv.bytesRead += (n + postingsLen) + if rv.postings == nil { rv.postings = roaring.NewBitmap() } @@ -316,6 +333,8 @@ type PostingsIterator struct { includeFreqNorm bool includeLocs bool + + bytesRead uint64 } var emptyPostingsIterator = &PostingsIterator{} @@ -331,12 +350,32 @@ func (i *PostingsIterator) Size() int { return sizeInBytes } +// Implements the segment.DiskStatsReporter interface +// The purpose of this implementation is to get +// the bytes read from the disk which includes +// the freqNorm and location specific information +// of a hit +func (i *PostingsIterator) SetBytesRead(val uint64) { + i.bytesRead = val +} + +func (i *PostingsIterator) BytesRead() uint64 { + return i.bytesRead +} + func (i *PostingsIterator) loadChunk(chunk int) error { if i.includeFreqNorm { err := i.freqNormReader.loadChunk(chunk) if err != nil { return err } + + // assign the bytes read at this point, since + // the postingsIterator is tracking only the chunk loaded + // and the accumulation is tracked correctly in the downstream + // intDecoder + i.bytesRead = i.freqNormReader.bytesRead() + } if i.includeLocs { @@ -344,6 +383,7 @@ func (i *PostingsIterator) loadChunk(chunk int) error { if err != nil { return err } + i.bytesRead = i.locReader.bytesRead() } i.currChunk = uint32(chunk) diff --git a/read.go b/read.go 
index e47d4c6a..1db0bab0 100644 --- a/read.go +++ b/read.go @@ -22,6 +22,8 @@ func (s *SegmentBase) getDocStoredMetaAndCompressed(docNum uint64) ([]byte, []by meta := s.mem[storedOffset+n : storedOffset+n+metaLen] data := s.mem[storedOffset+n+metaLen : storedOffset+n+metaLen+dataLen] + s.bytesRead += (metaLen + dataLen) + return meta, data } @@ -39,5 +41,7 @@ func (s *SegmentBase) getDocStoredOffsets(docNum uint64) ( dataLen, read := binary.Uvarint(s.mem[storedOffset+n : storedOffset+n+binary.MaxVarintLen64]) n += uint64(read) + s.bytesRead += n + return indexOffset, storedOffset, n, metaLen, dataLen } diff --git a/segment.go b/segment.go index bc29f3f4..04c697e5 100644 --- a/segment.go +++ b/segment.go @@ -101,6 +101,7 @@ type SegmentBase struct { fieldDvReaders map[uint16]*docValueReader // naive chunk cache per field fieldDvNames []string // field names cached in fieldDvReaders size uint64 + bytesRead uint64 m sync.Mutex fieldFSTs map[uint16]*vellum.FST @@ -210,9 +211,26 @@ func (s *Segment) loadConfig() error { numDocsOffset := storedIndexOffset - 8 s.numDocs = binary.BigEndian.Uint64(s.mm[numDocsOffset : numDocsOffset+8]) + + // 8*4 + 4*3 = 44 bytes being accounted from all the offsets + // above being read from the file + s.bytesRead += 44 return nil } +// Implements the segment.DiskStatsReporter interface +// Only the persistedSegment type implements the +// interface, as the intention is to retrieve the bytes +// read from the on-disk segment as part of the current +// query. 
+func (s *Segment) SetBytesRead(val uint64) { + s.SegmentBase.bytesRead = val +} + +func (s *Segment) BytesRead() uint64 { + return s.bytesRead + s.SegmentBase.bytesRead +} + func (s *SegmentBase) loadFields() error { // NOTE for now we assume the fields index immediately precedes // the footer, and if this changes, need to adjust accordingly (or @@ -224,6 +242,9 @@ func (s *SegmentBase) loadFields() error { for s.fieldsIndexOffset+(8*fieldID) < fieldsIndexEnd { addr := binary.BigEndian.Uint64(s.mem[s.fieldsIndexOffset+(8*fieldID) : s.fieldsIndexOffset+(8*fieldID)+8]) + // accounting the address of the dictLoc being read from file + s.bytesRead += 8 + dictLoc, read := binary.Uvarint(s.mem[addr:fieldsIndexEnd]) n := uint64(read) s.dictLocs = append(s.dictLocs, dictLoc) @@ -233,6 +254,7 @@ func (s *SegmentBase) loadFields() error { n += uint64(read) name := string(s.mem[addr+n : addr+n+nameLen]) + s.bytesRead += (n + nameLen) s.fieldsInv = append(s.fieldsInv, name) s.fieldsMap[name] = uint16(fieldID + 1) @@ -267,6 +289,7 @@ func (sb *SegmentBase) dictionary(field string) (rv *Dictionary, err error) { // read the length of the vellum data vellumLen, read := binary.Uvarint(sb.mem[dictStart : dictStart+binary.MaxVarintLen64]) fstBytes := sb.mem[dictStart+uint64(read) : dictStart+uint64(read)+vellumLen] + sb.bytesRead += (uint64(read) + vellumLen) rv.fst, err = vellum.Load(fstBytes) if err != nil { sb.m.Unlock() @@ -556,6 +579,7 @@ func (s *SegmentBase) loadDvReaders() error { } read += uint64(n) + s.bytesRead += read fieldDvReader, err := s.loadFieldDocValueReader(field, fieldLocStart, fieldLocEnd) if err != nil { return err