-
Notifications
You must be signed in to change notification settings - Fork 21.9k
core/rawdb: implement sequential reads in freezer_table #23117
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
441bdd4
d8a24b4
ff8267c
d6b40b8
821c696
4e5b6f9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -70,6 +70,19 @@ func (i *indexEntry) marshallBinary() []byte { | |
| return b | ||
| } | ||
|
|
||
| // bounds returns the start- and end- offsets, and the file number of where to | ||
| // read there data item marked by the two index entries. The two entries are | ||
| // assumed to be sequential. | ||
| func (start *indexEntry) bounds(end *indexEntry) (startOffset, endOffset, fileId uint32) { | ||
| if start.filenum != end.filenum { | ||
| // If a piece of data 'crosses' a data-file, | ||
| // it's actually in one piece on the second data-file. | ||
| // We return a zero-indexEntry for the second file as start | ||
| return 0, end.offset, end.filenum | ||
| } | ||
| return start.offset, end.offset, end.filenum | ||
| } | ||
|
|
||
| // freezerTable represents a single chained data table within the freezer (e.g. blocks). | ||
| // It consists of a data file (snappy encoded arbitrary data blobs) and an indexEntry | ||
| // file (uncompressed 64 bit indices into the data file). | ||
|
|
@@ -546,84 +559,183 @@ func (t *freezerTable) append(item uint64, encodedBlob []byte, wlock bool) (bool | |
| return false, nil | ||
| } | ||
|
|
||
| // getBounds returns the indexes for the item | ||
| // returns start, end, filenumber and error | ||
| func (t *freezerTable) getBounds(item uint64) (uint32, uint32, uint32, error) { | ||
| buffer := make([]byte, indexEntrySize) | ||
| var startIdx, endIdx indexEntry | ||
| // Read second index | ||
| if _, err := t.index.ReadAt(buffer, int64((item+1)*indexEntrySize)); err != nil { | ||
| return 0, 0, 0, err | ||
| } | ||
| endIdx.unmarshalBinary(buffer) | ||
| // Read first index (unless it's the very first item) | ||
| if item != 0 { | ||
| if _, err := t.index.ReadAt(buffer, int64(item*indexEntrySize)); err != nil { | ||
| return 0, 0, 0, err | ||
| } | ||
| startIdx.unmarshalBinary(buffer) | ||
| } else { | ||
| // getIndices returns the index entries for the given from-item, covering 'count' items. | ||
| // N.B: The actual number of returned indices for N items will always be N+1 (unless an | ||
| // error is returned). | ||
| // OBS: This method assumes that the caller has already verified (and/or trimmed) the range | ||
| // so that the items are within bounds. If this method is used to read out of bounds, | ||
| // it will return error. | ||
| func (t *freezerTable) getIndices(from, count uint64) ([]*indexEntry, error) { | ||
| // Apply the table-offset | ||
| from = from - uint64(t.itemOffset) | ||
| // For reading N items, we need N+1 indices. | ||
| buffer := make([]byte, (count+1)*indexEntrySize) | ||
| if _, err := t.index.ReadAt(buffer, int64(from*indexEntrySize)); err != nil { | ||
| return nil, err | ||
| } | ||
| var ( | ||
| indices []*indexEntry | ||
| offset int | ||
| ) | ||
| for i := from; i <= from+count; i++ { | ||
| index := new(indexEntry) | ||
| index.unmarshalBinary(buffer[offset:]) | ||
| offset += indexEntrySize | ||
| indices = append(indices, index) | ||
| } | ||
| if from == 0 { | ||
| // Special case if we're reading the first item in the freezer. We assume that | ||
| // the first item always start from zero(regarding the deletion, we | ||
| // only support deletion by files, so that the assumption is held). | ||
| // This means we can use the first item metadata to carry information about | ||
| // the 'global' offset, for the deletion-case | ||
| return 0, endIdx.offset, endIdx.filenum, nil | ||
| indices[0].offset = 0 | ||
| indices[0].filenum = indices[1].filenum | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not really following. Isn't the offset already zero if it's the first item? I also don't understand why the file number is assigned?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Alternatively, isn't this the case for all items that start a new file, not just
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Or, is the point that "indices[0].filenum` might not exist at all (since it's a deleted file), vs. anything else exists and we can "read and see the data overflown"?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's more or less what the comment says above. So each index is a pair of However, the Therefore, back in the day, we decided to use this quirk to implement deletability. So if we want to delete the file I think you're right though, that we wouldn't really need to set the filenum. However, as you can see in the previous code, for item In this case, we want to return to the caller info about where to start reading, and where to stop. Therefore, doing this conversion here simplifies for the calling code, which can just treat the indices as direct data information without dealing with the intricacies of the data indexing scheme. |
||
| } | ||
| if startIdx.filenum != endIdx.filenum { | ||
| // If a piece of data 'crosses' a data-file, | ||
| // it's actually in one piece on the second data-file. | ||
| // We return a zero-indexEntry for the second file as start | ||
| return 0, endIdx.offset, endIdx.filenum, nil | ||
| } | ||
| return startIdx.offset, endIdx.offset, endIdx.filenum, nil | ||
| return indices, nil | ||
| } | ||
|
|
||
| // Retrieve looks up the data offset of an item with the given number and retrieves | ||
| // the raw binary blob from the data file. | ||
| func (t *freezerTable) Retrieve(item uint64) ([]byte, error) { | ||
| blob, err := t.retrieve(item) | ||
| items, err := t.RetrieveItems(item, 1, 0) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
holiman marked this conversation as resolved.
|
||
| if t.noCompression { | ||
| return blob, nil | ||
| return items[0], nil | ||
| } | ||
|
|
||
| // RetrieveItems returns multiple items in sequence, starting from the index 'start'. | ||
| // It will return at most 'max' items, but will abort earlier to respect the | ||
| // 'maxBytes' argument. However, if the 'maxBytes' is smaller than the size of one | ||
| // item, it _will_ return one element and possibly overflow the maxBytes. | ||
| func (t *freezerTable) RetrieveItems(start, count, maxBytes uint64) ([][]byte, error) { | ||
| // First we read the 'raw' data, which might be compressed. | ||
| diskData, sizes, err := t.retrieveItems(start, count, maxBytes) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| return snappy.Decode(nil, blob) | ||
| var ( | ||
| output = make([][]byte, 0, count) | ||
| offset int // offset for reading | ||
| outputSize int // size of uncompressed data | ||
| ) | ||
| // Now slice up the data and decompress. | ||
| for i, diskSize := range sizes { | ||
| item := diskData[offset : offset+diskSize] | ||
| offset += diskSize | ||
| decompressedSize := diskSize | ||
| if !t.noCompression { | ||
| decompressedSize, _ = snappy.DecodedLen(item) | ||
| } | ||
| if i > 0 && uint64(outputSize+decompressedSize) > maxBytes { | ||
| break | ||
| } | ||
| if !t.noCompression { | ||
| data, err := snappy.Decode(nil, item) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
holiman marked this conversation as resolved.
|
||
| output = append(output, data) | ||
| } else { | ||
| output = append(output, item) | ||
| } | ||
| outputSize += decompressedSize | ||
| } | ||
| return output, nil | ||
| } | ||
|
|
||
| // retrieve looks up the data offset of an item with the given number and retrieves | ||
| // the raw binary blob from the data file. OBS! This method does not decode | ||
| // compressed data. | ||
| func (t *freezerTable) retrieve(item uint64) ([]byte, error) { | ||
| // retrieveItems reads up to 'count' items from the table. It reads at least | ||
| // one item, but otherwise avoids reading more than maxBytes bytes. | ||
| // It returns the (potentially compressed) data, and the sizes. | ||
| func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []int, error) { | ||
| t.lock.RLock() | ||
| defer t.lock.RUnlock() | ||
| // Ensure the table and the item is accessible | ||
| if t.index == nil || t.head == nil { | ||
| return nil, errClosed | ||
| return nil, nil, errClosed | ||
| } | ||
| if atomic.LoadUint64(&t.items) <= item { | ||
| return nil, errOutOfBounds | ||
| itemCount := atomic.LoadUint64(&t.items) // max number | ||
| // Ensure the start is written, not deleted from the tail, and that the | ||
| // caller actually wants something | ||
| if itemCount <= start || uint64(t.itemOffset) > start || count == 0 { | ||
| return nil, nil, errOutOfBounds | ||
| } | ||
| // Ensure the item was not deleted from the tail either | ||
| if uint64(t.itemOffset) > item { | ||
| return nil, errOutOfBounds | ||
| if start+count > itemCount { | ||
| count = itemCount - start | ||
| } | ||
| startOffset, endOffset, filenum, err := t.getBounds(item - uint64(t.itemOffset)) | ||
| if err != nil { | ||
| return nil, err | ||
| var ( | ||
| output = make([]byte, maxBytes) // Buffer to read data into | ||
| outputSize int // Used size of that buffer | ||
| ) | ||
| // readData is a helper method to read a single data item from disk. | ||
| readData := func(fileId, start uint32, length int) error { | ||
| // In case a small limit is used, and the elements are large, may need to | ||
| // realloc the read-buffer when reading the first (and only) item. | ||
| if len(output) < length { | ||
| output = make([]byte, length) | ||
| } | ||
| dataFile, exist := t.files[fileId] | ||
| if !exist { | ||
| return fmt.Errorf("missing data file %d", fileId) | ||
| } | ||
| if _, err := dataFile.ReadAt(output[outputSize:outputSize+length], int64(start)); err != nil { | ||
| return err | ||
| } | ||
| outputSize += length | ||
|
holiman marked this conversation as resolved.
|
||
| return nil | ||
| } | ||
| dataFile, exist := t.files[filenum] | ||
| if !exist { | ||
| return nil, fmt.Errorf("missing data file %d", filenum) | ||
| // Read all the indexes in one go | ||
| indices, err := t.getIndices(start, count) | ||
| if err != nil { | ||
| return nil, nil, err | ||
| } | ||
| // Retrieve the data itself, decompress and return | ||
| blob := make([]byte, endOffset-startOffset) | ||
| if _, err := dataFile.ReadAt(blob, int64(startOffset)); err != nil { | ||
| return nil, err | ||
| var ( | ||
| sizes []int // The sizes for each element | ||
| totalSize = 0 // The total size of all data read so far | ||
| readStart = indices[0].offset // Where, in the file, to start reading | ||
| unreadSize = 0 // The size of the as-yet-unread data | ||
| ) | ||
|
|
||
| for i, firstIndex := range indices[:len(indices)-1] { | ||
| secondIndex := indices[i+1] | ||
| // Determine the size of the item. | ||
| offset1, offset2, _ := firstIndex.bounds(secondIndex) | ||
| size := int(offset2 - offset1) | ||
| // Crossing a file boundary? | ||
| if secondIndex.filenum != firstIndex.filenum { | ||
| // If we have unread data in the first file, we need to do that read now. | ||
| if unreadSize > 0 { | ||
| if err := readData(firstIndex.filenum, readStart, unreadSize); err != nil { | ||
| return nil, nil, err | ||
| } | ||
| unreadSize = 0 | ||
| } | ||
| readStart = 0 | ||
| } | ||
| if i > 0 && uint64(totalSize+size) > maxBytes { | ||
| // About to break out due to byte limit being exceeded. We don't | ||
| // read this last item, but we need to do the deferred reads now. | ||
| if unreadSize > 0 { | ||
| if err := readData(secondIndex.filenum, readStart, unreadSize); err != nil { | ||
| return nil, nil, err | ||
| } | ||
| } | ||
| break | ||
|
holiman marked this conversation as resolved.
|
||
| } | ||
| // Defer the read for later | ||
| unreadSize += size | ||
| totalSize += size | ||
| sizes = append(sizes, size) | ||
| if i == len(indices)-2 || uint64(totalSize) > maxBytes { | ||
| // Last item, need to do the read now | ||
| if err := readData(secondIndex.filenum, readStart, unreadSize); err != nil { | ||
| return nil, nil, err | ||
| } | ||
| break | ||
| } | ||
| } | ||
| t.readMeter.Mark(int64(len(blob) + 2*indexEntrySize)) | ||
| return blob, nil | ||
| return output[:outputSize], sizes, nil | ||
| } | ||
|
|
||
| // has returns an indicator whether the specified number data | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.