Skip to content

Commit

Permalink
Revert "Revert "fix review comments""
Browse files Browse the repository at this point in the history
This reverts commit ac6d2c0.
  • Loading branch information
Ibrahim Jarif committed Mar 5, 2020
1 parent c6cd35f commit 10e0c2b
Showing 1 changed file with 36 additions and 24 deletions.
60 changes: 36 additions & 24 deletions table/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ import (
const (
KB = 1024
MB = KB * 1024

// When a block is encrypted, it's length increases. We add 200 bytes of padding to
// handle cases when block size increases. This is an approximate number.
padding = 200
)

type header struct {
Expand Down Expand Up @@ -72,7 +76,7 @@ type Builder struct {
// Typically tens or hundreds of meg. This is for one single file.
buf []byte
sz int
bufLock sync.Mutex
bufLock sync.Mutex // This lock guards the buf. We acquire lock when we resize the buf.

baseKey []byte // Base key for the current block.
baseOffset uint32 // Offset for the current block.
Expand All @@ -90,9 +94,9 @@ type Builder struct {
// NewTableBuilder makes a new TableBuilder.
func NewTableBuilder(opts Options) *Builder {
b := &Builder{
// Additional 2 MB to store index (approximate).
// Additional 5 MB to store index (approximate).
// We trim the additional space in table.Finish().
buf: make([]byte, opts.TableSize+MB*2),
buf: make([]byte, opts.TableSize+5*MB),
tableIndex: &pb.TableIndex{},
keyHashes: make([]uint64, 0, 1024), // Avoid some malloc calls.
opt: &opts,
Expand Down Expand Up @@ -125,7 +129,7 @@ var slicePool = sync.Pool{
func (b *Builder) handleBlock() {
defer b.wg.Done()
for item := range b.blockChan {
// Extract the item
// Extract the block.
blockBuf := item.data[item.start:item.end]
var dst *[]byte
// Compress the block.
Expand All @@ -144,17 +148,27 @@ func (b *Builder) handleBlock() {
blockBuf = eBlock
}

// The newend should always be less than or equal to the original end
// plus the padding. If the new end is greater than item.end+padding
// that means the data from this block cannot be stored in its existing
// location and trying to copy it over would mean we would over-write
// some data of the next block.
y.AssertTruef(uint32(len(blockBuf)) <= item.end+padding,
"newend: %d item.end: %d padding: %d", len(blockBuf), item.end, padding)

// Acquire the buflock here. The builder.grow function might change
// the b.buf while this goroutine was running.
b.bufLock.Lock()
// Copy over compressed/encrypted data back to the main buffer.
copy(b.buf[item.start:], blockBuf)
b.bufLock.Unlock()

// Fix the boundary of the block.
item.end = item.start + uint32(len(blockBuf))

if dst != nil {
slicePool.Put(dst)
}

newend := item.start + uint32(len(blockBuf))
item.end = newend
}
}

Expand Down Expand Up @@ -202,9 +216,8 @@ func (b *Builder) addHelper(key []byte, v y.ValueStruct, vpLen uint64) {
b.append(h.Encode())
b.append(diffKey)

// Continue growing until we have enough space.
for uint32(len(b.buf)) < uint32(b.sz)+v.EncodedSize() {
b.grow()
if uint32(len(b.buf)) < uint32(b.sz)+v.EncodedSize() {
b.grow(int(v.EncodedSize()))
}
b.sz += v.Encode(b.buf[b.sz:])

Expand All @@ -214,29 +227,31 @@ func (b *Builder) addHelper(key []byte, v y.ValueStruct, vpLen uint64) {
b.tableIndex.EstimatedSize += (sstSz + vpLen)
}

// grow increases the size of b.buf by 50%.
func (b *Builder) grow() {
// grow increases the size of b.buf by atleast 50%.
func (b *Builder) grow(n int) {
if n < len(b.buf)/2 {
n = len(b.buf) / 2
}
b.bufLock.Lock()
newBuf := make([]byte, len(b.buf)+len(b.buf)/2)
newBuf := make([]byte, len(b.buf)+n)
copy(newBuf, b.buf)
b.buf = newBuf
b.bufLock.Unlock()
}
func (b *Builder) append(data []byte) {
// Continue growing until we have enough space.
for len(b.buf) < b.sz+len(data) {
b.grow()
// Ensure we have enough space to store new data.
if len(b.buf) < b.sz+len(data) {
b.grow(len(data))
}
copy(b.buf[b.sz:], data)
b.sz += len(data)
}

func (b *Builder) addPadding(sz int) {
b.sz += sz
// Continue growing until we have enough space.
for len(b.buf) < b.sz {
b.grow()
if len(b.buf) < b.sz+sz {
b.grow(sz)
}
b.sz += sz
}

/*
Expand Down Expand Up @@ -264,9 +279,6 @@ func (b *Builder) finishBlock() {
return
}

// When a block is encrypted, it's length increases. We add 200 bytes to padding to
// handle cases when block size increases.
padding := 200
b.addPadding(padding)

// Block end is the actual end of the block ignoring the padding.
Expand Down Expand Up @@ -379,7 +391,7 @@ func (b *Builder) Finish() []byte {
if len(b.blockList) > 0 {
start := uint32(0)
for i, bl := range b.blockList {
// Length of the block is start minues the end.
// Length of the block is start minus the end.
b.tableIndex.Offsets[i].Len = bl.end - bl.start
b.tableIndex.Offsets[i].Offset = start

Expand Down

0 comments on commit 10e0c2b

Please sign in to comment.