Skip to content

Commit

Permalink
leveldb: fix table's data blocks' internal keys decoding
Browse files Browse the repository at this point in the history
In the LevelDB encoding, the internal key can be cut at any byte:
including the user_key, type, or sequence_number. The resulting prefix
is shared among subsequent keys and not specified explicitly by them.

This fixes a previous mistaken belief that cuts can't happen in the
last 8 bytes of the type & sequence number.

Tests are added.
  • Loading branch information
mikez committed Dec 8, 2023
1 parent 287ed36 commit 8665df5
Show file tree
Hide file tree
Showing 13 changed files with 5,764 additions and 57 deletions.
7 changes: 6 additions & 1 deletion format/leveldb/leveldb_descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,12 @@ func readTagInternalKey(name string, d *decode.D) {
d.FieldStruct(name, func(d *decode.D) {
length := d.FieldULEB128("length")
d.FieldStruct("data", func(d *decode.D) {
readInternalKey(int(length), d)
err := readInternalKey(nil, int(length), d)
if err != nil {
// TK(2023-12-08): how do I propagate this
// error `err` into the `d` object?
d.Errorf("Key read failure (size %d)", length)
}
})
})
}
224 changes: 178 additions & 46 deletions format/leveldb/leveldb_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package leveldb
import (
"bytes"
"embed"
"fmt"
"hash/crc32"

"github.com/golang/snappy"
Expand Down Expand Up @@ -43,8 +44,7 @@ const (
// echo http://code.google.com/p/leveldb/ | sha1sum
// https://github.com/google/leveldb/blob/main/table/format.h#L76
tableMagicNumber = 0xdb4775248b80fb57
uint32Size = 32
uint64Size = 64
uint32BitSize = 32
)

// https://github.com/google/leveldb/blob/main/include/leveldb/options.h#L25
Expand Down Expand Up @@ -104,29 +104,35 @@ func ldbTableDecode(d *decode.D) any {

d.SeekAbs(metaIndexOffset * 8)
var metaHandles []blockHandle
readTableBlock("metaindex", metaIndexSize, keyValueContentsReader(nil, func(size int, d *decode.D) {
// blockHandle
// https://github.com/google/leveldb/blob/main/table/format.cc#L24
handle := blockHandle{
offset: d.FieldULEB128("offset"),
size: d.FieldULEB128("size"),
}
metaHandles = append(metaHandles, handle)
}), d)
readTableBlock(
"metaindex",
metaIndexSize,
keyValueContentsReader(
nil,
func(d *decode.D) {
handle := readBlockHandle(d)
metaHandles = append(metaHandles, handle)
},
),
d,
)

// index

d.SeekAbs(indexOffset * 8)
var dataHandles []blockHandle
readTableBlock("index", indexSize, keyValueContentsReader(readInternalKey, func(size int, d *decode.D) {
// blockHandle
// https://github.com/google/leveldb/blob/main/table/format.cc#L24
handle := blockHandle{
offset: d.FieldULEB128("offset"),
size: d.FieldULEB128("size"),
}
dataHandles = append(dataHandles, handle)
}), d)
readTableBlock(
"index",
indexSize,
keyValueContentsReader(
readInternalKey,
func(d *decode.D) {
handle := readBlockHandle(d)
dataHandles = append(dataHandles, handle)
},
),
d,
)

// meta

Expand Down Expand Up @@ -161,7 +167,7 @@ func ldbTableDecode(d *decode.D) any {
// Readers

// Read block contents as well as compression + checksum bytes following it.
// The function `readTableBlockContents` gets the uncompressed bytebuffer.
// The function `readTableBlockContents` gets the _uncompressed_ bytebuffer.
// https://github.com/google/leveldb/blob/main/table/format.cc#L69
func readTableBlock(
name string,
Expand Down Expand Up @@ -216,21 +222,21 @@ func readTableBlock(

// Read content encoded as a sequence of key/value-entries and a trailer of restarts.
// https://github.com/google/leveldb/blob/main/table/block_builder.cc#L16
// https://github.com/google/leveldb/blob/main/table/block.cc
// https://github.com/google/leveldb/blob/main/table/block.cc#L48
func readKeyValueContents(
keyCallbackFn func(size int, d *decode.D),
valueCallbackFn func(size int, d *decode.D),
keyCallbackFn func(sharedBytes []byte, unsharedSize int, d *decode.D) error,
valueCallbackFn func(d *decode.D),
size int64,
d *decode.D,
) {
start := d.Pos()
end := start + size*8

var restartOffset int64
d.SeekAbs(end - uint32Size)
d.SeekAbs(end - uint32BitSize)
d.FieldStruct("trailer", func(d *decode.D) {
numRestarts := int64(d.FieldU32("num_restarts"))
restartOffset = size*8 - (1+numRestarts)*uint32Size
restartOffset = size*8 - (1+numRestarts)*uint32BitSize
d.SeekAbs(start + restartOffset)
d.FieldArray("restarts", func(d *decode.D) {
for i := 0; i < int(numRestarts); i++ {
Expand All @@ -245,51 +251,177 @@ func readKeyValueContents(
}
d.SeekAbs(start)
d.FieldArray("entries", func(d *decode.D) {
var lastKey []byte
for d.Pos() < start+restartOffset {
d.FieldStruct("entry", func(d *decode.D) {
d.FieldULEB128("shared_bytes")
unshared := int(d.FieldULEB128("unshared_bytes"))
valueLength := int(d.FieldULEB128("value_length"))
if keyCallbackFn == nil {
d.FieldUTF8("key", unshared)
// https://github.com/google/leveldb/blob/main/table/block.cc#L48-L75
shared := int64(d.FieldULEB128("shared_bytes"))
unshared := int64(d.FieldULEB128("unshared_bytes"))
valueLength := int64(d.FieldULEB128("value_length"))

// read key
// https://github.com/google/leveldb/blob/main/table/block.cc#L261
if int(shared) > len(lastKey) {
d.Fatalf("`shared` size is larger than `lastKey` length")
}
keyPrefix := lastKey[:shared]
keySuffix := readBytesWithoutChangingPosition(unshared, d)
lastKey = append(keyPrefix, keySuffix...)

if keyCallbackFn == nil && shared == 0 {
d.FieldUTF8("key", int(unshared))
} else {
d.FieldStruct("key", func(d *decode.D) {
keyCallbackFn(unshared, d)
if keyCallbackFn == nil {
keyCallbackFn = readPrefixedBytes
}
err := keyCallbackFn(keyPrefix, int(unshared), d)
if err != nil {
// TK(2023-12-08): how do I propagate this
// error `err` into the `d` object?
d.Errorf("Key read failure (size %d)", unshared)
}
})
}

// read value
if valueCallbackFn == nil {
d.FieldUTF8("value", valueLength)
d.FieldUTF8("value", int(valueLength))
} else {
d.FieldStruct("value", func(d *decode.D) {
valueCallbackFn(valueLength, d)
})
d.FieldStruct("value", valueCallbackFn)
}
})
}
})
}

func readInternalKey(bitSize int, d *decode.D) {
// InternalKey
// https://github.com/google/leveldb/blob/main/db/dbformat.h#L171
d.FieldUTF8("user_key", bitSize-uint64Size/8)
d.FieldU8("type", valueTypes, scalar.UintHex)
d.FieldU56("sequence_number")
func readBytesWithoutChangingPosition(nBytes int64, d *decode.D) []byte {
var result []byte
d.RangeFn(d.Pos(), nBytes*8, func(d *decode.D) {
br := d.RawLen(d.BitsLeft())
result = d.ReadAllBits(br)
})
return result
}

// https://github.com/google/leveldb/blob/main/table/format.cc#L24
func readBlockHandle(d *decode.D) blockHandle {
return blockHandle{
offset: d.FieldULEB128("offset"),
size: d.FieldULEB128("size"),
}
}

// Read bytes and prefix with given bytes;
// name read bytes "suffix" and the merged bytes "full" (synthetic field).
func readPrefixedBytes(prefixBytes []byte, nBytes int, d *decode.D) error {
br, err := d.TryFieldRawLen("suffix", int64(nBytes)*8)
if err != nil {
return err
}
full := append(prefixBytes, d.ReadAllBits(br)...)
d.FieldValueStr("full", string(full), strInferred)
return nil
}

// An "internal key" consists of the triple (user_key, type, sequence_number).
// https://github.com/google/leveldb/blob/main/db/dbformat.h#L171
func readInternalKey(sharedBytes []byte, unsharedSize int, d *decode.D) error {
// In the LevelDB encoding, the internal key can be cut at any byte:
// including the user_key, type, or sequence_number:
// https://github.com/google/leveldb/blob/main/table/block_builder.cc#L79-L83
//
// The resulting prefix is then shared among subsequent keys and not
// specified explicitly by them. Here, we handle each cutoff case.
//
// All sizes are in bytes unless mentioned otherwise.
keySize := len(sharedBytes) + unsharedSize
typeAndSequenceNumberSize := 8
// key
// +-----------------------------------------------+
// user_key
// +---------------------------------+
// ⁞ user_key_suffix type sequence_number
// [AAAAAAAAAAAA]⁞[BBBBBBBBBBBBBBBBBB] [T] [SSSSSSS]
// ⁞ 1 7 bytes
// +------------+⁞+--------------------------------+
// shared ⁞ unshared
// ⁞
// cutoff
if keySize < typeAndSequenceNumberSize || int64(unsharedSize) > d.BitsLeft()/8 {
return fmt.Errorf("invalid key size")
}

// case 1: user_key, type, and sequence_number fit fully in unshared.
if len(sharedBytes) == 0 {
d.FieldUTF8("user_key", keySize-typeAndSequenceNumberSize)
d.FieldU8("type", valueTypes, scalar.UintHex)
d.FieldU56("sequence_number")
return nil
}

// case 2: type and sequence_number fit fully in unshared: simulate user_key value.
if unsharedSize >= typeAndSequenceNumberSize {
br := d.FieldRawLen("user_key_suffix", int64(unsharedSize-typeAndSequenceNumberSize)*8)
d.FieldValueStr("user_key", string(append(sharedBytes, d.ReadAllBits(br)...)), strInferred)
d.FieldU8("type", valueTypes, scalar.UintHex)
d.FieldU56("sequence_number")
return nil
}

// case 3: sequence_number fits fully in unshared: simulate user_key and type value,
sequenceNumberSize := typeAndSequenceNumberSize - 1
if unsharedSize == sequenceNumberSize {
lastIndex := len(sharedBytes) - 1
d.FieldValueStr("user_key", string(sharedBytes[:lastIndex]), strInferred)
d.FieldValueUint("type", uint64(sharedBytes[lastIndex]), valueTypes, scalar.UintHex, uintInferred)
d.FieldU56("sequence_number")
return nil
}

// case 4: sequence_number cut: simulate user_key, type, and sequence_number value.
typeByteIndex := keySize - typeAndSequenceNumberSize
d.FieldValueStr("user_key", string(sharedBytes[:typeByteIndex]), strInferred)
d.FieldValueUint("type", uint64(sharedBytes[typeByteIndex]), valueTypes, scalar.UintHex, uintInferred)
var suffixBytes []byte
if unsharedSize > 0 {
br := d.FieldRawLen("suffix", int64(unsharedSize)*8)
suffixBytes = d.ReadAllBits(br)
}
sequenceNumberBytes := append(
sharedBytes[typeByteIndex+1:keySize-unsharedSize],
suffixBytes...,
)
sequenceNumberBE := bitio.Read64(sequenceNumberBytes[:], 0, int64(sequenceNumberSize*8))
sequenceNumberLE := bitio.ReverseBytes64(56, sequenceNumberBE)
d.FieldValueUint("sequence_number", sequenceNumberLE, uintInferred)

return nil
}

// Read content encoded in the "filter" or "stats" Meta Block format.
// https://github.com/google/leveldb/blob/main/doc/table_format.md#filter-meta-block
// https://github.com/google/leveldb/blob/main/table/filter_block.cc
func readMetaContent(size int64, d *decode.D) {
func readMetaContent(nBytes int64, d *decode.D) {
// TK(2023-12-04)
d.FieldRawLen("raw", size*8)
d.FieldRawLen("raw", nBytes*8)
}

// Helpers

var strInferred = scalar.StrFn(func(s scalar.Str) (scalar.Str, error) {
s.Description = "inferred"
return s, nil
})

var uintInferred = scalar.UintFn(func(s scalar.Uint) (scalar.Uint, error) {
s.Description = "inferred"
return s, nil
})

func keyValueContentsReader(
keyCallbackFn func(size int, d *decode.D),
valueCallbackFn func(size int, d *decode.D),
keyCallbackFn func(sharedPrefix []byte, unsharedSize int, d *decode.D) error,
valueCallbackFn func(d *decode.D),
) func(size int64, d *decode.D) {
return func(size int64, d *decode.D) {
readKeyValueContents(keyCallbackFn, valueCallbackFn, size, d)
Expand Down
Loading

0 comments on commit 8665df5

Please sign in to comment.